MQ-Redis

在进行消息队列预研的时候,发现Redis也能做为消息队列,看看Redis做为消息队列是如何实现的

Redis做为消息队列有三种方案

  1. List
  2. Streams
  3. Pub/Sub

List

Redis的List是简单的字符串列表,底层由 quicklist 实现。

image-20220102114419862

List消息队列原理

命令 用法 描述
LPUSH LPUSH key value [value …] 将一个或多个值value插入到列表key的表头,如果有多个value值,那么各个value值按从左到右的顺序依次插入到表头
RPUSH RPUSH key value [value …] 将一个或多个值value插入到列表key的表尾(最右边)
LPOP LPOP key [count] 移除并返回列表key的头元素(count 指定出队列数目)
BLPOP BLPOP key [key …] timeout 移除并获取列表的第一个元素,如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
RPOP PROP key 移除并返回列表key的尾元素
BRPOP BRPOP key [key …] timeout 移除并获取列表的最后一个元素,如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
BRPRPLPUSH BRPOPLPUSH source destination timeout 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它;如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
RPOPLPUSH RPOPLPUSH source destination 命令RPOPLPUSH 在一个原子时间内,执行以下两个动作:将列表source中的最后一个元素(尾元素)弹出,并返回给客户端。将source弹出的元素插入到猎豹destination,做为destination列表的头元素
LLEN LLEN key 返回列表key的长度。如果key不存在,则key被解释为一个空列表,返回0。如果key不是列表类型,返回一个错误
LRANGE LRANGE key start stop 返回列表key中指定区间内的元素,区间以偏移量 start 和 stop 指定

使用命令组合即可实现消息的出队入队

  • LPUSH、RPOP 左进右出
  • RPUSH、LPOP 右进左出
image-20220102115048709

img

通过LPUSH、RPOP这样的方式,会存在一个性能风险点:

消费者要即使的处理数据,类似要在消费端添加类似 while(true) 的逻辑,不停的调用RPOP或LPOP命令,这样就会给消费者程序带来不必要的性能损失,于是 -->

Redis 提供了BLPOP、BRPOP这样阻塞式读取的命令(带B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据(节省不必要要的CPU开销)

  • LPUSH、BRPOP左进右阻塞出
  • RPUSH、BLPOP右进左阻塞出
image-20220102115726725

因为 Redis 单线程的特点,所以在消费数据时,同一个消息不会同时被多个 consumer 消费掉,但是需要我们考虑消费不成功的情况

可靠队列模式

List 队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息。究其原因,就是缺少消息确认机制。

为了保证消息的可靠性,消息队列都会有完善的消息确认机制(Acknowledge),即消费者向队列报告消息已收到或已处理的机制

RPOPLPUSH、BRPOPLPUSH (阻塞)从一个 list 中获取消息的同时把这条消息复制到另一个 list 里(可以当做备份),而且这个过程是原子的

image-20220102120240078 img

数据标识从一个 List 取出后放入另一个 List,业务操作安全执行完成后,再去删除 List 中的数据,如果有问题的话,很好回滚

延时消息

通过 zset 来实现延时消息队列,原理就是将消息加到 zset 结构后,将要被消费的时间戳设置为对应的 score 即可,只要业务数据不会是重复数据就可以

Pub/Sub

消息模型包含

  • 点对点:Point-to-Point(P2P)
  • 发布订阅:Publish/Subscribe(Pub/Sub)

List 实现方式就是点对点模式,Redis的发布订阅模式(消息多播)就是真正的Redis MQ

img

"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息

Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制

  • 订阅/发布到频道

  • 订阅/发布到模式

频道可以先理解为是个 Redis 的 key 值,而模式则是一个类似正则匹配的 Key,只是个可以匹配给定模式的频道。这样就不需要显式的去订阅多个名称了,可以通过模式订阅这种方式,一次性关注多个频道

Pub/Sub常用命令

命令 用法 描述
PSUBSCRIBE PSUBSCRIBE pattern [pattern …] 订阅一个或多个符合给定模式的频道
PUBSUB PUBSUB subcommand [argument [argument …]] 查看订阅与发布系统状态
PUBLISH PUBLISH channel message 将信息发送到指定的频道
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern …]] 退订所有给定模式的频道
SUBSCRIBE SUBSCRIBE channel [channel …] 订阅给定的一个或多个频道的信息
UNSUBSCRIBE UNSUBSCRIBE [channel [channel …]] 指退订给定的频道

频道

image-20220102121238580

如上创建一个生产者和两个消费者,消费者1 subscribe channel1 channel2,消费者2subscribe channel1。当生产者使用命令PUBBLISH channel message 向 隧道channel1发送消息时,两个消费者都能。向隧道channel2发送消息时,只有消费者1能够收到消息

其中消费者每次都可以收到3个参数的消息

  • 消息的种类
  • 频道的名称
  • 实际的消息

模式

订阅符合给定模式的频道,命令是 PSUBSCRIBE

image-20220102122113100

如上创建一个生产者和两个消费者,一个使用 SUBSCRIBE channel1,另外一个使用 PSUBSCRIBE chann*,当生产者使用PUBLISH channel1 msg3,两个消费者都能收到消息

PSUBSCRIBE 更像是支持匹配模式的消费者

img

Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Streams

Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的

img

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

Streams 是 Redis 专门为消息队列设计的数据类型,所以提供了丰富的消息队列操作命令

Stream常用命令

描述 命令
添加消息到末尾,保证有序,可以自动生成唯一ID XADD key ID field value [field value …]
对流进行修剪,限制长度 XTRIM key MAXLEN [~] count
删除消息 XDEL key ID [ID …]
获取流包含的元素数量,即消息长度 XLEN key
获取消息列表,会自动过滤已经删除的消息 XRANGE key start end [COUNT count]
以阻塞或非阻塞方式获取消息列表 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] id [id …]
创建消费者组 XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
读取消费者组中的消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
将消息标记为"已处理" XACK key group ID [ID …]
为消费者组设置新的最后递送消息ID XGROUP SETID [CREATE key groupname id-or-] [DESTROY key groupname]
删除消费者 XGROUP DELCONSUMER [CREATE key groupname id-or-] [DESTROY key groupname]
删除消费者组 XGROUP DESTROY [CREATE key groupname id-or-] [DESTROY key groupname] [DEL
显示待处理消息的相关信息 XPENDING key group [start end count] [consumer]
查看流和消费者组的相关信息 XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
打印流信息 XINFO STREAM [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
image-20220104231134425
  • * 号表示服务器自动生成 ID,后面顺序跟着一堆 key/value
  • 消息ID 必须要比上个 ID 大
  • -表示最小值 , + 表示最大值,也可以指定最大消息ID,或最小消息ID,配合 -、+ 使用

独立消费

xread 以阻塞或非阻塞方式获取消息列表,指定BLOCK选项即表示阻塞,超过时间0ms(意味用不超时)

image-20220104231752155
  • 阻塞的从尾部读取流,开启新的客户端xadd后发现这里就读到了,block 0 表示永久阻塞

没有给流 mystream 传入一个常规的 ID,而是传入了一个特殊的 ID $

$ 意思是: XREAD 应该使用流 streamtest 已经存储的最大 ID 作为最后一个 ID

当然,也可以指定任意有效的 ID。

而且, XREAD 的阻塞形式还可以同时监听多个 Strem,只需要指定多个键名即可

1
127.0.0.1:6379> xread block 0 streams mystream yourstream $ $

多个客户端监听相同的stream,那么它们都会收到消息!!!,如果想多个客户端监听同一个流怎么办呢?便是创建消费者组

创建消费者组

上述 xread 虽然分发到 N 个客户端,如果想要做的不是向许多客户端提供相同的消息流,而是从同一流向许多客户端提供不同的消息子集。比如下图这样,三个消费者按轮训的方式去消费一个 Stream

img

Redis Stream 借鉴了很多 Kafka 的设计。

  • Consumer Group:有了消费组的概念,每个消费组状态独立,互不影响,一个消费组可以有多个消费者
  • last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费组已经消费到哪条消息了
  • pending_ids :消费者的状态变量,作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
img

Stream 不像 Kafak 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。

  • xgroup create:创建消费者组
  • xgreadgroup:读取消费组中的消息
  • xack:ack 掉指定消息

img

image-20220104232852906

按消费组消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

image-20220105000229340 image-20220105000302628

参考链接

  1. https://stor.51cto.com/art/202101/640335.htm
  2. http://xiaorui.cc/archives/5285