Python 操作 Rabbit MQ 工作队列 (四)
一、工作队列简介:
主要介绍,我们将会创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
工作队列
:又称为
任务队列
,为了避免等待一些占用大量资源、时间的操作。当我们把任务当作
消息发送到队列中
,一个运行在后台的工作者进程就会取出任务,然后进行处理。当运行多个工作者,任务就会在它们之间共享。
二、准备工作:
首先,我们将发送一些字符串,把这些字符串当作复杂的任务,我们使用time.sleep()函数来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)就会耗时1秒钟。比如"Hello…",就会耗时6秒种。
将之前的send.py文件,做简单调整,以便可发送随意的消息,到工作队列中。名命为
new_task.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 创建一个实例 本地访问
IP
地址可以为 localhost
后面5672是端口地址
(
可以不用指定
,
因为默认就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 声明一个管道
,
在管道里发送消息
channel
=
connection
.
channel
(
)
# 在管道里声明队列名称
channel
.
queue_declare
(
queue
=
'hello'
)
# 参数exchange
=
''
表示默认交换
,
目前记住rabbitmq消息永远不是直接发送到队列中的
,
它需要通过交换
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'hello'
,
body
=
message
)
print
"[x] sent {}"
.
format
(
message
,
)
# 队列关闭
connection
.
close
(
)
更改
receive.py
脚本,将消息体中每一个点号(.)模拟1秒种的操作,它会从任务队列中获取消息并执行。名命为
worker.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
time
# 创建实例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 声明管道
channel
=
connection
.
channel
(
)
# 这里又声明一次
'hello'
队列
,
因为你不知道哪个程序
(
send
.
py
)
先运行
,
所以要声明两次
channel
.
queue_declare
(
queue
=
'hello'
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
time
.
sleep
(
body
.
count
(
'.'
)
)
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
# 告诉生产者
,
消息处理完成
# 消费消息
channel
.
basic_consume
(
queue
=
'hello'
,
# 从指定的消息队列中接收消息
on_message_callback
=
callback
)
# 如果收到消息
,
就调用callback函数来处理
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 开始消费消息
三、循环调度:
使用工作队列的一个好处就是它能够
并行的处理队列
,如果累积了很多任务,我们仅需要添加更多的工作者(workers)就可以了,扩展很简单。
1.首先,先同时运行两个
worker.py
脚本,它们会从队列中获取消息;
2.需要打开三个终端,两个用来运行
worker.py
脚本,这俩个终端就是我们的两个消费者(Consumers),C1和C2;
# shell
-
1
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
# shell
-
2
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
3.打开第三个终端,使用
new_task.py
进行发送任务:
# shell
-
3
python new_task
.
py python new_task
.
py First message
.
[
x
]
sent python new_task
.
py First message
.
python new_task
.
py python new_task
.
py Second message
.
.
[
x
]
sent python new_task
.
py Second message
.
.
python new_task
.
py python new_task
.
py Third message
...
[
x
]
sent python new_task
.
py Third message
...
python new_task
.
py python new_task
.
py Fourth message
...
.
[
x
]
sent python new_task
.
py Fourth message
...
.
python new_task
.
py python new_task
.
py Fifth message
...
.
.
[
x
]
sent python new_task
.
py Fifth message
...
.
.
4.查看工作者(workers)如果处理,生产者发送的消息:
# shell
-
1
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
[
X
]
Receivedpython new_task
.
py First message
.
[
X
]
Receivedpython new_task
.
py Third message
...
[
X
]
Receivedpython new_task
.
py Fifth message
...
.
.
# shell
-
2
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
[
X
]
Receivedpython new_task
.
py Second message
.
.
[
X
]
Receivedpython new_task
.
py Fourth message
...
总结 :
-
RabbitMQ 会按顺序把消息发送给每个消费者(Consumer),平均每个消费者都会收到同等数量的消息。
这种发送消息的方式叫做轮询
。
四、消息确认:
当处理一个比较耗时的任务时,如果消费者运行到一半就挂掉了,怎么办?
当消息被RabbitMQ发送给消费者之后,马上就会在内存中移除。这种情况,仅需要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。
当然,我们不向丢失任何任务消息。
如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者。
解决办法 :
-
为了防止消息跌势,RabbitMQ提供了消息的响应。消息者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
-
若消费者(Consunmer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者。这样,即使工作者偶尔挂掉,也不会丢失消息。
-
消息没有超时这个概念的;当工作者与它断开连接的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
-
消息响应,默认是开启的。之前栗子中,使用
no_ack=True
标识把它关闭。当工作者完成了任务,就发送一个响应。def callback ( ch , method , properties , body ) : print '[X] Received{}' . format ( body , ) time . sleep ( body . count ( '.' ) ) ch . basic_ack ( delivery_tag = method . delivery_tag ) # 告诉生产者 , 消息处理完成 # 消费消息 channel . basic_consume ( queue = 'hello' , # 从指定的消息队列中接收消息 on_message_callback = callback , # 如果收到消息 , 就调用callback函数来处理 on_ack = False ) # 开启消息的响应 , 默认是开启的
运行上面代码,将两个工作者进行杀掉,然后启动生产者进行消息的发送。当工作者挂掉以后,所有没有响应的消息都会重新发送。
# shell - 3 python new_task . py 01 Message . python new_task . py 02 Message . python new_task . py 03 Message . python new_task . py 04 Message . python new_task . py 05 Message .
# 查看当前队列中的消息: rabbitmqctl list_queues Listing queues ... hello 5 # 队列名字 'hello' 中 有 5 条信息没有得到响应处理
# 重新启动工作者: python worker . py === === = 正在等待消息 === === == [ X ] Received01 Message . [ X ] Received02 Message . [ X ] Received03 Message . [ X ] Received04 Message . [ X ] Received05 Message .
# 查看当前队列中的消息 rabbitmqctl list_queues Listing queues ... hello 0 # 队列名字 'hello' 所有消息已经释放
注意
:很容易犯的错误,就是忘了
basic_ack
,后果很严重。消息在程序退出之后就会重新发送,如果它不能够释放没有得到响应的消息,
RabbitMQ就会占用越来越多的内存
。
五、消息持久化:
如果没有特意告诉RabbitMQ,那么在它退出或者崩溃时,
将会丢失所有队列和消息
。为了保证不丢失,有两个事情需要注意:
我们必须把队列和消息设置为持久化
。
1.将队列声明持久化:
将队列声明为持久化(durable):
channel
.
queue_declare
(
queue
=
'hello'
,
durable
=
True
)
-
这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫’hello’的非持久化队列,RabbitMQ不允许你使用不同的参数重新定义一个队列,它会报错。但我们现在使用一个快捷的解决方法,
用不同的队列名
。channel . queue_declare ( queue = 'task_queue' , durable = True )
这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改 。
2.将发送消息设为持久化:
这时,我们可以确保在RabbitMQ重启之后,queue_declare队列不会丢失。另外,我们需要把我们的消息也要设为持久化,将
delivery_mode的属性设为2
。
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'task_queue'
,
body
=
message
,
properties
=
pika
.
BasicProperties
(
delivery_mode
=
2
)
# 使消息持久化
)
注意
:将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉
RabbitMQ要把消息存储到硬盘中
,但从RabbitMQ收到消息到保持之间还是有一个很小的时间间隔。因为RabbitMQ并不是所有的消息都使用fsync,它有可能只是保持到缓存中,并不一定会写到硬盘中。并不能
保证真正的持久化
。
六、公平调度:
以上 循环调度 的栗子,可以看出,它并不是按照我们所期望的那样进行分发。如果有两个工作者,处理奇数的消息比较繁忙,处理偶数的消息比较轻松。这时RabbitMQ并不知道这些,它依然会一如既往的派发消息。
这时因为RabbitMQ,只管分发进入队列的消息,不会关心有多少消费者没有作出响应。
使用
basic.qos方法,并设置prefetch_count=1
,告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者,直到它已经处理上一条消息并且作出了响应。
这样,RabbitMQ就会把消息分发给下一个空闲的工作者
。
channel
.
basic_qos
(
prefetch_count
=
1
)
注意 :如果所有工作者都处理繁忙的状态,你的队列就会被填满。解决此问题,需要添加更多的工作者(workers),或者使用其他的策略。
七、整理本节最终代码:
new_task.py
完整代码:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 创建一个实例 本地访问
IP
地址可以为 localhost
后面5672是端口地址
(
可以不用指定
,
因为默认就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 声明一个管道
,
在管道里发送消息
channel
=
connection
.
channel
(
)
# 在管道里声明队列名称
channel
.
queue_declare
(
queue
=
'task_queue'
,
durable
=
True
)
# 参数exchange
=
''
表示默认交换
,
目前记住rabbitmq消息永远不是直接发送到队列中的
,
它需要通过交换
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'task_queue'
,
body
=
message
,
properties
=
pika
.
BasicProperties
(
delivery_mode
=
2
)
# 使消息持久化
)
print
"[x] sent {}"
.
format
(
message
,
)
# 队列关闭
connection
.
close
(
)
worker.py
完整代码:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
time
# 创建实例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 声明管道
channel
=
connection
.
channel
(
)
# 这里又声明一次
'hello'
队列
,
因为你不知道哪个程序
(
send
.
py
)
先运行
,
所以要声明两次
channel
.
queue_declare
(
queue
=
'task_queue'
,
durable
=
True
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
time
.
sleep
(
body
.
count
(
'.'
)
)
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
# 告诉生产者
,
消息处理完成
# 公平调度
,
同一时刻
,
不要发送超过
1
条消息给下一个工作者
channel
.
basic_qos
(
prefetch_count
=
1
)
# 消费消息
channel
.
basic_consume
(
queue
=
'task_queue'
,
# 从指定的消息队列中接收消息
on_message_callback
=
callback # 如果收到消息
,
就调用callback函数来处理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 开始消费消息
这些持久化的选项,可以使得在RabbitMQ重启之后仍然能够恢复。