一、背景
最近在看 redis
这方面的常识,发现在 redis5
中产生了一种新的数据类型 Stream
,它和kafka
的设计有些相似,能够当作一个简略的音讯队列来应用。
二、redis 中 Stream 类型的特点
- 是可长久化的,能够保证数据不失落。
- 反对音讯的多播、分组生产。
- 反对音讯的有序性。
三、Stream 的构造
解释:
-
消费者组:
Consumer Group, 即应用XGROUP CREATE
命令创立的,一个消费者组中能够存在多个消费者,这些消费者之间是竞争
关系。- 同一条音讯,只能被这个消费者组中的某个消费者获取。
- 多个消费者之间是互相独立的,互不烦扰。
消费者:
Consumer 生产音讯。last_delivered_id:
这个 id 保障了在同一个消费者组中,一个音讯只能被一个消费者获取。每当消费者组的某个消费者读取到了这个音讯后,这个 last_delivered_id 的值会往后挪动一位,保障消费者不会读取到反复的音讯。pending_ids
:记录了消费者读取到的音讯 id 列表,然而这些音讯可能还没有解决,如果认为某个音讯解决,须要调用ack
命令。这样就确保了某个音讯肯定会被执行一次。音讯内容:
是一个键值对
的格局。Stream 中 音讯的 ID:
默认状况下,ID 应用*
,redis 能够主动生成一个,格局为工夫戳 - 序列号
,也能够本人指定,个别应用默认生成的即可,且后生成的 id 号要比之前生成的大。
四、Stream 的命令
1、XADD 往 Stream 开端增加音讯
1、命令格局
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
2、举例
xadd 命令 返回的是数据的 id, xx-yy (xx 指的是毫秒数,yy 指的是在这个毫秒内的第几条音讯)
1、向流中减少一条数据,
127.0.0.1:6379> xadd stream-key * username zhangsan # 向 stream-key 这个流中减少一个 username 是 zhangsan 的数据 * 示意主动生成 id
"1635999858912-0" # 返回的是 ID
127.0.0.1:6379> keys *
1) "stream-key" # 能够看到 stream 主动创立了
127.0.0.1:6379>
2、向流中减少数据,不主动创立流
127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了 nomkstream 参数,而 not-exists-stream 之前不存在,所以退出失败
(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>
3、手动指定 ID 的值
127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处 id 的值是本人传递的 1 -1, 而不是应用 * 主动生成
"1-1" # 返回的是 id 的值
127.0.0.1:6379>
4、设置一个固定大小的 Stream
1、准确指定 Stream 的大小
指定指定 Stream 的大小比含糊指定 Stream 的大小会略微多少耗费一些性能。
2、含糊指定 Stream 的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
"1636001034141-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second
"1636001044506-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
"1636001057846-0"
127.0.0.1:6379> xinfo stream stream-key
1) "length"
2) (integer) 3
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1636001057846-0"
9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1636001034141-0"
2) 1) "first"
2) "first"
13) "last-entry"
14) 1) "1636001057846-0"
2) 1) "third"
2) "third"
127.0.0.1:6379>
~
含糊指定流的大小,能够看到指定的是 1,实际上曾经到了 3.
2、XRANGE 查看 Stream 中的音讯
1、命令格局
xrange key start end [COUNT count]
2、筹备数据
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636003499055-0"
127.0.0.1:6379>
应用 redis 的事务操作,获取到同一毫秒产生的多条数据,工夫戳一样,序列号不一样
3、举例
1、获取所有的数据 (-
和+
的应用)
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
3) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
-:
示意最小 id 的值
+:
示意最大 id 的值
2、获取指定 id 范畴内的数据,闭区间
127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
2) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
3、获取指定 id 范畴内的数据,开区间
127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
1) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
127.0.0.1:6379>
(:
示意开区间
4、获取某个毫秒后所有的数据
127.0.0.1:6379> xrange stream-key 1636003481706 +
1) 1) "1636003481706-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
3) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
间接写 毫秒
不写前面的序列号即可。
5、获取单条数据
127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
1) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
start
和 end
的值写的一样即可获取单挑数据。
6、获取固定条数的数据
127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
2) 1) "username"
2) "zhangsan"
127.0.0.1:6379>
应用 count
进行限度
3、XREVRANGE 反向查看 Stream 中的音讯
XREVRANGE key end start [COUNT count]
应用形式和 XRANGE
相似,略。
4、XDEL 删除音讯
1、命令格局
xdel key ID [ID ...]
2、筹备数据
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636004189211-0"
127.0.0.1:6379>
3、举例
需要:往 Stream 中退出 3 条音讯,而后删除第 2 条音讯
127.0.0.1:6379> xdel stream-key 1636004183638-0
(integer) 1 # 返回的是删除记录的数量
127.0.0.1:6379> xrang stream -key - +
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636004176924-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636004189211-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
留神:
须要留神的是,咱们从 Stream 中删除一个音讯,这个音讯并不是被真正的删除了,而是被 标记为删除,这个时候这个音讯还是占据着内容空间的。如果所有 Stream 中所有的音讯都被标记删除,这个时候才会回收内存空间。然而这个 Stream 并不会被删除。
6、XLEN 查看 Stream 中元素的长度
1、命令格局
xlen key
2、举例
查看 Stream 中元素的长度
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>
留神:
如果 xlen
前方的 key
不存在则返回 0,否则返回元素的个数。
7、XTRIM 对 Stream 中的元素进行修剪
1、命令格局
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
2、筹备数据
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636009745401-0"
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username wangwu
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636009763955-0"
2) "1636009763955-1"
127.0.0.1:6379> xadd stream-key * username zhaoliu
"1636009769625-0"
127.0.0.1:6379>
3、举例
1、maxlen 准确限度
127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最初的 2 个音讯
(integer) 2
127.0.0.1:6379> xrange stream-key - + # 能够看到之前退出的 2 个音讯被删除了
1) 1) "1636009763955-1"
2) 1) "username"
2) "wangwu"
2) 1) "1636009769625-0"
2) 1) "username"
2) "zhaoliu"
127.0.0.1:6379>
上方的意思是,保留 stream-key
这个 Stream 中最初的 2 个音讯。
2、minid 含糊限度
minid 是删除比这个 id 小的数据,本地测试的时候 没有测试进去
,略。
8、XREAD 独立生产音讯
XREAD
只是读取音讯,读取完之后并不会删除音讯。应用 XREAD
读取音讯,是齐全独立与消费者组的,多个客户端能够同时读取音讯。
1、命令格局
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
2、筹备数据
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636011801365-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636011806261-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636011810905-0"
127.0.0.1:6379>
3、举例
1、获取用户名是 wangwu 的数据
127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是 lisi 的 id,即读取到的数据须要是 > 1636011806261-0
1) 1) "stream-key"
2) 1) 1) "1636011810905-0"
2) 1) "username"
2) "wangwu"
2、获取 2 条数据
127.0.0.1:6379> xread count 2 streams stream-key 0-0
1) 1) "stream-key"
2) 1) 1) "1636011801365-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636011806261-0"
2) 1) "username"
2) "lisi"
127.0.0.1:6379>
count
限度单次读取最初的音讯,因为以后读取可能没有这么多。
3、非阻塞读取 Stream 对尾的数据
即读取队列尾的下一个音讯,在非阻塞模式下始终是nil
127.0.0.1:6379> xread streams stream-key $
(nil)
4、阻塞读取 Stream 对尾的数据
留神:
$
示意读取队列最新进来的一个音讯,不是 Stream 的最初一个音讯。是xread block
执行后,再次应用xadd
增加音讯后,xread block
才会返回。block 0
示意永恒阻塞,当音讯到来时,才接触阻塞。block 1000
示意阻塞 1000ms,如果 1000ms 还没有音讯到来,则返回nil
xread 进行程序生产
当应用 xread 进行程序音讯时,须要记住返回的音讯 id,同时下次调用 xread 时,须要将上次返回的音讯 id 传递进去。xread
读取音讯,齐全忽视生产组,此时Stream
就能够了解为一个一般的 list。
9、消费者组相干操作
1、消费者组命令
2、筹备数据
1、创立 Stream 的名称是 stream-key
2、创立 2 个音讯,aa 和 bb
127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"
3、创立消费者组
1、创立一个从头开始生产的消费者组
xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(示意从头开始生产)
2、创立一个从 Stream 最新的一个音讯生产的消费者组
xgroup create stream-key g2 $
$
示意从最初一个元素生产,不包含 Stream 中的最初一个元素,即生产最新的音讯。
4、创立一个从某个音讯之后生产的消费者组
xgroup create stream-key g3 1636362619125-0 #1636362619125-0 这个是上方 aa 音讯的 id 的值
1636362619125-0
某个音讯的具体的 ID, 这个 g3
消费者组中的音讯都是 大于 >
这个 id 的音讯。
3、从消费者中读取音讯
127.0.0.1:6379> xreadgroup group g1(生产组名) c1(消费者名,主动创立) count 3(读取 3 条) streams stream-key(Stream 名) >(从该消费者组中还未调配给另外的消费者的音讯开始读取)
1) 1) "stream-key"
2) 1) 1) "1636362619125-0"
2) 1) "aa"
2) "aa"
2) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
(nil) # 返回 nil 是因为 g2 生产组是从最新的一条信息开始读取(创立消费者组时应用了 $),须要在另外的窗口执行 `xadd` 命令,才能够再次读取到音讯
127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key > #只读取到一条音讯是因为,在创立消费者组时,指定了 aa 音讯的 id,bb 音讯的 id 大于 aa, 所以读取进去了。1) 1) "stream-key"
2) 1) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379>
4、读取消费者的 pending 音讯
127.0.0.1:6379> xgroup create stream-key g4 0-0
OK
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 88792
127.0.0.1:6379> xinfo consumers stream-key g4
(empty array)
127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
1) 1) "stream-key"
2) 1) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
1) 1) "stream-key"
2) (empty array)
127.0.0.1:6379>
5、转移消费者的音讯
127.0.0.1:6379> xpending stream-key g1 - + 10 c1
1) 1) "1636362619125-0"
2) "c1"
3) (integer) 2686183
4) (integer) 1
2) 1) "1636362623191-0"
2) "c1"
3) (integer) 102274
4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
1) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
2) "c2"
3) (integer) 17616
4) (integer) 8
127.0.0.1:6379>
也能够通过 xautoclaim
来实现。
6、一些监控命令
1、查看生产组中消费者的 pending 音讯
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
2) "c2"
3) (integer) 1247680
4) (integer) 8
127.0.0.1:6379>
2、查看生产组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1474864
2) 1) "name"
2) "c2"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1290069
127.0.0.1:6379>
3、查看生产组信息
127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1636362623191-0"
2) 1) "name"
2) "g2"
3) "consumers"
......
4、查看 Stream 信息
127.0.0.1:6379> xinfo stream stream-key
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1636362623191-0"
9) "groups"
10) (integer) 4
11) "first-entry"
12) 1) "1636362619125-0"
2) 1) "aa"
2) "aa"
13) "last-entry"
14) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379>
五、参考文档
1、https://redis.io/topics/streams-intro
2、https://www.runoob.com/redis/redis-stream.html