• 首页
  • vue
  • TypeScript
  • JavaScript
  • scss
  • css3
  • html5
  • php
  • MySQL
  • redis
  • jQuery
  • xreadgroup 命令

    XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。在阅读本页之前,你可能必须先理解XREAD命令才有意义。


    语法

    XREADGROUP GROUP group consumer[COUNT count][BLOCK milliseconds][NOACK] STREAMS key[key ...] ID[ID ...]

    消费组消息(消费的消息会记录在pending列表里,等待xack确认,需要先创建消费组);

    • [COUNT count]:设置获取消费消息数量。
    • [BLOCK milliseconds]:设置阻塞等待时间(ms)(通常跟>一起使用获取新消息)。
    • [ID ...]:读取id之后的消息,id为0可获取已读但未确认的消息(可多次消费,不会获得未读取的消息),id为>表示读取组内未读取的消息(同组内成员会消费不同消息)。


    • group:消费组名。
    • consumer:消费者名。
    • count:读取数量。
    • milliseconds:阻塞毫秒数。
    • key:队列名。
    • ID:消息 ID。


    快速了解消费者组

    此命令与XREAD的区别是它支持消费者组

    如果没有消费者组,仅使用XREAD,所有客户端都将获得所有到达流的条目。相反,如果使用带有XREADGROUP的消费者组,则可以创建不同的客户端组来消费到达给定流的不同的部分。例如,如果流获得新的条目A,B和C,并且有两个消费者通过消费者组读取流,其中一个客户端将会得到例如,消息A和C,另外一个客户端得到消息B,等等,以此类推。

    在消费者组中,给定的消费者(即从流中消费消息的客户端)必须使用唯一的消费者名称进行标识。名称只是一个字符串。

    消费者组的保证之一是,给定的消费者只能看到发送给它的历史消息,因此每条消息只有一个所有者。然而,还有一个特殊的特性叫做消息认领,其允许其他消费者在某些消费者无法恢复时认领消息。为了实现这样的语义,消费者组要求消费者使用XACK命令显式确认已成功处理的消息。这是必要的,因为流将为每个消费者组跟踪哪个消费者正在处理什么消息。

    这是如何理解您是否要使用消费者组:

    1. 如果你有一个流和多个客户端,并且你希望所有的客户端都获取到完整的信息,那么你不需要使用消费者组。
    2. 如果你有一个流和多个客户端,并且你希望在你的客户端上对流进行分区共享,以便每个客户端都能获得一个到达流的消息的子集,那么你需要使用消费者组。

    XREAD和XREADGROUP之间的差异

    从语法的角度来看,这两个命令几乎是相同的,但是XREADGROUP需要一个特殊和强制的选项:

    GROUP <group-name> <consumer-name>
    

    组名只是关联到流的消费者组的名称。该组是使用XGROUP命令创建的。消费者名称是客户端用于在消费者组内标识自己的字符串。消费者会在第一次出现在消费者组内时被自动创建。不同的消费者应该选择不同的消费者名称。

    当你使用XREADGROUP读取时,服务器将会记住某个给定的消息已经传递给你:消息会被存储在消费者组内的待处理条目列表(PEL)中,即已送达但尚未确认的消息ID列表。

    客户端必须使用XACK确认消息处理,以便从待处理条目列表中删除待处理条目。可以使用XPENDING命令检查待处理条目列表。

    使用XREADGROUP时在STREAMS选项中指定的ID可以是以下两种之一:

    • 特殊ID>,意味着消费者希望只接收从未发送给任何其他消费者的消息。这意思是说,请给我新的消息。
    • 任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将具有返回发送命令的消费者的待处理条目的效果。所以,基本上如果ID不是>,命令将让客户端访问它的待处理条目(已发送给它,但尚未确认的条目)。

    就像XREADXREADGROUP命令也可以以阻塞的方式使用。在这方面没有区别。

    当消息被传递给消费者时,会发生什么?

    两件事:

    1. 如果消息从未被发送给其他消费者,也即,如果我们正在谈论新消息,则创建待处理条目列表(PEL)。
    2. 相反,如果该消息已经发送给该消费者,并且它只是再次重新获取相同的消息,那么最后送达时间会被更新为当前时间,并且送达次数会加1。你可以使用XPENDING命令访问这些消息属性。

    用法示例

    通常,你使用这样的命令来获取新消息并处理它们。在伪代码中:

    WHILE true
        entries = XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
        if entries == nil
            puts "Timeout... try again"
            CONTINUE
        end
    
        FOREACH entries AS stream_entries
            FOREACH stream_entries as message
                process_message(message.id,message.fields)
    
                # ACK the message as processed
                XACK mystream $GroupName message.id
            END
        END
    END
    

    通过这种方式,例子中的消费者代码将会只获取新消息,处理它们,以及通过XACK确认它们。但是以上案例的代码是不完整的,因为它没有处理崩溃后的恢复事宜。如果我们在处理消息的过程中崩溃了,则我们的消息将继续保留在待处理条目列表中,因此我们可以通过给XREADGROUP初始ID为0并执行相同的循环来访问我们的消息历史。一旦提供的ID为0并且回复是一组空的消息,我们就知道我们已经处理并确认完了所有的待处理消息:我们可以开始使用>作为ID,以便获取新消息并重新加入正在处理新消息的消费者。


    消费者组模式

    当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:



    消费者组模式的支持主要由两个命令实现:

    • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作。
    • XREADGROUP,分组消费消息操作。

    进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

    # 生产者生成10条消息
    redis> MULTI
    redis> XADD mq * msg 1 # 生成一个消息:msg 1
    redis> XADD mq * msg 2
    redis> XADD mq * msg 3
    redis> XADD mq * msg 4
    redis> XADD mq * msg 5
    redis> EXEC
     1) "1553585533795-0"
     2) "1553585533795-1"
     3) "1553585533795-2"
     4) "1553585533795-3"
     5) "1553585533795-4"
    
    # 创建消费组 mqGroup
    redis> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
    OK
    
    # 消费者A,消费第1条
    redis> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
    1) 1) "mq"
       2) 1) 1) "1553585533795-0"
             2) 1) "msg"
                2) "1"
    # 消费者A,消费第2条
    redis> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
    1) 1) "mq"
       2) 1) 1) "1553585533795-1"
             2) 1) "msg"
                2) "2"
    # 消费者B,消费第3条
    redis> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
    1) 1) "mq"
       2) 1) 1) "1553585533795-2"
             2) 1) "msg"
                2) "3"
    # 消费者A,消费第4条
    redis> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 
    1) 1) "mq"
       2) 1) 1) "1553585533795-3"
             2) 1) "msg"
                2) "4"
    # 消费者C,消费第5条
    redis> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 
    1) 1) "mq"
       2) 1) 1) "1553585533795-4"
             2) 1) "msg"
                2) "5"
    
    
    

    上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:

    XGROUP CREATE mq mqGroup 0,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。

    XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。

    可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。

    以上就是消费组的基础操作。

    上篇:xgroup 命令

    下篇:xack 命令