Redis(九):新增数据结构 Stream
文章目录一、Stream1、增删改查2、独立消费3、创建消费组一、Stream Redis5.0 被作者 Antirez 突然放了出来,增加了很多新的特色功能。而 Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列 ,作者坦言 Redis Stream 狠狠地借鉴了 Kafka 的设计。&
Stream
Redis5.0 被作者 Antirez 突然放了出来,增加了很多新的特色功能。而 Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列 ,作者坦言 Redis Stream 狠狠地借鉴了 Kafka 的设计。
Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd
指令追加消息时自动创建。
每个 Stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id ,在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create
进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。每个消费组 Consumer Group 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。
同一个消费组 Consumer Group 可以挂接多个消费者 Consumer,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取
的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
-
消息 ID
消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是 整数-整数 ,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。 -
消息内容
消息内容就是 键值对,形如 hash 结构的键值对,这没什么特别之处。
1、增删改查
(1)xadd 追加消息,生产消息:
XADD key ID field string [field string ...]
需要提供 key,消息ID 方案,消息内容,其中消息内容为key-value 型数据。 ID,最常使用 *
,表示由 Redis 生成消息 ID,这也是强烈建议的方案。 field string [field string], 就是当前消息内容,由 1个 或多个 key-value 构成。
(2)xlen 消息长度:
(3)xrange 获取消息列表,会自动过滤已经删除的消息。- 表示最小值 , + 表示最大值:
(4)xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度:
(5)del 删除 整个 Stream:
2、独立消费
我们可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令 xread
,**可以将 Stream 当成普通的消息队列 (list) 来使用。**使用 xread 时,我们可以完全忽略消费组 Consumer Group的存在,就好比 Stream 就是一个普通的列表 (list)。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
其中,[COUNT count],用于限定获取的消息数量;[BLOCK milliseconds],用于设置 XREAD 为阻塞模式,默认为非阻塞模式。ID,用于设置由哪个消息ID开始读取。使用 0 表示从第一条消息开始。这里需要注意,消息队列 ID 是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用$
,表示最新的消息 ID。(在非阻塞模式下$
无意义)。
XREAD 读消息时 分为 阻塞 和 非阻塞 模式,使用 BLOCK 选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。
例如:
从 Stream 头部读取两条消息:
从 Stream 尾部读取一条消息,因为默认非阻塞模式,所以这里不会返回任何消息:
从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来:
这时重新打开一个窗口,在这个窗口往 Stream 里塞消息:
再切换到前面的窗口,可以看到 阻塞解除了,返回了新的消息内容, 而且还显示了一个等待时间,这里等待了 22 s 。
客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。
block 0 表示永远阻塞,直到消息到来,block 1000 表示阻塞 1s,如果 1s 内没有任何消息到来,就返回 nil。
3、创建消费组
Stream 通过 xgroup create
指令创建消费组 Consumer Group,需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是说,消息队列中有 10 条消息,三个消费者都可以消费到这 10 条消息。
但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有 10 条消息,三个消费者分别消费其中的某些消息,比如消费者 A 消费消息 1、2、5、8;消费者 B 消费消息 4、9、10,而消费者 C 消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:
消费者组模式的支持主要由两个命令实现:
XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作。
XREADGROUP,分组消费消息操作。
例如,使用 5 个消息,思路是:创建一个 Stream 消息队列,生产者生成 5 条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:
创建消费组 mqGroup:
XGROUP CREATE mq mqGroup 0
,用于在消息队列 mq 上创建消费组 mpGroup,最后一个参数 0,表示该组从第一条消息开始消费。(意义与XREAD 的 0 一致)。除了支持 CREATE
外,还支持 SETID
设置起始ID,DESTROY
销毁组,DELCONSUMER
删除组内消费者等操作。
消费者A,消费第 1 条:(注意不用漏了这个 “>”,否则会报 (error) ERR Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified.
,这是因为命令末尾应带有 ID 或 $ ,我们可以为 XReadGroupArgs 结构添加 [ ] 字符串类型的 ID 字段,如果为 nil,则在命令末尾添加“>” 。)
然后接下来,消费者 B 消费第 2 条;消费者 A 消费第 3 条;消费者 B 消费第 4 条;消费者 B 消费第 5 条:
两个个在同一组 mpGroup 消费者A、B 在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, B->2, A->3, B->4, B->5。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
,用于组mqGroup 内消费者 consumerA 在队列 mq 中消费,参数 > 表示未被组内消费的起始消息,参数 count 1 表示获取一条。语法与 XREAD 基本一致,不过是增加了组的概念。
可以进行组内消费的基本原理是,STREAM 类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。
以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。
4、Pending 等待列表
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令 XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。
可以看到,有 5 个 已读取,但是未处理的消息,消费者 A 有 2 个;消费者 B 有 3 个。
可以看到,使用 start end count 选项可以获取详细信息,从读取到现在经历的毫秒数、消息被读取的次数。
再加上消费者参数,可以获取具体某个消费者的Pending列表。
每个Pending的消息有4个属性:
- 消息ID
- 所属消费者
- IDLE,已读取时长
- delivery counter,消息被读取次数
上面的结果我们可以看到,我们之前读取的消息,都被记录在 Pending 列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成:
再次查看 Pending 列表:
可以看到,已读取但未处理的消息已经变为 4 个。
有了这样一个 Pending 机制,就意味着 在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该 Pending 列表,就可以继续处理该消息了,保证消息的有序和不丢失。
此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者 Pending 的消息,转交给其他的消费者处理,就是消息转移。
5、消息转移
消息转移的操作时,将某个消息转移到自己的 Pending 列表中。使用语法 XCLAIM 来实现,需要设置组、转移的目标消费者和消息 ID,同时需要提供 IDLE(已被读取时长),只有超过这个时长,才能被转移。
可以看到,当前属于消费者 A 的消息 1609946268230-2 已经 1890574 ms没有处理了,转移超过 36s 的消息到消费者 B 的 Pending 列表里去:
然后再去查看 Pending 列表:
OKK 转移成功啦~。而且可以看到,IDLE 被重置了,读取次数也累加了一次。
转移除了要指定 ID 外,还需要指定 IDLE ,保证是长时间未处理的 才被转移。被转移的消息的 IDLE 会被重置,用以保证不会被重复转移,因为 可能会出现 将过期的消息 同时转移给多个消费者 的并发操作。设置了IDLE ,则可以避免后面的转移不会成功,因为 IDLE 不满足条件。例如下面的连续两条转移,第二条不会成功。
6、坏消息问题 / Dead Letter 死信问题
正如上面所说,如果某个消息,不能被消费者处理,也就是不能被 XACK,这是要长时间处于 Pending 列表中,即使被反复的转移给各个消费者也是如此。此时该消息的 delivery counter 就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用 XDEL 语法,如:
删除队列中的消息:
查看队列:
确实没有那条消息了。但是要注意,并没有删除 Pending 中的消息,因此,查看Pending,会发现消息还会在:
可以执行XACK标识其处理完毕,再去看 Pending 列表,就会发现,消息被删除啦。
更多推荐
所有评论(0)