Redis中的Stream实操:创建-消费-查看
一、什么是Redis中的Stream?支持多播的可持久化的消息队列,其作者也坦言借鉴了Kafka的设计。它其实是一个消息链表,每个消息都有唯一的消息id,消息是持久化的,Redis重启后消息仍在。每个Stream都可以挂载多给 消费组,每个消费组会有个游标,表示当前消费组已经消费到哪条消息了。同一个消费组可以挂接多个消费者,每个消费者之间是竞争关系,一个消费者消费了消息,游标就有向前移动。...
一、什么是Redis中的Stream?
支持多播的可持久化的消息队列,其作者也坦言借鉴了Kafka的设计。
它其实是一个消息链表,每个消息都有唯一的消息id,消息是持久化的,Redis重启后消息仍在。
每个Stream都可以挂载多给 消费组,每个消费组会有个游标,表示当前消费组已经消费到哪条消息了。
同一个消费组可以挂接多个消费者,每个消费者之间是竞争关系,一个消费者消费了消息,游标就有向前移动。
消费者内部会有一个状态变量pengding_ids,它记录了当前已经被客户端读取,但是还没有ack的消息。
二、消息相关指令
xadd:向Strem追加消息
xdel:从Stream中删除消息,只是做标记。
xrange:获取Stream中的消息列表,会自动过滤已经删除的消息。
xlen:获取Stream的消息长度。
del:删除整个Stream消息列表中的所有消息。
xadd stream1 * name jxl age 18
xadd stream1 * name zsh age 18
xlen stream1
xrange stream1 - +
xrange stream1 111 +
xdel stream1 111
del stream1
区别一条消息的是消息id,Redis生成的消息id是毫秒时间戳加中横线加从0开始的序号。所以消息id的顺序就是Stream中的消息的顺序。*号的意思是自动生成消息id。
三、独立消费相关指令
独立消费所有消息:
xread count 2 streams stream1 0-0
独立消费之后的消息,历史消息不消费了:
xread count 2 streams stream1 $
四、消费组消费相关指令
创建消费组group1,并指定从哪里开始消费。0-0表示从头开始消费。$表示只接受新消息。
xgroup create stream1 group1 0-0
获取Stream中消息:
xinfo stream stream1
获取Stream的消费组信息:
xinfo groups stream1
消费消息:
xreadgroup GROUP group1 consumer1 count 1 streams stream1 >
查看消费组里的消费消息的情况,有几个消息在pending中。
xinfo consumers stream1 group1
确认消息的消费:
xack stream1 group1 messageid
更多推荐
所有评论(0)