使用Python操作Redis5.0新特性Stream实现订阅发布功能

系统 1624 0

  本文完整代码下载:github链接

  目前在做的工作有一部门是搭建一个可供公司内部使用的推送平台,用的中间件是redis,于是就自然的想用redis5.0版本的新特性来实现这个功能,网上的demo比较少,且大多是终端操作的命令行,写了一个Python的类和大家分享。

在介绍具体实现之前,先大致介绍一下背景。

在Redis5.0版本发布之前,redis也有一个发布、订阅功能,但功能非常简单,只能单纯的发布和订阅,适合在即时通信里使用。缺点非常多:

  1. 消息没有持久化的机制。在Pub/Sub模型中,消费者是和连接(Connection)绑定的,当消费者的连接断掉(网络原因或者消费者进程crash)后,再次重连,那么Channel中的消息将永久消失(对于该消费者而言),也就是说Pub/Sub模型缺少消息回溯的机制

  2. 消费消息的速度和消费者的数量成反比。在Redis的实现中,Redis会把Channel中的消息逐个(Linear)推送给每个消费者,因此当消费者的数量达到一定规模时,服务器的性能将线性下降,因此每个消费者获取到消息的延迟也线性增长

  3. 当生产者产生消息的速度远大于消费者的消费能力的时候(此时可以简单地理解为消息积压),消费者会被强制断开连接,因此会造成消息的丢失,这个特性可以详见redis的配置

  4. 对频道的消费者信息没有展现接口。 在我们的项目里需要管理每一个频道的订阅者,虽然redis本身有记录,但是并没有提供API可以访问。

Redis5.0最大的新特性就是多出了一个数据结构Stream,它是一个新的强大的支持多播的可持久化的消息队列,设计和Kafka非常相似。

  1. Redis Stream有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。
  2. 每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。
  3. 每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。
  4. 同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者者有一个组内唯一名称。【但是每个消费者并没有消费到哪条 消息的单独记录,所以后续我队列的消费者就是一个只含有一个消费者的消费组,这样可以方便记录更多信息】
  5. 消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

 具体实现:


            
              class SubRedis(object):

    def __init__(self):
        if not hasattr(SubRedis, 'pool'):
            SubRedis.getRedisCoon()  #创建redis连接池
        self._coon = redis.Redis(connection_pool=SubRedis.pool)

    @staticmethod
    def getRedisCoon():
        SubRedis.pool = redis.ConnectionPool(host=redisInfo['SubRedisAddress'],password=redisInfo['SubRedisPassword'],port=redisInfo['SubRedisPort'],db=redisInfo['db'])

    #返回一个channel的具体信息: 订阅者数量,最后送达的msg的ID...
    def channel_info(self,channel):
        return self._coon.xinfo_stream(channel)

    #返回一个channel的具体订阅群组的信息(这里是返回订阅者,因为每一个群组里只有一个消费者)
    def channel_consumers_info(self,channel):
        InfoList = self._coon.xinfo_groups(channel)
        for GroupDict in InfoList:
            GroupDict.pop("consumers")
        return InfoList

    #创建消费者
    def create_consumer_group(self,name,channel):
        ret = self._coon.xgroup_create(channel,name,id="$")
        if ret == True:
            print self.channel_consumers_info(channel)
        else:
            logging.error("create consumer %s fill,ret %s" %(name,ret))
    
    #往某一个channel发送消息
    def publish(self,channel,msg):
        msgid = self._coon.xadd(channel,msg)
        return msgid
    def consumer_already_subscribed(self,channel,consumer):
        channel_consumers_infolist = self.channel_consumers_info(channel)
        for consumer_dict in channel_consumers_infolist:
            if consumer in consumer_dict.values():
                logging.warning("consumer %s has already subscribed %s" % (consumer, channel))
                return True
        return False
    
    #已经存在的订阅者订阅新频道
    def subscribe(self,name,channel):
        if(self.consumer_already_subscribed(channel,consumer)):
            return False
        self.create_consumer_group(name,channel)
        print "%s subscribe %s success,channel %s info:"%(name,channel,channel),self.channel_consumers_info(channel)
        return
    
    #监听并写入新消息到文件
    def listen_channel(self,channel,consumer,file):
        if not (self.consumer_already_subscribed(channel,consumer)):
            return False
        mess = self._coon.xreadgroup(consumer,consumer,{channel:">"})
        if mess != []:
            msg_list = mess[0][1]
            for msg in msg_list:
                id, content = msg[0], msg[1]
                content["msgid"] = id
                json_content = json.dumps(content)
                json_content += ","
                print ("new message: ",content)
                with open(file, "a") as f:
                    f.write(json_content)
                self._coon.xack(channel, consumer, id)
            
          

 


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论