前言
最近我的项目中应用到了 Redis 中的 stream 数据类型作为音讯队列应用,相比于其余 redis 实现的音讯队列更加不便。因为是第一次应用,记录下知识点当前备用。
Stream 类型
基于 redis 的音讯队列有好多种实现,然而大多都有其特点和问题,自身 redis 只是个缓存啊????,预计官网都看不下去了,这才在 redis 5.0 里加了一种数据类型专门用来实现典型的音讯队列。
stream 类型简直具备了一个音讯队列所须要用到的所有性能,包含但不限于:
- 音讯 ID 的序列化生成
- 音讯遍历
- 解决未确认的音讯
- 音讯的阻塞和非阻塞读取
- 音讯的分组生产
- 音讯队列监控
等等 ….
上面介绍 stream 数据类型的应用办法。
stream 类型的应用
xadd 命令
语法格局为:
XADD key ID field value [field value …]
- key,用来指定 stream 的名字
- ID,用来指定 ID 值,最罕用的是 *
- field value [field value …],key-value 类型数据
xadd 用来在指定的 key 中增加音讯,如果 key 不存在,则主动创立。增加的音讯为 key-value
类型,能够一次增加多个音讯。
指定 ID,最罕用的是 *,示意由 redis 主动生成 ID,主动生成的 ID 为 1526919030474-55
格局,由毫秒工夫戳和序列号组成,序列号用于辨别同一毫秒内生成的音讯,保障 ID 始终是递增的。如果因为一些其余起因零碎时钟慢了,导致生成的工夫戳小于了 redis 中记录的值,则会取零碎中记录的最大值持续递增,保障 ID 的递增状态。
127.0.0.1:6379> xadd message 1 key1 value1 key2 value2
"1-0"
127.0.0.1:6379> xadd message 1-2 key1 value1 key2 value2
"1-2"
ID 在个别状况下是由 redis 主动指定的,但其实 ID 也是能够自定义的,为了保障 ID 的自增状态,手动指定的 ID 必须要大于零碎中存在的 ID,只不过个别不这么做。
127.0.0.1:6379> xadd message * key1 value1 key2 value2
"1604475735664-0"
以上命令增加了 key1-value 和 key2-value2 两条音讯到 message 这个 key 中,返回值为以后音讯的 ID,由 redis 主动生成,此时音讯队列中就有一条音讯能够被读取了。
127.0.0.1:6379> xadd message maxlen 10 * key3 value3
"1604476672762-0"
maxlen 参数用来限度 key 中音讯的最大数量,然而准确限度 key 中音讯的数量是低效的,能够应用 ~
符号粗略的限度 key 中音讯的数量,redis 会在能够删除整个宏结点时才去删除多余的音讯,理论数量可能会比限度数量多几十个,这是失常的,然而不会少于限度的数量。
xread 命令
语法格局为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] id [id …]
- [COUNT count],用来获取音讯的数量
- [BLOCK milliseconds],用来设置阻塞模式和阻塞超时工夫,默认为非阻塞
- id [id …],用来设置读取的起始 ID,相当于 where id > $id,阻塞模式中能够应用 $ 来获取最新的音讯 ID,非阻塞模式下无意义。
- key,指定 stream 的名字
xread 命令用于从一个或多个 key 中读取音讯,仅返回 ID 值大于参数中传入的 ID 的音讯。此命令有阻塞用法和非阻塞用法,如果 key 中没有音讯,则返回空。
127.0.0.1:6379> xread streams message 0
1) 1) "message"
2) 1) 1) "1-0"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
2) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
3) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
以上命令在非阻塞模式输入了所有的音讯,因为不存在 ID 比 0 还小的音讯,所以输入了所有的音讯。
阻塞模式中,能够应用 $ 符号来获取最新的音讯。如果在指定超时工夫内没有新的音讯,则返回空。
127.0.0.1:6379> xread count 10 block 10000 streams message $
(nil)
(10.02s)
127.0.0.1:6379> xread count 10 block 10000 streams message $
1) 1) "message"
2) 1) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
(5.00s)
输出命令后,能够察看到命令没有任何输入,此时新开一个 redis-cli,输出 xadd message * keyblock2 value
将一条新的音讯增加到 key 中,能够看到下面的命令返回了方才增加的值和阻塞工夫。
xlen 命令
语法格局:
XLEN key
返回 key 中音讯的数量,如果 key 不存在,则会返回 0。即便 key 中音讯的数量为 0,key 也不会被主动删除,因为可能还存在和 key 关联的消费者组。
127.0.0.1:6379> xlen message
(integer) 5
返回 message 中所有音讯的数量。
xrange 命令
语法如下:
XRANGE key start end [COUNT count]
- key,指定 stream 的名字
- start,起始 ID
- end,终止 ID
- [COUNT count],读取的数量
该命令返回与给定的 ID 范畴相匹配的音讯。ID 的范畴由 start 和 end 参数来指定。
127.0.0.1:6379> xrange message - +
1) 1) "1-0"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
2) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
3) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
4) 1) "1604478059261-0"
2) 1) "keyblock"
2) "value"
5) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
此命令由两个非凡的 ID,应用 – 示意最小的 ID 值,应用 + 示意最大的 ID 值,能够查问所有的音讯。
127.0.0.1:6379> xrange message 1 1
1) 1) "1-0"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
2) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
即便 ID 不残缺也能够,只输出 ID 值会输入所有领有雷同 ID 不同序列号的音讯。
127.0.0.1:6379> xrange message - + count 3
1) 1) "1-0"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
2) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
3) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
应用 count 参数能够限度输入音讯的数量。
通过简略的循环能够应用大量内存迭代一个 key 中所有的值,只须要将上次迭代的后果中最大的 ID 作为下一次迭代的起始 ID 即可。
xrevrange 命令
语法阐明:
XREVRANGE key end start [COUNT count]
xrevrange 命令和 xrange 命令语法完全相同,只有一点不同,xrevrange 是反向遍历的,不再赘述。
127.0.0.1:6379> xrevrange message + -
1) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
2) 1) "1604478059261-0"
2) 1) "keyblock"
2) "value"
3) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
4) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
5) 1) "1-0"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
xtrim 命令
语法阐明:
XTRIM key MAXLEN [~] count
- key,指定 stream 的名字
- maxlen,指定修剪策略,以后只实现了这一种
- [~],是否近似修剪
- count,修剪后的数量
xtrim 命令会从 ID 值比拟小的音讯开始抛弃。
127.0.0.1:6379> xread streams message 0
1) 1) "message"
2) 1) 1) "1-0"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
2) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
3) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
4) 1) "1604478059261-0"
2) 1) "keyblock"
2) "value"
5) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
127.0.0.1:6379> xtrim message maxlen 4
(integer) 1
xtrim 命令的返回值是修剪掉的 ID 的数量。
127.0.0.1:6379> xrange message - +
1) 1) "1-2"
2) 1) "key1"
2) "value1"
3) "key2"
4) "value2"
2) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
3) 1) "1604478059261-0"
2) 1) "keyblock"
2) "value"
4) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
再次查看能够看到 key 中的音讯数量被修剪掉了一个,只剩下了四个。
127.0.0.1:6379> xtrim message maxlen ~ 2
(integer) 0
如果应用了 ~ 参数,则可能不会进行修剪。此参数通知 redis 在可能删除整个宏节点时才执行修剪,这样做效率更高,并且能够保障音讯的数量不小于所须要的数量。
xdel 命令
语法阐明:
XDEL key ID [ID …]
- key,指定 stream 的名字
- ID [ID …],须要删除的 ID 值
xdel 命令用于从 key 中删除指定 ID 的音讯,当 ID 不存在时,返回的数目可能和删除的数目不统一。在执行 xdel 命令时,redis 并不会在内存中删除对应的音讯,而只会把它标记为删除,在所有节点都被删除之后整个节点被销毁,内存被回收。
127.0.0.1:6379> xdel message 1-2
(integer) 1
127.0.0.1:6379> xrange message - +
1) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
2) 1) "1604478059261-0"
2) 1) "keyblock"
2) "value"
3) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
删除了 ID 为 1-2 的音讯。
xgroup 命令
语法阐明:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]
- [CREATE key groupname id-or-$],在指定的 key 中创立分组,并且指定分组读取音讯的终点,如果指定了 0,分组将能够读取指定 key 的所有历史音讯,如果指定了 $,分组将能够读取指定 key 的新音讯,将不能读取历史音讯。也能够指定任意的开始 ID。
- [SETID key groupname id-or-$],从新给已存在的分组设置音讯读取的终点。例如将终点设置为 0 就能够从新读取所有的历史音讯
- [DESTROY key groupname],销毁指定 key 中的一个分组
- [CREATECONSUMER key groupname consumername],在指定的 key 和指定的分组中创立一个消费者。当某个命令提及了新的消费者名称时,也会主动创立新的消费者。
- [DELCONSUMER key groupname consumername],在指定的 key 和指定的分组中销毁一个消费者。
xgroup 是一个命令组,能够通过不同的关键字执行不同的命令。
127.0.0.1:6379> xgroup create message read_group $
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
(nil)
127.0.0.1:6379> xadd message * readkey readvalue
"1604494179721-0"
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
2) 1) 1) "1604494179721-0"
2) 1) "readkey"
2) "readvalue"
应用 creat 命令创立一个 read_group 分组,指定 ID 的终点为最初一个 ID,间接读取的话是空值(xreadgroup 命令上面说)。
应用 xadd 命令增加一条新的音讯,再次读取发现能够失常读取到新退出的音讯。
127.0.0.1:6379> xgroup setid message read_group 0
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
2) 1) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
2) 1) "1604478059261-0"
2) 1) "keyblock"
2) "value"
3) 1) "1604478070071-0"
2) 1) "keyblock2"
2) "value"
4) 1) "1604494179721-0"
2) 1) "readkey"
2) "readvalue"
应用 setid 命令从新设置读取 ID 的终点,能够读取到所有的历史音讯。
127.0.0.1:6379> xinfo groups message
1) 1) "name"
2) "read_group"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 4
7) "last-delivered-id"
8) "1604494179721-0"
应用 xinfo 命令查问新创建的分组信息,能够看到分组名字,消费者数量,最初退出的音讯的 ID 值(xinfo 命令下边说)。
127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
2) "read"
3) "pending"
4) (integer) 4
5) "idle"
6) (integer) 476449
应用 xinfo 命令查看新退出的消费者的信息,能够看到消费者名字,处于 pending(待处理)状态的音讯数量(pending 状态下边说)。
127.0.0.1:6379> xgroup delconsumer message read_group read
(integer) 4
127.0.0.1:6379> xinfo consumers message read_group
(empty array)
应用 delconsumer 命令删除分组中的消费者,应用 xinfo 命令查看分组中的消费者,返回一个空数组,阐明删除胜利。delconsumer 命令的返回值为以后消费者所领有的 pending(待处理)状态的音讯数量。
127.0.0.1:6379> xgroup destroy message read_group
(integer) 1
127.0.0.1:6379> xinfo groups message
(empty array)
应用 destroy 命令删除 message 中的分组,应用 xinfo 命令查看,返回一个空数组,阐明删除胜利。destroy 命令的返回值为删除胜利的分组数量。 留神:即便由沉闷的消费者和 pending(待处理)状态的音讯,分组依然会被删除,须要确保在须要时才执行此命令。
xinfo 命令
语法阐明:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
- [CONSUMERS key groupname],查问指定 key 和指定分组中的消费者信息。
- [GROUPS key],查问指定 key 中的分组信息。
- [STREAM key],查问指定 key 中所有的信息。
127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
2) "read"
3) "pending"
4) (integer) 4
5) "idle"
6) (integer) 206796
读取 message 的 read_group 分组中所有的消费者信息。
127.0.0.1:6379> xinfo groups message
1) 1) "name"
2) "read_group"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 4
7) "last-delivered-id"
8) "1604494179721-0"
读取 message 中所有的分组信息。
127.0.0.1:6379> xinfo stream message
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1604494179721-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1604476672762-0"
2) 1) "key3"
2) "value3"
13) "last-entry"
14) 1) "1604494179721-0"
2) 1) "readkey"
2) "readvalue"
读取 message 所有的信息。
xpending 命令
语法阐明:
XPENDING key group [start end count] [consumer]
- key,指定的 key
- group,指定的分组
- [start end count],起始 ID 和完结 ID 还有数量
- consumer,消费者名字
127.0.0.1:6379> xpending message readgroup
1) (integer) 2
2) "1604496633846-0"
3) "1604496640734-0"
4) 1) 1) "read"
2) "2"
xpending 命令能够查看对应分组中未确认的音讯的数量和其所对应的消费者的名字还有起始和终止 ID。
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
2) "read"
3) (integer) 513557
4) (integer) 4
2) 1) "1604496640734-0"
2) "read"
3) (integer) 482927
4) (integer) 1
应用 xpending 命令能够查看处于未确认状态的音讯的具体信息。
xreadgroup 命令
语法阐明:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
- GROUP,固定
- group,分组名
- consumer,消费者名
- [COUNT count],每次获取音讯的数量
- [BLOCK milliseconds],阻塞模式和超时工夫
- [NOACK],不须要确认音讯,实用于不怎么重要的能够失落的音讯
- STREAMS,固定
- key [key …],指定的 key
- ID [ID …],指定的音讯 ID,> 指定读取所有未生产的音讯,其余值指定被挂起的音讯
xreadgroup 命令通过与消费者组和消费者的联合能够做到音讯的读取与确认,在 xread 的根底上细化了读取音讯操作。
从语法上来看,xreadgroup 和 xread 命令简直雷同,xreadgroup 命令多了一个强制性的参数:GROUP groupname consumername
。
当多个消费者同时生产同一个音讯队列时,会反复生产雷同的音讯,每条音讯都会被每个消费者生产一遍。然而如果想要多个消费者合作生产同一个音讯队列时,就须要用到消费者组。
例如推送零碎,必定是不能够反复推送的,也就是说每条音讯只能够被生产一遍,这时就能够应用多个消费者来生产同一个推送队列,升高每个消费者零碎的压力。
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> xadd message * key1 value1
"1604496633846-0"
127.0.0.1:6379> xadd message * key2 value2
"1604496640734-0"
127.0.0.1:6379> xgroup create message readgroup 0
OK
127.0.0.1:6379> xadd message * key3 value3
"1604496696501-0"
127.0.0.1:6379> xadd message * key4 value4
"1604496704823-0"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
2) 1) 1) "1604496633846-0"
2) 1) "key1"
2) "value1"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
2) 1) 1) "1604496640734-0"
2) 1) "key2"
2) "value2"
从头开始,清空数据库,从新给 message 增加音讯,创立分组,应用 readgroup 命令读取最新的音讯。
127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
2) "read"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 53146
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
2) "read"
3) (integer) 513557
4) (integer) 4
2) 1) "1604496640734-0"
2) "read"
3) (integer) 482927
4) (integer) 1
读取了两条音讯,应用 xinfo 命令查看,能够看到有两条音讯处于 pending(待处理)状态,应用 xpending 命令能够查看处于未确认状态的音讯的具体信息。
xack 命令
语法阐明:
XACK key group ID [ID …]
- key,指定的 key
- group,指定的 group
- D [ID …],须要确认的音讯的 ID
xack 命令从 pending 队列中删除挂起的音讯,也就是确认之前未确认的音讯。当应用 xreadgroup 命令读取音讯时,音讯同时被存储到 PEL 中,期待被确认,调用 xack 命令能够从 PEL 中删除挂起的音讯并且开释内存,确保不失落音讯。
127.0.0.1:6379> xack message readgroup 1604496633846-0
(integer) 1
127.0.0.1:6379> xpending message readgroup
1) (integer) 1
2) "1604496640734-0"
3) "1604496640734-0"
4) 1) 1) "read"
2) "1"
127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
2) "read"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 418489
应用 xack 命令确认一条音讯,再次应用 xpending 命令查看未确认音讯的数量,只剩一条未确认音讯,应用 xinfo 命令查看处于 pending 状态的音讯数量也为 1,确认音讯胜利。
xclaim 命令
语法阐明:
XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
- key,指定的 key
- group,指定的分组
- consumer,指定的消费者
- min-idle-time,指定音讯最小闲暇数,指定闲暇了多久的音讯会被选中
- ID [ID …],音讯的 ID
- [IDLE ms],设置音讯的闲暇工夫,如果不提供,默认为 0
- [TIME ms-unix-time],和 IDLE 雷同,unix 工夫戳
- RETRYCOUNT,设置重试次数,通常 xclaim 不会扭转这个值,它通常用于 xpending 命令,用来发现一些长时间未被解决的音讯。
- FORCE,在 PEL 中创立待处理音讯,即便指定的 ID 尚未调配给客户端的 PEL。
- JUSTID,只返回认领的音讯 ID 数组,不返回理论音讯。
xclaim 命令用于更改未确认音讯的所有权,如果有消费者在读取了音讯之后未解决实现就挂掉了,那么音讯会始终在 pending 队列中,占用内存,这时须要应用 xclaim 命令更改此条音讯的所属者,让其余的消费者去生产这条音讯。
127.0.0.1:6379> xreadgroup group readgroup read2 count 1 streams message >
1) 1) "message"
2) 1) 1) "1604496696501-0"
2) 1) "key3"
2) "value3"
127.0.0.1:6379> xack message readgroup 1604496696501-0
(integer) 1
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496640734-0"
2) "read"
3) (integer) 1258517
4) (integer) 2
127.0.0.1:6379> xpending message readgroup - + 10 read2
(empty array)
新创建一个消费者,读取一条音讯,而后将音讯确认掉,查看两个消费者中未确认音讯的数量,read 有一条,read2 没有未确认的音讯。
127.0.0.1:6379> xclaim message readgroup read2 0 1604496640734-0
1) 1) "1604496640734-0"
2) 1) "key2"
2) "value2"
127.0.0.1:6379> xpending message readgroup - + 10 read2
1) 1) "1604496640734-0"
2) "read2"
3) (integer) 3724
4) (integer) 4
127.0.0.1:6379> xpending message readgroup - + 10 read
(empty array)
应用 xclaim 命令将音讯所有权转移给 read2 这个消费者,能够看到,消费者 read2 的 pending 队列中有一条未确认音讯,消费者 read 的 pending 队列中曾经没有音讯了。
总结
第一次应用 redis 做音讯队列,体验上来说还是很不错的,第一次做这种命令总结,以上内容如有脱漏和谬误,请大家斧正。
参考链接
https://redis.io/commands#stream