Python 操作 Rabbit MQ 路由 (六)
一、路由(Routing):
本章打算新增加一个功能,使它可以达到仅订阅消息的一个子集。
举个栗子,我们需要把验证的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中。
二、绑定(Bindings):
绑定(Binding)是指交换机(Exchange)和队列(Queue)的关系
;
绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫做
绑定键(Binding Key)
。
channel
.
queue_bind
(
exchange
=
exchange_name
,
queue
=
queue_name
,
routing_key
=
'fe_cow'
)
注意 :
-
绑定键的意义取决于交换机的类型,上一篇使用的
扇形交换机会忽略这个值
。
三、直连交换机(Direct Exchange):
打算扩展上一篇的功能,使其基于日志的严重程度进行消息过滤。比如把一些比较严重的错误日志写入磁盘,以免将警告或信息日志上浪费磁盘空间,仅记录比较严重的错误。
上一篇我们使用的扇形交换机,没有足够的灵活性,仅能做广播的需求;
直连交换机的路由算法很简单易懂,
交换机将会对绑定键和路由键进行精准匹配,从而确定消息该分发到哪个队列
;
图解 :
详解 :可以看到type=direct,指定交换机的类型为直连交换机,它和两个队列都进行了绑定。第一个队列(Q1)使用orange作为绑定键,第二个队列(Q2)有两个绑定键,一个是black作为绑定键,另一个是green绑定键。
简单理解 :当路由键为orange的消息发布到交换机,就会被路由到队列Q1中,路由键为black或green的消息发布到交换机,就会被路由到队列Q2中,其他的所有消息都将被丢弃。
四、多个绑定:
图解 :
详解 :多个队列使用相同的绑定键是合法的。这样一来,直连交换机就和扇形交换机的行为一样,会将消息广播到所有匹配的队列(Q1和Q2)。
五、发送日志:
将发送消息到一个直连的交换机,把日志级别作为路由键,这样接收日志的脚本可根据严重的级别来选择它想要处理的日志,我们假设
severity的值是info、warning、error中的一个
。
创建直连交换机:
# 交换机名为
:
direct_logs 类型为
:
直连交换机
channel
.
exchange_declare
(
exchange
=
'direct_logs'
,
type
=
'direct'
)
发送消息:
# 交换机名称为
:
direct_logs
channel
.
basic_publish
(
exchange
=
'direct_logs'
,
routing_key
=
severity
,
body
=
message
)
六、订阅:
处理接收消息的方式跟之前不一样,将会为每个严重级别分别创建一个新的绑定。
# 表示与消费者断开连接
,
队列立即删除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成队列的名字
queue_name
=
result
.
method
.
queue
for
severity
in
severities
:
channel
.
queue_bind
(
exchange
=
'direct_logs'
,
queue
=
queue_name
,
routing_key
=
severity
)
七、整理本节代码:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
severity
=
sys
.
argv
[
1
]
if
len
(
sys
.
argv
)
>
1
else
'info'
message
=
' '
.
join
(
sys
.
argv
[
2
:
]
)
or
"Hello World!"
# 创建一个实例 本地访问
IP
地址可以为 localhost
后面5672是端口地址
(
可
>
以不用指定
,
因为默认就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 声明一个管道
,
在管道里发送消息
channel
=
connection
.
channel
(
)
# 指定交换机的类型为direct
:
执行交换机 交换机名称
:
direct_logs
channel
.
exchange_declare
(
exchange
=
'direct_logs'
,
exchange_type
=
'direct'
)
# 投递消息
channel
.
basic_publish
(
exchange
=
'direct_logs'
,
routing_key
=
severity
,
body
=
message
)
print
"[x] sent {}"
.
format
(
severity
,
message
)
# 队列关闭
connection
.
close
(
)
2.以下是
receive.py
代码:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
# 创建实例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 声明管道
channel
=
connection
.
channel
(
)
# 指定交换机名为 direct_logs 交换机类型为
:
direct
channel
.
exchange_declare
(
exchange
=
'direct_logs'
,
exchange_type
=
'direct'
)
# 表示与消费者断开连接
,
队列立即删除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成队列的名字
queue_name
=
result
.
method
.
queue
severities
=
sys
.
argv
[
1
:
]
if
not severities
:
print
>>
sys
.
stderr
,
"Usage: %s [info] [warning] [error]"
%
\
(
sys
.
argv
[
0
]
,
)
sys
.
exit
(
1
)
for
severitie
in
severities
:
# 绑定交换机和队列 这里注意的是绑定键
,
就是根据按照指定严重级别进行记录日志
channel
.
queue_bind
(
exchange
=
'direct_logs'
,
queue
=
queue_name
,
routing_key
=
severitie
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
# 消费消息
channel
.
basic_consume
(
queue
=
queue_name
,
# 从指定的消息队列中接收消息
on_message_callback
=
callback
,
# 如果收到消息
,
就调用callback函数来处理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 开始消费消息
3.仅希望保存
warning和error级别
日志到磁盘中,需要打开控制台并输入:
python receive
.
py warning error
>
logs_from_rabbit
.
log
4.希望所有日志都输出到屏幕中,打开一个新的终端,输入:
python receive
.
py info warning error
5.要触发一个
error
级别的日志,需要输入:
python send
.
py error
'发送一个error级别的错误'
# 可以看到步骤
3
的控制台
,
会出现
:
===
===
=
正在等待消息
===
===
==
[
X
]
Received发送一个error级别的错误
简单理解:通过绑定键的名称,来进行由哪个队列进行处理