1.概述Redis Stream是Redis 5.0版本新增加的数据结构。
Redis Stream主要用于消息队列(MQ,Message Queue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃,而且没有ACK机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了,简单来说发布订阅(pub/sub)可以分发消息,但无法记录历史消息。
而Redis Stream提供了消息的持久化和主备复制功能,支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream是一个链表,会将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容
Redis Stream组成部分说明:
Message Content
消息
Consumer group
消费组,通过XGROUP CREATE命令创建,同一个消费组可以有多个消费者
Last_delivered_id
游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动
Consumer
消费者,包含在消费组当中
Pending_ids
消费者会布一个状态变量,用于记录被当前消费者已读取但未ack的消息id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理
2.队列(生产)相关命令 2.1 XADD添加消息到队列末尾,如果队列不存在,则会先创建队列
key 键*|id 消息的ID,格式必须是时间戳-序列号这样的方式,下一条的ID必须比上一条的要大,写成*号,系统将自动生成。ID比较大小时,先比较时间戳大小,若相同再比较序列号field value 键值对XADD <key> <*|id> <field value> [<field value ...>]例
127.0.0.1:6379> XADD mystream * name lzj age 28"1758432825026-0"127.0.0.1:6379> XADD mystream * name xxn age 24"1758432832463-0"返回的1748835683065-0和1748835967616-0就是消息的ID,-前的数字为生成消息时的毫秒级时间戳,-后面的数字代表同一毫秒内产生的第几个序列号
在相同的毫秒下序列号从0开始递增,序列号是64位长度,理论上在同一毫秒内生成的数据量无法到达这个级别,因此不用担心序列号会不够用。毫秒数取的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID,也即redis在增加信息条目时会检査当前id与上一条目的id,自动纠正错误的情况,一定要保证后面的id比前面大,一个流中信息条目的ID必须是单调增的,这是stream的基础。
Stream的数据类型就是Stream
127.0.0.1:6379> type mystreamstream 2.2 XRANGE获取消息列表(可以指定范围),忽略删除的消息
key 对应的streamstart 开始值,用-表示最小end 结束值,用+表示最小count 限制条数XRANGE <key> <start> <end> [Count <count>]例
127.0.0.1:6379> xrange mystream - + 1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28"2) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24" 2.3 XREVRANGE和XRANGE相比,区别在于反向获取,ID从大到小
例
127.0.0.1:6379> xrevrange mystream + - 1) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24"2) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28" 2.4 XDEL删除消息,按照ID删除
XDEL <key> <id> [<id ...>]例
XDEL mystream 1748835967616-0 2.5 XLEN获取Stream中的消息长度
127.0.0.1:6379> XLEN mystream2 2.6 XTRIM限制Stream的长度,如果已经超长会进行截取
MAXLEN 允许的最大长度,对流进行修剪限制长度MINID 允许的最小id,从某个id值开始比该id值小的将会被抛弃XTRIM <stream> MAXLEN|MINID <n>例
127.0.0.1:6379> XTRIM mystream MAXLEN 24127.0.0.1:6379> XTRIM mystream MINID 1748841640027-01 2.7 XREAD获取消息(阻塞/非阻塞),返回大于指定ID的消息
COUNT 最多读取多少条BLOCK 是否以阻塞的方式读取消息,默认不阻塞,如果miliseconds设置为0,表示永远阻塞STREAMS 队列ID 指定的ID,用$代表队列内现存最大的ID的后一个ID,用0-0(或0; 00)代表队列中最小的IDXREAD [COUNT <count>] [BLOCK <milliseconds>] STREAMS <key> [<key ...>] ID [<id ...>]例:用$代表队列内现存最大的ID的后一个ID,因为该ID并不存在所以会返回空
127.0.0.1:6379> Xrange mystream - +1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28"2) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24"127.0.0.1:6379> XREAD STREAMS mystream $(nil)例:用0-0代表队列中最小的ID,当指定为0-0的同时不指定count时,会返回队列中所有的元素
127.0.0.1:6379> Xrange mystream - +1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28"2) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24"127.0.0.1:6379> XREAD STREAMS mystream 0-01) 1) "mystream" 2) 1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28" 2) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24"例:阻塞:监听mystream中比最新的一条还靠后的一条,读取不到就会阻塞,一直监听
127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS mystream $打开另一个命令窗口,生产消息
127.0.0.1:6379> xadd mystream * k1 v1"1758434052255-0"XREAD停止阻塞,打印出新生产的一条消息和等待时间
127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS mystream $1) 1) "mystream" 2) 1) 1) "1758434052255-0" 2) 1) "k1" 2) "v1"(183.38s) 3.消费(组)相关命令 3.1 XGROUP CREATE创建消费组
stresm 队列group 消费组id 创建消费组时必须指定ID,0代表从头消费,$代表从队尾消费(只消费最新消息)XGROUP CREATE <stresm> <group> <id>例:
127.0.0.1:6379> xgroup create mystream group1 $OK127.0.0.1:6379> xgroup create mystream group2 0OK 3.2 XREADGROUP GROUP允许多个消费者作为一个组来合作消费同一个stream中的消息,同一个stream中的消息一旦被消费组里面的一个消费者读取了,同组的其他消费者就无法再次读取
group 消费组consumer 消费者stream 队列id 指定从哪个ID开始读取,特殊写法:>代表获取组内未分发给其他消费者的新消息(未分发必然未ACK),0代表获取已分发但未被消费者ACK的消息count 读取的数量,可以用于每个消费者读取一部分消息,实现消费的负载均衡XREADGROUP GROUP <group> <consumer> [COUNT <count>] STREAMS <stream, ...> <id>例:同组消费者不能重复消费消息
先建两个消费组
127.0.0.1:6379> xgroup create mystream groupA 0OK127.0.0.1:6379> xgroup create mystream groupB 0OKgroupA里面的新建消费者consumer1进行消费
127.0.0.1:6379> XREADGROUP GROUP groupA consumer1 STREAMS mystream >1) 1) "mystream" 2) 1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28" 2) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24" 3) 1) "1758434052255-0" 2) 1) "k1" 2) "v1"groupA中再去新建消费者consumer2进行消费,无法再消费消息
127.0.0.1:6379> XREADGROUP GROUP groupA consumer2 STREAMS mystream >(nil)但是,groupB中新建一个消费者取消费,可以读取到消息
127.0.0.1:6379> XREADGROUP GROUP groupB consumer1 STREAMS mystream >1) 1) "mystream" 2) 1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28" 2) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24" 3) 1) "1758434052255-0" 2) 1) "k1" 2) "v1"例:负载均衡的消费,新建消费组groupC,三个消费者每个消费一条消息
127.0.0.1:6379> xgroup create mystream groupC 0OK127.0.0.1:6379> XREADGROUP GROUP groupC consumer1 COUNT 1 STREAMS mystream >1) 1) "mystream" 2) 1) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28"127.0.0.1:6379> XREADGROUP GROUP groupC consumer2 COUNT 1 STREAMS mystream >1) 1) "mystream" 2) 1) 1) "1758432832463-0" 2) 1) "name" 2) "xxn" 3) "age" 4) "24"127.0.0.1:6379> XREADGROUP GROUP groupC consumer3 COUNT 1 STREAMS mystream >1) 1) "mystream" 2) 1) 1) "1758434052255-0" 2) 1) "k1" 2) "v1" 3.3 消息的ACK机制基于Stream的消息,怎样保证消费者发生故障或宕机以后,仍然能读取未处理完的消息?Stream采用的是一个内部队列pending_list,记录消费组中消费者的读取记录,直到消费者使用xack命令来通知stream消息已经处理完成,这种消费确认机制增强了消息的可靠性。
刚刚的消费操作仅仅是对消息的读取,实际上并没有ACK“签收”
例:通过命令XPENDING查询groupC中已读取,但未确认的消息
127.0.0.1:6379> XPENDING mystream groupC1) (integer) 3 #总数2) "1758432825026-0" #起始ID3) "1758434052255-0" #结束ID4) 1) 1) "consumer1" #每个消费组消费的消息数量 2) "1" 2) 1) "consumer2" 2) "1" 3) 1) "consumer3" 2) "1"例:通过命令XPENDING查询groupB中consumer1已读取,但未确认的消息,从小到大查询10个
127.0.0.1:6379> XPENDING mystream groupB - + 10 consumer11) 1) "1758432825026-0" 2) "consumer1" 3) (integer) 2848434 4) (integer) 12) 1) "1758432832463-0" 2) "consumer1" 3) (integer) 2848434 4) (integer) 13) 1) "1758434052255-0" 2) "consumer1" 3) (integer) 2848434 4) (integer) 1例:使用XACK向消息队列确认groupB组的消息1758432825026-0已经处理完成
127.0.0.1:6379> XACK mystream groupB 1758432825026-0(integer) 1127.0.0.1:6379> XPENDING mystream groupB - + 10 consumer11) 1) "1758432832463-0" 2) "consumer1" 3) (integer) 3301753 4) (integer) 12) 1) "1758434052255-0" 2) "consumer1" 3) (integer) 3301753 4) (integer) 1 4.XINFOXINFO命令用于打印出一些stream结构相关的信息
例:XINFO stream打印出mystream队列的详细信息
127.0.0.1:6379> XINFO stream mystream 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1758434052255-0" 9) "max-deleted-entry-id"10) "0-0"11) "entries-added"12) (integer) 313) "recorded-first-entry-id"14) "1758432825026-0"15) "groups"16) (integer) 517) "first-entry"18) 1) "1758432825026-0" 2) 1) "name" 2) "lzj" 3) "age" 4) "28"19) "last-entry"20) 1) "1758434052255-0" 2) 1) "k1" 2) "v1"例:XINFO groups打印出mystream队列队列上存在的消费组信息
127.0.0.1:6379> XINFO groups mystream1) 1) "name" 2) "group1" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1758434052255-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 02) 1) "name" 2) "group2" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "0-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 33) 1) "name" 2) "groupA" 3) "consumers" 4) (integer) 2 5) "pending" 6) (integer) 3 7) "last-delivered-id" 8) "1758434052255-0" 9) "entries-read" 10) (integer) 3 11) "lag" 12) (integer) 04) 1) "name" 2) "groupB" 3) "consumers" 4) (integer) 1 5) "pending" 6) (integer) 2 7) "last-delivered-id" 8) "1758434052255-0" 9) "entries-read" 10) (integer) 3 11) "lag" 12) (integer) 05) 1) "name" 2) "groupC" 3) "consumers" 4) (integer) 3 5) "pending" 6) (integer) 3 7) "last-delivered-id" 8) "1758434052255-0" 9) "entries-read" 10) (integer) 3 11) "lag" 12) (integer) 0