Python 操作 Rabbit MQ 基础 (三)

系统 2024 0

Python 操作 Rabbit MQ 基础:

一、简介:

1.介绍:

RabbitMQ是一个消息代理:它接收和转发消息。

可以把它比作为邮局,当您要发布邮件放在邮箱中时,可以确定这封邮件让哪位快递员来进行发送到您的收件人手中。

2.术语:

1.发送消息的程序是 生产者

img

2.队列可以理解为邮箱,用来存储一些邮件。队列的由主机的 存储器 磁盘限制约束 ,它本质上是一个大的 消息缓冲器 。很多生产者可以发送到一个队列的消息,并且许多消费者可以尝试从一个队列接收数据:

img

3.消费者可以理解为接收人。一个消费者是一个程序, 主要是等待接收信息

img

注意

  • 生产者、消费者、消息队列 不必驻留在同一个主机上

二、发送"Hello world":

实现功能:

生产者 → 消息队列 → 消费者

  • 生产者发送单个消息到消息队列中,并由消费者取出,进行处理。

  • 图解:

    img

RabbitMQ有许多不同语言的客户端,我们使用的Python客户端,那么使用 Pika 1.0.0 包:

安装命令: python -m pip install pika --upgrade

三、生产者:

img

新创建一个程序 send.py ,将向队列发送一条消息:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

# 创建一个实例  本地访问
              
                IP
              
              地址可以为 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指定
              
                ,
              
               因为默认就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 声明一个管道
              
                ,
              
               在管道里发送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 在管道里声明队列名称
channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                'hello'
              
              
                )
              
              

# 参数exchange
              
                =
              
              
                ''
              
              表示默认交换
              
                ,
              
               目前记住rabbitmq消息永远不是直接发送到队列中的
              
                ,
              
               它需要通过交换
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                ''
              
              
                ,
              
               routing_key
              
                =
              
              
                'hello'
              
              
                ,
              
               body
              
                =
              
              
                'hello world'
              
              
                )
              
              

# 队列关闭
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

注意

  • 若没有看到’已发送’消息,有可能是消息队列没有足够的磁盘空间可用。默认情况下它至少需要200MB空间,因此会拒绝接收消息。

四、消费者:

img

新创建第二个程序 receive.py ,将从队列中接收消息并打印出来:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               time

# 创建实例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 声明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 这里又声明一次
              
                'hello'
              
              队列
              
                ,
              
              
                因为你不知道哪个程序
              
              
                (
              
              send
              
                .
              
              py
              
                )
              
              先运行
              
                ,
              
               所以要声明两次
channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                'hello'
              
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'ch:'
              
              
                ,
              
               ch
              
                )
              
              
                print
              
              
                (
              
              
                'method:'
              
              
                ,
              
               method
              
                )
              
              
                print
              
              
                (
              
              
                'properties:'
              
              
                ,
              
               properties
              
                )
              
              
                print
              
              
                (
              
              
                '--------收到消息:---------{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                )
              
              
                )
              
                # 看消息是否已经接收
    time
              
                .
              
              
                sleep
              
              
                (
              
              
                20
              
              
                )
              
                # 睡
              
                20
              
              秒
    ch
              
                .
              
              
                basic_ack
              
              
                (
              
              delivery_tag
              
                =
              
              method
              
                .
              
              delivery_tag
              
                )
              
                # 告诉生产者
              
                ,
              
               消息处理完成


# 消费消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              
                'hello'
              
              
                ,
              
                # 从指定的消息队列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                )
              
                # 如果收到消息
              
                ,
              
               就调用callback函数来处理


              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 开始消费消息

            
          

列出目前有的队列: sudo rabbitmqctl list_queues

五、运行程序:

1.首先在终端上使用我们的程序,先让 消费者的程序运行起来 ,等待生产者的发送:

            
              python receive
              
                .
              
              py
# 
              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          

2.现在开始让生产者,发送消息:

            
              python send
              
                .
              
              py 
# 
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              发出消息
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                -
              
            
          

3.可以看到消费者的打印信息:

            
              
                (
              
              
                'ch:'
              
              
                ,
              
              
                <
              
              BlockingChannel impl
              
                =
              
              
                <
              
              Channel number
              
                =
              
              
                1
              
              
                OPEN
              
               conn
              
                =
              
              
                <
              
              SelectConnection 
              
                OPEN
              
               transport
              
                =
              
              
                <
              
              pika
              
                .
              
              adapters
              
                .
              
              utils
              
                .
              
              io_services_utils
              
                .
              
              _AsyncPlaintextTransport object at 
              
                0x153b590
              
              
                >
              
               params
              
                =
              
              
                <
              
              ConnectionParameters host
              
                =
              
              localhost port
              
                =
              
              
                5672
              
               virtual_host
              
                =
              
              
                /
              
               ssl
              
                =
              
              False
              
                >>>
              
              
                >
              
              
                )
              
              
                (
              
              
                'method:'
              
              
                ,
              
              
                <
              
              Basic
              
                .
              
              
                Deliver
              
              
                (
              
              
                [
              
              
                'consumer_tag=ctag1.cc4070930568408fb20c783f85f9336e'
              
              
                ,
              
              
                'delivery_tag=1'
              
              
                ,
              
              
                'exchange='
              
              
                ,
              
              
                'redelivered=False'
              
              
                ,
              
              
                'routing_key=hello'
              
              
                ]
              
              
                )
              
              
                >
              
              
                )
              
              
                (
              
              
                'properties:'
              
              
                ,
              
              
                <
              
              BasicProperties
              
                >
              
              
                )
              
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              收到消息:
              
                --
              
              
                --
              
              
                --
              
              
                --
              
              
                -
              
              hello world

            
          

4.可以查看一下目前有的队列:

            
              rabbitmqctl list_queues   
              
                // 执行命令
              
              
Listing queues 
              
                ...
              
              
hello	
              
                0
              
              
                // hello 队列的名称  0:消息数量
              
            
          

5.我们已经成功的通过RabbitMQ发送第一条消息,hello world,但是 receive.py 程序不会退出,它将保持准备接收更多消息,使用 Ctrl +c 中断


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论