一、什么是Redis中的Stream?

支持多播的可持久化的消息队列,其作者也坦言借鉴了Kafka的设计。

它其实是一个消息链表,每个消息都有唯一的消息id,消息是持久化的,Redis重启后消息仍在。

每个Stream都可以挂载多给 消费组,每个消费组会有个游标,表示当前消费组已经消费到哪条消息了。

同一个消费组可以挂接多个消费者,每个消费者之间是竞争关系,一个消费者消费了消息,游标就有向前移动。

消费者内部会有一个状态变量pengding_ids,它记录了当前已经被客户端读取,但是还没有ack的消息。

二、消息相关指令

xadd:向Strem追加消息

xdel:从Stream中删除消息,只是做标记。

xrange:获取Stream中的消息列表,会自动过滤已经删除的消息。

xlen:获取Stream的消息长度。

del:删除整个Stream消息列表中的所有消息。

xadd stream1 * name jxl age 18
xadd stream1 * name zsh age 18
xlen stream1
xrange stream1 - +
xrange stream1 111 +
xdel stream1 111
del stream1

区别一条消息的是消息id,Redis生成的消息id是毫秒时间戳加中横线加从0开始的序号。所以消息id的顺序就是Stream中的消息的顺序。*号的意思是自动生成消息id。

三、独立消费相关指令

独立消费所有消息:

xread count 2 streams stream1 0-0

独立消费之后的消息,历史消息不消费了:

xread count 2 streams stream1 $

四、消费组消费相关指令

创建消费组group1,并指定从哪里开始消费。0-0表示从头开始消费。$表示只接受新消息。

xgroup create stream1 group1 0-0

获取Stream中消息:

xinfo stream stream1

获取Stream的消费组信息:

xinfo groups stream1

消费消息:

xreadgroup GROUP group1 consumer1 count 1 streams stream1 >

查看消费组里的消费消息的情况,有几个消息在pending中。

xinfo consumers stream1 group1

确认消息的消费:

xack stream1 group1 messageid
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐