在Redis中提供了三种实现消息队列的方式:

* List结构:基于List结构来模拟消息队列
* PubSub:基本的点对点消息模型
* Stream:较完善的消息队列模型
<>1. List实现消息队列

Redis的List数据结构类型是一个双向链表,而队列要求进,出口不能在同一个位置,所以可以利用List的添加取出命令来实现模拟消息队列。

* LPUSH,RPOP
* RPUSH,LPOP

但是java在消费消息的时候,如果没有消息了,消费者应该是阻塞等待,等到有消息投递了,再继续消费信息,而上述命令不是阻塞式的,如果没有消息了还在获取的话会获取到Null。所以应该实现阻塞的效果用下列命令

* BRPOP
* BLPOP
上述两个命令的取出效果是阻塞式的。

List实现消息队列的缺点:

* 无法避免消息丢失:例如消费者拿到消息还没有消费就宕机了
* 只能支持单个消费
<>2. 基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

* SUBSCRIBE channel [channel] :订阅一个或多个频道
* PUBLISH channel msg :向一个频道发送消息
* PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
这里的PSUBSCRIBE与RabbitMQ的匹配相似。

基于PubSub的消息队列的缺点:

* List支持数据持久化,但是PubSub不支持数据持久化
<>3. 基于Stream的消息队列

Stream是Redis5.0引入的新的数据类型,可以实现一个功能较为完善的消息队列

添加命令

例如
XADD users * name jack age 21
users是队列,*表示消息id ,后面的部分表示消息体

消费命令

当ID为$时代表读取最新的消息。

例如
XREAD COUNT 1 STREAMS users 0
COUNT 1 代表每次只读取一条,STREAMS users 表示从users这个队列里读取

注意:Stream的消息队列消费消息后是不会剔除该消息的

缺点:当指定ID为$,代表读取最新的消息,如果在处理一条新消息的时候,突然来了5条消息,当再次读取最新消息时,只能读取到5条消息的最后一条,造成消息漏读的现象

Stream消息队列的优点:

* 消息可回溯(消费后不会被剔除)
* 消息可以被多个消费者读取
* 可以阻塞读取
<>3.1 消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

*
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,同一个消费者组里的消费者之间处于一种竞争的关系,消息是不会出现消费重复的,同时一定程度上也可以避免消息漏读的现象
* 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
*
消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移
如何创建消费者组?
XGROUP CREATE key groupName ID [MKSTREAM]
* key:队列名称
* groupName:消费者组名称
* ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
* MKSTREAM:队列不存在时自动创建

如何从消费者组读取消息?
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK]
STREAMS key[key..] ID [ID..]
* group:消费者组名称
* consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
* count:本次查询最大数量
* BLOCK milliseconds:是否阻塞?阻塞的时间
* NOACK:消费消息后不响应
* STREAMS key:指定队列名称
* ID:获取消息的起始ID >表示从下一个未消费的消息开始
。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
那么消费者消费完消息后如何确认消息呢?
XACK key group ID [ID..]
* key:队列名称
* group:消费者组名称
* ID:消息的ID
java手动模拟消费者监听消息的代码
while(true){ Object message = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT
每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >"); if (message == null){ continue; } try{
// 处理消息的逻辑 处理完毕后要ACK handleMessage(message); }catch (Exception e){ while (true){
// 从等待响应的队列里拿消息 Object unAckMessage = redis.call("XREADGROUP GROUP 你的消费组名称
消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >"); if (unAckMessage == null){
continue; } try { handleMessage(unAckMessage); }catch (Exception e1){ continue;
} } } }

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信