Python 操作 Rabbit MQ 路由 (六)

系统 1835 0

Python 操作 Rabbit MQ 路由 (六)

一、路由(Routing):

本章打算新增加一个功能,使它可以达到仅订阅消息的一个子集。

举个栗子,我们需要把验证的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中。

二、绑定(Bindings):

绑定(Binding)是指交换机(Exchange)和队列(Queue)的关系 ;

绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫做 绑定键(Binding Key)

            
              channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              exchange_name
              
                ,
              
              
                   queue
              
                =
              
              queue_name
              
                ,
              
              
                   routing_key
              
                =
              
              
                'fe_cow'
              
              
                )
              
            
          

注意

  • 绑定键的意义取决于交换机的类型,上一篇使用的 扇形交换机会忽略这个值

三、直连交换机(Direct Exchange):

打算扩展上一篇的功能,使其基于日志的严重程度进行消息过滤。比如把一些比较严重的错误日志写入磁盘,以免将警告或信息日志上浪费磁盘空间,仅记录比较严重的错误。

上一篇我们使用的扇形交换机,没有足够的灵活性,仅能做广播的需求;

直连交换机的路由算法很简单易懂, 交换机将会对绑定键和路由键进行精准匹配,从而确定消息该分发到哪个队列

图解

Python 操作 Rabbit MQ 路由 (六)_第1张图片

详解 :可以看到type=direct,指定交换机的类型为直连交换机,它和两个队列都进行了绑定。第一个队列(Q1)使用orange作为绑定键,第二个队列(Q2)有两个绑定键,一个是black作为绑定键,另一个是green绑定键。

简单理解 :当路由键为orange的消息发布到交换机,就会被路由到队列Q1中,路由键为black或green的消息发布到交换机,就会被路由到队列Q2中,其他的所有消息都将被丢弃。

四、多个绑定:

图解

Python 操作 Rabbit MQ 路由 (六)_第2张图片

详解 :多个队列使用相同的绑定键是合法的。这样一来,直连交换机就和扇形交换机的行为一样,会将消息广播到所有匹配的队列(Q1和Q2)。

五、发送日志:

将发送消息到一个直连的交换机,把日志级别作为路由键,这样接收日志的脚本可根据严重的级别来选择它想要处理的日志,我们假设 severity的值是info、warning、error中的一个

创建直连交换机:

            
              # 交换机名为
              
                :
              
              direct_logs  类型为
              
                :
              
              直连交换机
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               type
              
                =
              
              
                'direct'
              
              
                )
              
            
          

发送消息:

            
              # 交换机名称为
              
                :
              
              direct_logs
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               routing_key
              
                =
              
              severity
              
                ,
              
               body
              
                =
              
              message
              
                )
              
            
          

六、订阅:

处理接收消息的方式跟之前不一样,将会为每个严重级别分别创建一个新的绑定。

            
              # 表示与消费者断开连接
              
                ,
              
               队列立即删除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成队列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue


              
                for
              
               severity 
              
                in
              
               severities
              
                :
              
              
    channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
              
                       queue
              
                =
              
              queue_name
              
                ,
              
              
                       routing_key
              
                =
              
              severity
              
                )
              
            
          

七、整理本节代码:

图解
Python 操作 Rabbit MQ 路由 (六)_第3张图片
1.以下是 send.py 代码:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

severity 
              
                =
              
               sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                ]
              
              
                if
              
              
                len
              
              
                (
              
              sys
              
                .
              
              argv
              
                )
              
              
                >
              
              
                1
              
              
                else
              
              
                'info'
              
              
message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                2
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 创建一个实例  本地访问
              
                IP
              
              地址可以为 localhost 
              
                后面5672是端口地址
              
              
                (
              
                >
              
              以不用指定
              
                ,
              
               因为默认就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 声明一个管道
              
                ,
              
               在管道里发送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交换机的类型为direct
              
                :
              
               执行交换机    交换机名称
              
                :
              
               direct_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'direct'
              
              
                )
              
              

# 投递消息
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              severity
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              severity
              
                ,
              
               message
              
                )
              
              
# 队列关闭
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

2.以下是 receive.py 代码:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

# 创建实例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 声明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交换机名为 direct_logs 交换机类型为
              
                :
              
              direct
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'direct'
              
              
                )
              
              

# 表示与消费者断开连接
              
                ,
              
               队列立即删除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成队列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

severities 
              
                =
              
               sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                if
              
               not severities
              
                :
              
              
    print 
              
                >>
              
               sys
              
                .
              
              stderr
              
                ,
              
              
                "Usage: %s [info] [warning] [error]"
              
              
                %
              
               \
                         
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                0
              
              
                ]
              
              
                ,
              
              
                )
              
              
    sys
              
                .
              
              
                exit
              
              
                (
              
              
                1
              
              
                )
              
              
                for
              
               severitie 
              
                in
              
               severities
              
                :
              
              
    # 绑定交换机和队列  这里注意的是绑定键
              
                ,
              
               就是根据按照指定严重级别进行记录日志
    channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                ,
              
               routing_key
              
                =
              
              severitie
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消费消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 从指定的消息队列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就调用callback函数来处理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 开始消费消息

            
          

3.仅希望保存 warning和error级别 日志到磁盘中,需要打开控制台并输入:

            
              python receive
              
                .
              
              py warning error 
              
                >
              
               logs_from_rabbit
              
                .
              
              log

            
          

4.希望所有日志都输出到屏幕中,打开一个新的终端,输入:

            
              python receive
              
                .
              
              py info warning error

            
          

5.要触发一个 error 级别的日志,需要输入:

            
              python send
              
                .
              
              py error 
              
                '发送一个error级别的错误'
              
            
          
            
              # 可以看到步骤
              
                3
              
              的控制台
              
                ,
              
               会出现
              
                :
              
              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received发送一个error级别的错误

            
          

简单理解:通过绑定键的名称,来进行由哪个队列进行处理


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论