redis 实现消息队列的4种方案
Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。Redis具有内置复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过哨兵模式(Sentinel)和集群模式(Cluster)自动分区。
- 支持数据结构:字符串(string)、哈希散列(hash)、列表(list)、集合(set)、有序集合(sorted sets)、数据流(stream)。
- 支持查询方式:位图(bitmaps),超级日志(hyperloglogs),地理信息(geo)。
MQ
消息队列(Message Queue,简称 MQ)。简单理解,生产者先将消息投递一个叫做【队列】的容器中,然后再从这个容器中取出消息,最后再转发给消费者。
消息队列使用场景
系统解耦
在分布式环境下,系统间的相互依赖,最终会会导致整个依赖关系混乱,特别在微服务环境下,会出现相互依赖,甚至是循环依赖的情况,对后期系统的拆分和优化都带来极大负担。那么我们就可以用MQ来进行处理。上游系统将数据投递到MQ,下游系统取MQ的数据进行消费,投递和消费可以用同步的方式处理,不会影响上游系统的性能。
异步处理
如果采用同步的方式,系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?引入消息队列,将不必要的业务逻辑异步处理。
流量削峰
消息中间件的存储能力能够有效的帮助消费者进行缓冲。试想下,正常流量下消费者能够愉快的进行消费,瞬时高峰流量来的时候,消费者消费能力跟不上,刚好阻塞在消息中间件,等峰值过后,消费者又能很快的将阻塞的消息进行消费。流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛!
数据分发
大部分开源的MQ中间件基本都支持一对多或者广播的模式,而且都可以根据规则选择分发的对象。这样上游的一份数据,众多下游系统中,可以根据规则选择是否接收这些数据,这样扩展性就很强了。
消息队列特点
先进先出
不能先进先出,都不能说是队列了。消息队列的顺序在入队的时候就基本已经确定了,一般是不需人工干预的。而且,最重要的是,数据是只有一条数据在使用中。这也是MQ在诸多场景被使用的原因。
发布/订阅
发布/订阅是一种很高效的处理方式,如果不发生阻塞,基本可以当做是同步操作。这种处理方式能非常有效的提升服务器利用率,这样的应用场景非常广泛。
持久化
持久化确保MQ的使用不只是一个部分场景的辅助工具,而是让MQ能像数据库一样存储核心的数据。
分布式
在现在大流量、大数据的使用场景下,只支持单体应用的服务器软件基本是无法使用的,支持分布式的部署,才能被广泛使用。而且,MQ的定位就是一个高性能的中间件。
消息队列模式
在JMS标准中,有两种消息模型P2P(Point toPoint)和Publish/Sub(Pub/Sub)。
P2P
点对点,一个发,一个消费。涉及到的角色发布者(Publisher)、消费者(Consumer)、消息队列(Queue)。
- 一个消息只能被一个消费者消费,消费后会从队列里移除。
- 发布者和消费者无关系,发布者发送消息的行为不会随消费者而改变。
- 消费者消费完成消息,需要向队列Ack,消息队列发现消息消费成功即做消息移除。
Pub/Sub
发布订阅模式,一个发布,多方订阅。涉及到的角色有发布者(Publisher)、主题(Topic)、订阅者(Subscriber)。
- 每个消息可以有多个消费者。
- 针对某个主题(Topic)的订阅者,必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者必须保持运行的状态。
Redis消息队列
MQ应用有很多,比如 ActiveMQ、RabbitMQ、RocketMQ、Kafka等,但是也可以基于redis来实现。
- 基于
Stream (流)数据类型。 - 基于
List (列表)数据类型。 - 基于
Sorted Set (有序集合)类型。 - 使用
pub/sub (订阅/发布)模式。
基于stream
Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,消息是持久化的,Redis重启后,内容还在。
- 每个 stream 都有唯一的名称,它就是 Redis的key,使用
xadd
指令追加消息时创建。 - 每个消息都有唯一的ID,使用
xadd
指令追加消息时创建。 - 每个 stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 stream 之上往前移动,表示当前消费组已经消费到哪条消息了。消费组使用
xgroup create
创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化 last_delivered_id 变量。每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者者有一个组内唯一名称。 - 每个消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个 pending_ids 变量在Redis官方被称之为 PEL(Pending Entries List),这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
增删改查
xadd
:追加消息。xdel
:删除消息。这里的删除仅仅是设置了标志位,不影响消息总长度xrange
:获取消息列表,会自动过滤已经删除的消息。xlen
:消息长度。del
:删除Stream。
独立消费
我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread
,可以将Stream当成普通的消息队列(list)来使用,可以完全忽略消费组(Consumer Group)的存在。
创建消费组
Stream通过xgroup create
指令创建消费组(Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
消费
Stream提供了xreadgroup
指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。
Stream消息太多怎么办
读者很容易想到,要是消息积累太多,Stream的链表岂不是很长,内容会不会爆掉就是个问题了。xdel指令又不会删除消息,它只是给消息做了个标志位。Redis自然考虑到了这一点,所以它提供了一个定长Stream功能。在xadd
的指令提供一个定长长度maxlen,就可以将老的消息干掉,确保最多不超过指定长度。
127.0.0.1:6379> xlen codehole (integer)5 127.0.0.1:6379> xadd codeholemaxlen 3 * name xiaorui age 1 1527855160273-0 127.0.0.1:6379> xlen codehole (integer)3
我们看到Stream的长度被砍掉了。
消息如果忘记ACK会怎样?
Stream在每个消费者结构中保存了正在处理中的消息ID列表PEL,如果消费者收到了消息处理完了但是没有回复ack,就会导致PEL列表不断增长,如果有很多消费组的话,那么这个PEL占用的内存就会放大。
PEL如何避免消息丢失?
在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是PEL里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到PEL中的消息ID列表。不过此时xreadgroup的起始消息必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的PEL消息以及自last_delivered_id之后的新消息。
分区Partition
Redis没有原生支持分区的能力,想要使用分区,需要分配多个Stream,然后在客户端使用一定的策略来讲消息放入不同的stream。
<?php //连接reids $redis = new Redis(); $redis->connect('127.0.0.1', 6379); //xadd:追加消息 //xdel:删除消息,删除标志位,不影响消息总长度 //xrange:消息列表,过滤已删除的消息 //xlen:消息长度 //del: 删除所有消息 $redis->rawCommand('del','codehole'); // 星号表示自动生成id,后面参数key,value $redis->rawCommand('xadd','codehole','*','name','user1','age','20'); $redis->rawCommand('xadd','codehole','*','name','user2','age','18'); $redis->rawCommand('xadd','codehole','*','name','user3','age','19'); $redis->rawCommand('xadd','codehole','*','name','user4','age','19'); //maxlen 定长长度,将老消息干掉,确保链表不会超过指定长度 $redis->rawCommand('xadd','codehole','maxlen','3','*','name','user5','age','19'); //XDEL codehole id //$redis->rawCommand('xdel','codehole','1538561700640-0'); $res = $redis->rawCommand('xlen','codehole'); echo "<pre>"; var_dump($res); echo '<br />'; // -最小值 +最大值 $res = $redis->rawCommand('xrange','codehole','-','+'); print_r($res); echo '<br />'; $id = $res[1][0]; // 指定最小消息列表 $res = $redis->rawCommand('xrange','codehole',$id,'+'); // var_dump($res); // echo '<br />'; // 指定最大消息列表 $res = $redis->rawCommand('xrange','codehole','-',$id); // var_dump($res); // echo '<br />'; // 指定最大消息列表 $res = $redis->rawCommand('xrange','codehole','-',$id); // var_dump($res); // echo '<br />'; /************************独立消费************************/ //从stream中头部读取两条消息 $res = $redis->rawCommand('xread','count','2','streams','codehole','0-0'); // var_dump($res); // echo '<br />'; //从尾部读取一条消息,这里不会返回任何消息 $res = $redis->rawCommand('xread','count','1','streams','codehole','$'); // var_dump($res); // echo '<br />'; //block 0 表示永久阻塞,直到消息到来,block 1000表示阻塞1秒,如果1秒没新消息,返回null //从尾部阻塞等待消息到来,然后新开一个窗口塞消息,这时候阻塞解除返回新消息内容 // $res = $redis->rawCommand('xread','block','0','count','1','streams','codehole','$'); // var_dump($res); // echo '<br />'; /************************消费组************************/ // 星号表示自动生成id,后面参数key,value $redis->rawCommand('xadd','mq','*','msg','1'); $redis->rawCommand('xadd','mq','*','msg','2'); $redis->rawCommand('xadd','mq','*','msg','3'); $redis->rawCommand('xadd','mq','*','msg','4'); $redis->rawCommand('xadd','mq','*','msg','5'); //创建消费组mqGroup 为消息队列 mq 从第一条开始消费 $redis->rawCommand('xgroup','create','mq','mqGroup','0'); //从从尾部开始消费 //$redis->rawCommand('xgroup','create','mq','mqGroup','$'); //消费者A,消费第1条 $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>'); //消费者A,消费第2条 $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>'); //消费者B,消费第3条 $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerB', 'count', '1' ,'streams', 'mq', '>'); //消费者A,消费第4条 $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerA', 'count', '1' ,'streams', 'mq', '>'); //消费者c,消费第5条 $res = $redis->rawCommand('xreadgroup','group', 'mqGroup', 'consumerC', 'count', '1' ,'streams', 'mq', '>'); //获取strarm信息 $res = $redis->rawCommand('xinfo','stream','mq'); echo "<pre>"; print_r($res); echo '<br />'; //获取strarm消费组信息 $res = $redis->rawCommand('xinfo','groups','mq'); print_r($res); echo '<br />'; //同一个消费组有多个消费者,观察每个消费者的状态 $res = $redis->rawCommand('xinfo','consumers','mq','mqGroup'); print_r($res); echo '<br />'; //mpGroup的Pending等待列表情况 + 0 10 //使用 -:start +:end 10:count 选项可以获取详细信息 //$res = $redis->rawCommand('xpending','mq','mqGroup'); //$res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10'); $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA'); print_r($res); echo '<br />'; //通知消息处理结束,用消息ID标识 $msg_id = $res[0][0]; $res = $redis->rawCommand('xack','mq','mqGroup',$msg_id); print_r($res); echo '<br />'; //再次查看Pending列表 $res = $redis->rawCommand('xpending','mq','mqGroup','-','+','10','consumerA'); print_r($res); echo '<br />'; //转移超过36s的消息id到消费者B的Pending列表 $redis->rawCommand('xclaim','mq','mqGroup','consumerB','36000',$msg_id);
基于List
使用rpush
和lpush
操作入队列,lpop
和rpop
操作出队列。List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。
但是当队列为空时,lpop和rpop会一直空轮训,消耗资源;所以引入阻塞读blpop
、brpop
(b代表blocking),阻塞读在队列没有数据的时候进入休眠状态。一旦数据到来则立刻醒过来,消息延迟几乎为零。
空闲连接
如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常,还有重试。
缺点:
- 做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认。
- 不能做广播模式,如pub/sub,消息发布/订阅模型
- 不能重复消费,一旦消费就会被删除
- 不支持分组消费
$redis = new Redis(); $redis->connect('127.0.0.1', 6379); //发送消息 $redis->lPush($list, $value); //消费消息 while (true) { try { $msg = $redis->rPop($list); if (!$msg) { sleep(1); } //业务处理 } catch (Exception $e) { echo $e->getMessage(); } }
上面代码会有个问题如果队列长时间是空的,那pop就不会不断的循环,这样会导致redis的QPS升高,影响性能。所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加。这个问题我们可以通过blpop/brpop 来阻塞读取队列。blpop/brpop在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。还有一个需要注意的点是我们需要是用try/catch来进行异常捕获,如果一直阻塞在那里,Redis服务器一般会主动断开掉空链接,来减少闲置资源的占用。
使用pub/sub
subscribe
:用于订阅信道。publish
:向信道发送消息。unsubscribe
:取消订阅。
此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。
优点
- 典型的广播模式,一个消息可以发布到多个消费者。
- 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息。
- 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息。
缺点
- 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。
- 不能保证每个消费者接收的时间是一致的。
- 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时。
可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。
基于 Sorted Set
Sortes Set(有序列表),类似于java的SortedSet和HashMap的结合体,一方面她是一个set,保证内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的
排序权重。内部实现是“跳跃表”。
有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据
Score排序的特征,就可以制作一个有序的消息队列了。
优点
就是可以自定义消息ID,在消息ID有意义时,比较重要。
缺点
缺点也明显,不允许重复消息(因为是集合),同时消息ID确定有错误会导致消息的顺序出错。