• 首页
  • vue
  • TypeScript
  • JavaScript
  • scss
  • css3
  • html5
  • php
  • MySQL
  • redis
  • jQuery
  • redis 实现消息队列的4种方案

    Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。Redis具有内置复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过哨兵模式(Sentinel)和集群模式(Cluster)自动分区。

    • 支持数据结构:字符串(string)、哈希散列(hash)、列表(list)、集合(set)、有序集合(sorted sets)、数据流(stream)。
    • 支持查询方式:位图(bitmaps),超级日志(hyperloglogs),地理信息(geo)。

    MQ

    消息队列(Message Queue,简称 MQ)。简单理解,生产者先将消息投递一个叫做【队列】的容器中,然后再从这个容器中取出消息,最后再转发给消费者。


    消息队列使用场景

    系统解耦

    在分布式环境下,系统间的相互依赖,最终会会导致整个依赖关系混乱,特别在微服务环境下,会出现相互依赖,甚至是循环依赖的情况,对后期系统的拆分和优化都带来极大负担。那么我们就可以用MQ来进行处理。上游系统将数据投递到MQ,下游系统取MQ的数据进行消费,投递和消费可以用同步的方式处理,不会影响上游系统的性能。

    异步处理

    如果采用同步的方式,系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?引入消息队列,将不必要的业务逻辑异步处理。

    流量削峰

    消息中间件的存储能力能够有效的帮助消费者进行缓冲。试想下,正常流量下消费者能够愉快的进行消费,瞬时高峰流量来的时候,消费者消费能力跟不上,刚好阻塞在消息中间件,等峰值过后,消费者又能很快的将阻塞的消息进行消费。流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛!

    数据分发

    大部分开源的MQ中间件基本都支持一对多或者广播的模式,而且都可以根据规则选择分发的对象。这样上游的一份数据,众多下游系统中,可以根据规则选择是否接收这些数据,这样扩展性就很强了。

    消息队列特点

    先进先出

    不能先进先出,都不能说是队列了。消息队列的顺序在入队的时候就基本已经确定了,一般是不需人工干预的。而且,最重要的是,数据是只有一条数据在使用中。这也是MQ在诸多场景被使用的原因。

    发布/订阅

    发布/订阅是一种很高效的处理方式,如果不发生阻塞,基本可以当做是同步操作。这种处理方式能非常有效的提升服务器利用率,这样的应用场景非常广泛。

    持久化

    持久化确保MQ的使用不只是一个部分场景的辅助工具,而是让MQ能像数据库一样存储核心的数据。

    分布式

    在现在大流量、大数据的使用场景下,只支持单体应用的服务器软件基本是无法使用的,支持分布式的部署,才能被广泛使用。而且,MQ的定位就是一个高性能的中间件。

    消息队列模式

    在JMS标准中,有两种消息模型P2P(Point toPoint)和Publish/Sub(Pub/Sub)。

    P2P

    点对点,一个发,一个消费。涉及到的角色发布者(Publisher)、消费者(Consumer)、消息队列(Queue)。

    • 一个消息只能被一个消费者消费,消费后会从队列里移除。
    • 发布者和消费者无关系,发布者发送消息的行为不会随消费者而改变。
    • 消费者消费完成消息,需要向队列Ack,消息队列发现消息消费成功即做消息移除。

    Pub/Sub

    发布订阅模式,一个发布,多方订阅。涉及到的角色有发布者(Publisher)、主题(Topic)、订阅者(Subscriber)。

    • 每个消息可以有多个消费者。
    • 针对某个主题(Topic)的订阅者,必须创建一个订阅者之后,才能消费发布者的消息。
    • 为了消费消息,订阅者必须保持运行的状态。


    Redis消息队列

    MQ应用有很多,比如 ActiveMQ、RabbitMQ、RocketMQ、Kafka等,但是也可以基于redis来实现。

    • 基于Stream(流)数据类型。
    • 基于List(列表)数据类型。
    • 基于Sorted Set(有序集合)类型。
    • 使用pub/sub(订阅/发布)模式。


    基于stream

    Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,消息是持久化的,Redis重启后,内容还在。

    • 每个 stream 都有唯一的名称,它就是 Redis的key,使用xadd指令追加消息时创建。
    • 每个消息都有唯一的ID,使用xadd指令追加消息时创建。
    • 每个 stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 stream 之上往前移动,表示当前消费组已经消费到哪条消息了。消费组使用xgroup create创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化 last_delivered_id 变量。每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者者有一个组内唯一名称。
    • 每个消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个 pending_ids 变量在Redis官方被称之为 PEL(Pending Entries List),这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。


    增删改查

    • xadd:追加消息。
    • xdel:删除消息。这里的删除仅仅是设置了标志位,不影响消息总长度
    • xrange:获取消息列表,会自动过滤已经删除的消息。
    • xlen:消息长度。
    • del:删除Stream。


    独立消费

    我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用,可以完全忽略消费组(Consumer Group)的存在。


    创建消费组

    Stream通过xgroup create指令创建消费组(Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。


    消费

    Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。

    Stream消息太多怎么办

    读者很容易想到,要是消息积累太多,Stream的链表岂不是很长,内容会不会爆掉就是个问题了。xdel指令又不会删除消息,它只是给消息做了个标志位。Redis自然考虑到了这一点,所以它提供了一个定长Stream功能。在xadd的指令提供一个定长长度maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

    127.0.0.1:6379> xlen codehole
    (integer)5
    
    127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
    1527855160273-0
    
    127.0.0.1:6379> xlen codehole
    (integer)3
    

    我们看到Stream的长度被砍掉了。


    消息如果忘记ACK会怎样?

    Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息处理完了但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。


    PEL如何避免消息丢失?

    在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是PEL里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到PEL中的消息ID列表。不过此时xreadgroup的起始消息必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的PEL消息以及自last_delivered_id之后的新消息。


    分区Partition

    Redis没有原生支持分区的能力,想要使用分区,需要分配多个Stream,然后在客户端使用一定的策略来讲消息放入不同的stream。


    <?php 
    
        //连接reids
        $redis = new Redis();
        $redis->connect('127.0.0.1', 6379);
    
        //xadd:追加消息
        //xdel:删除消息,删除标志位,不影响消息总长度
        //xrange:消息列表,过滤已删除的消息
        //xlen:消息长度
        //del: 删除所有消息
    
        $redis->rawCommand('del','codehole');
    
        // 星号表示自动生成id,后面参数key,value
        $redis->rawCommand('xadd','codehole','*','name','user1','age','20');
        $redis->rawCommand('xadd','codehole','*','name','user2','age','18');
        $redis->rawCommand('xadd','codehole','*','name','user3','age','19');
        $redis->rawCommand('xadd','codehole','*','name','user4','age','19');
    
        //maxlen 定长长度,将老消息干掉,确保链表不会超过指定长度
        $redis->rawCommand('xadd','codehole','maxlen','3','*','name','user5','age','19');
    
        //XDEL codehole  id
        //$redis->rawCommand('xdel','codehole','1538561700640-0');
    
        $res = $redis->rawCommand('xlen','codehole');
        echo "<pre>";
        var_dump($res);
        echo '<br />';
    
        // -最小值 +最大值
        $res = $redis->rawCommand('xrange','codehole','-','+');
        print_r($res);
        echo '<br />';
    
        $id = $res[1][0];
    
        // 指定最小消息列表
        $res = $redis->rawCommand('xrange','codehole',$id,'+');
        // var_dump($res);
        // echo '<br />';
    
        // 指定最大消息列表
        $res = $redis->rawCommand('xrange','codehole','-',$id);
        // var_dump($res);
        // echo '<br />';
        
        // 指定最大消息列表
        $res = $redis->rawCommand('xrange','codehole','-',$id);
        // var_dump($res);
        // echo '<br />';
    
        /************************独立消费************************/
    
        //从stream中头部读取两条消息
        $res = $redis->rawCommand('xread','count','2','streams','codehole','0-0');
        // var_dump($res);
        // echo '<br />';
    
        //从尾部读取一条消息,这里不会返回任何消息
        $res = $redis->rawCommand('xread','count','1','streams','codehole','$');
        // var_dump($res);
        // echo '<br />';
    
        //block 0 表示永久阻塞,直到消息到来,block 1000表示阻塞1秒,如果1秒没新消息,返回null
        //从尾部阻塞等待消息到来,然后新开一个窗口塞消息,这时候阻塞解除返回新消息内容
        // $res = $redis->rawCommand('xread','block','0','count','1','streams','codehole','$');
        // var_dump($res);
        // echo '<br />';
    
    
        /************************消费组************************/
    
    
      // 星号表示自动生成id,后面参数key,value
        $redis->rawCommand('xadd','mq','*','msg','1');
        $redis->rawCommand('xadd','mq','*','msg','2');
        $redis->rawCommand('xadd','mq','*','msg','3');
        $redis->rawCommand('xadd','mq','*','msg','4');
        $redis->rawCommand('xadd','mq','*','msg','5');
        
    
        //创建消费组mqGroup  为消息队列 mq 从第一条开始消费
        $redis->rawCommand('xgroup','create','mq','mqGroup','0');
    
        //从从尾部开始消费
        //$redis->rawCommand('xgroup','create','mq','mqGroup','$');
    
        //消费者A,消费第1条
        $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');
    
        //消费者A,消费第2条
        $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');
    
        //消费者B,消费第3条
        $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerB', 'count', '1' ,'streams', 'mq', '>');
    
        //消费者A,消费第4条
        $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>');
    
        //消费者c,消费第5条
        $res = $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerC', 'count', '1' ,'streams', 'mq', '>');
    
        //获取strarm信息
        $res = $redis->rawCommand('xinfo','stream','mq');
        echo "<pre>";
        print_r($res);
        echo '<br />';
    
        //获取strarm消费组信息
        $res = $redis->rawCommand('xinfo','groups','mq');
        print_r($res);
        echo '<br />';
    
        //同一个消费组有多个消费者,观察每个消费者的状态
        $res = $redis->rawCommand('xinfo','consumers','mq','mqGroup');
        print_r($res);
        echo '<br />';
    
        //mpGroup的Pending等待列表情况  + 0 10
        //使用 -:start +:end 10:count 选项可以获取详细信息
        //$res = $redis->rawCommand('xpending','mq','mqGroup');
        //$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10');
        $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
        print_r($res);
        echo '<br />';
    
        //通知消息处理结束,用消息ID标识
        $msg_id = $res[0][0];
        $res = $redis->rawCommand('xack','mq','mqGroup',$msg_id);
        print_r($res);
        echo '<br />';
    
        //再次查看Pending列表
        $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA');
        print_r($res);
        echo '<br />';
    
        //转移超过36s的消息id到消费者B的Pending列表
        $redis->rawCommand('xclaim','mq','mqGroup','consumerB','36000',$msg_id);
       
      


    基于List

    使用rpushlpush操作入队列,lpoprpop操作出队列。List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。

    但是当队列为空时,lpop和rpop会一直空轮训,消耗资源;所以引入阻塞读blpopbrpop(b代表blocking),阻塞读在队列没有数据的时候进入休眠状态。一旦数据到来则立刻醒过来,消息延迟几乎为零。


    空闲连接

    如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。


    缺点:

    • 做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认。
    • 不能做广播模式,如pub/sub,消息发布/订阅模型
    • 不能重复消费,一旦消费就会被删除
    • 不支持分组消费


    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);
    
    //发送消息
    $redis->lPush($list, $value);
    
    //消费消息
    while (true) {
        try {
            $msg = $redis->rPop($list);
            if (!$msg) {
                sleep(1);
            }
            //业务处理
         
        } catch (Exception $e) {
            echo $e->getMessage();
        }
    }
    
    

    上面代码会有个问题如果队列长时间是空的,那pop就不会不断的循环,这样会导致redis的QPS升高,影响性能。所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加。这个问题我们可以通过blpop/brpop 来阻塞读取队列。blpop/brpop在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。还有一个需要注意的点是我们需要是用try/catch来进行异常捕获,如果一直阻塞在那里,Redis服务器一般会主动断开掉空链接,来减少闲置资源的占用。


    使用pub/sub

    • subscribe:用于订阅信道。
    • publish:向信道发送消息。
    • unsubscribe:取消订阅。

    此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

    优点

    • 典型的广播模式,一个消息可以发布到多个消费者。
    • 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息。
    • 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。

    缺点

    • 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。
    • 不能保证每个消费者接收的时间是一致的。
    • 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时。
    • 可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。


      基于 Sorted Set

      Sortes Set(有序列表),类似于java的SortedSet和HashMap的结合体,一方面她是一个set,保证内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的

      排序权重。内部实现是“跳跃表”。

      有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据

      Score排序的特征,就可以制作一个有序的消息队列了。

      优点

      就是可以自定义消息ID,在消息ID有意义时,比较重要。

      缺点

      缺点也明显,不允许重复消息(因为是集合),同时消息ID确定有错误会导致消息的顺序出错。