一、背景
最近在看redis
这方面的常识,发现在redis5
中产生了一种新的数据类型Stream
,它和kafka
的设计有些相似,能够当作一个简略的音讯队列来应用。
二、redis中Stream类型的特点
- 是可长久化的,能够保证数据不失落。
- 反对音讯的多播、分组生产。
- 反对音讯的有序性。
三、Stream的构造
解释:
消费者组:
Consumer Group,即应用XGROUP CREATE
命令创立的,一个消费者组中能够存在多个消费者,这些消费者之间是竞争
关系。- 同一条音讯,只能被这个消费者组中的某个消费者获取。
- 多个消费者之间是互相独立的,互不烦扰。
消费者:
Consumer 生产音讯。last_delivered_id:
这个id保障了在同一个消费者组中,一个音讯只能被一个消费者获取。每当消费者组的某个消费者读取到了这个音讯后,这个last_delivered_id的值会往后挪动一位,保障消费者不会读取到反复的音讯。pending_ids
:记录了消费者读取到的音讯id列表,然而这些音讯可能还没有解决,如果认为某个音讯解决,须要调用ack
命令。这样就确保了某个音讯肯定会被执行一次。音讯内容:
是一个键值对
的格局。Stream 中 音讯的 ID:
默认状况下,ID应用*
,redis能够主动生成一个,格局为工夫戳-序列号
,也能够本人指定,个别应用默认生成的即可,且后生成的id号要比之前生成的大。
四、Stream的命令
1、XADD 往Stream开端增加音讯
1、命令格局
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
2、举例
xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条音讯)
1、向流中减少一条数据,
127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中减少一个 username 是zhangsan的数据 *示意主动生成id"1635999858912-0" # 返回的是ID127.0.0.1:6379> keys *1) "stream-key" # 能够看到stream主动创立了127.0.0.1:6379>
2、向流中减少数据,不主动创立流
127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以退出失败(nil)127.0.0.1:6379> keys *(empty array)127.0.0.1:6379>
3、手动指定ID的值
127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是本人传递的1-1,而不是应用*主动生成"1-1" # 返回的是id的值127.0.0.1:6379>
4、设置一个固定大小的Stream
1、准确指定Stream的大小
指定指定Stream的大小比含糊指定Stream的大小会略微多少耗费一些性能。
2、含糊指定Stream的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first"1636001034141-0"127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second"1636001044506-0"127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third"1636001057846-0"127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636001057846-0" 9) "groups"10) (integer) 011) "first-entry"12) 1) "1636001034141-0" 2) 1) "first" 2) "first"13) "last-entry"14) 1) "1636001057846-0" 2) 1) "third" 2) "third"127.0.0.1:6379>
~
含糊指定流的大小,能够看到指定的是1,实际上曾经到了3.
2、XRANGE查看Stream中的音讯
1、命令格局
xrange key start end [COUNT count]
2、筹备数据
127.0.0.1:6379> multiOK127.0.0.1:6379(TX)> xadd stream-key * username zhangsanQUEUED127.0.0.1:6379(TX)> xadd stream-key * username lisiQUEUED127.0.0.1:6379(TX)> exec1) "1636003481706-0"2) "1636003481706-1"127.0.0.1:6379> xadd stream-key * username wangwu"1636003499055-0"127.0.0.1:6379>
应用redis的事务操作,获取到同一毫秒产生的多条数据,工夫戳一样,序列号不一样
3、举例
1、获取所有的数据(-
和+
的应用)
127.0.0.1:6379> xrange stream-key - +1) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan"2) 1) "1636003481706-1" 2) 1) "username" 2) "lisi"3) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu"127.0.0.1:6379>
-:
示意最小id的值
+:
示意最大id的值
2、获取指定id范畴内的数据,闭区间
127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-01) 1) "1636003481706-1" 2) 1) "username" 2) "lisi"2) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu"127.0.0.1:6379>
3、获取指定id范畴内的数据,开区间
127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-01) 1) "1636003481706-1" 2) 1) "username" 2) "lisi"127.0.0.1:6379>
(:
示意开区间
4、获取某个毫秒后所有的数据
127.0.0.1:6379> xrange stream-key 1636003481706 +1) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan"2) 1) "1636003481706-1" 2) 1) "username" 2) "lisi"3) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu"127.0.0.1:6379>
间接写毫秒
不写前面的序列号即可。
5、获取单条数据
127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-01) 1) "1636003499055-0" 2) 1) "username" 2) "wangwu"127.0.0.1:6379>
start
和end
的值写的一样即可获取单挑数据。
6、获取固定条数的数据
127.0.0.1:6379> xrange stream-key - + count 11) 1) "1636003481706-0" 2) 1) "username" 2) "zhangsan"127.0.0.1:6379>
应用 count
进行限度
3、XREVRANGE反向查看Stream中的音讯
XREVRANGE key end start [COUNT count]
应用形式和XRANGE
相似,略。
4、XDEL删除音讯
1、命令格局
xdel key ID [ID ...]
2、筹备数据
127.0.0.1:6379> xadd stream-key * username zhangsan"1636004176924-0"127.0.0.1:6379> xadd stream-key * username lisi"1636004183638-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636004189211-0"127.0.0.1:6379>
3、举例
需要:往Stream中退出3条音讯,而后删除第2条音讯
127.0.0.1:6379> xdel stream-key 1636004183638-0(integer) 1 # 返回的是删除记录的数量127.0.0.1:6379> xrang stream -key - +127.0.0.1:6379> xrange stream-key - +1) 1) "1636004176924-0" 2) 1) "username" 2) "zhangsan"2) 1) "1636004189211-0" 2) 1) "username" 2) "wangwu"127.0.0.1:6379>
留神:
须要留神的是,咱们从Stream中删除一个音讯,这个音讯并不是被真正的删除了,而是被标记为删除,这个时候这个音讯还是占据着内容空间的。如果所有Stream中所有的音讯都被标记删除,这个时候才会回收内存空间。然而这个Stream并不会被删除。
6、XLEN查看Stream中元素的长度
1、命令格局
xlen key
2、举例
查看Stream中元素的长度
127.0.0.1:6379> xadd stream-key * username zhangsan"1636004690578-0"127.0.0.1:6379> xlen stream-key(integer) 1127.0.0.1:6379> xlen not-exists-stream-key(integer) 0127.0.0.1:6379>
留神:
如果xlen
前方的key
不存在则返回0,否则返回元素的个数。
7、XTRIM对Stream中的元素进行修剪
1、命令格局
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
2、筹备数据
127.0.0.1:6379> xadd stream-key * username zhangsan"1636009745401-0"127.0.0.1:6379> multiOK127.0.0.1:6379(TX)> xadd stream-key * username lisiQUEUED127.0.0.1:6379(TX)> xadd stream-key * username wangwuQUEUED127.0.0.1:6379(TX)> exec1) "1636009763955-0"2) "1636009763955-1"127.0.0.1:6379> xadd stream-key * username zhaoliu"1636009769625-0"127.0.0.1:6379>
3、举例
1、maxlen准确限度
127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最初的2个音讯(integer) 2127.0.0.1:6379> xrange stream-key - + # 能够看到之前退出的2个音讯被删除了1) 1) "1636009763955-1" 2) 1) "username" 2) "wangwu"2) 1) "1636009769625-0" 2) 1) "username" 2) "zhaoliu"127.0.0.1:6379>
上方的意思是,保留stream-key
这个Stream中最初的2个音讯。
2、minid含糊限度
minid 是删除比这个id小的数据,本地测试的时候没有测试进去
,略。
8、XREAD独立生产音讯
XREAD
只是读取音讯,读取完之后并不会删除音讯。 应用XREAD
读取音讯,是齐全独立与消费者组的,多个客户端能够同时读取音讯。
1、命令格局
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
2、筹备数据
127.0.0.1:6379> xadd stream-key * username zhangsan"1636011801365-0"127.0.0.1:6379> xadd stream-key * username lisi"1636011806261-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636011810905-0"127.0.0.1:6379>
3、举例
1、获取用户名是wangwu的数据
127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据须要是 > 1636011806261-01) 1) "stream-key" 2) 1) 1) "1636011810905-0" 2) 1) "username" 2) "wangwu"
2、获取2条数据
127.0.0.1:6379> xread count 2 streams stream-key 0-01) 1) "stream-key" 2) 1) 1) "1636011801365-0" 2) 1) "username" 2) "zhangsan" 2) 1) "1636011806261-0" 2) 1) "username" 2) "lisi"127.0.0.1:6379>
count
限度单次读取最初的音讯,因为以后读取可能没有这么多。
3、非阻塞读取Stream对尾的数据
即读取队列尾的下一个音讯,在非阻塞模式下始终是nil
127.0.0.1:6379> xread streams stream-key $(nil)
4、阻塞读取Stream对尾的数据
留神:
$
示意读取队列最新进来的一个音讯,不是Stream的最初一个音讯。是xread block
执行后,再次应用xadd
增加音讯后,xread block
才会返回。block 0
示意永恒阻塞,当音讯到来时,才接触阻塞。block 1000
示意阻塞1000ms,如果1000ms还没有音讯到来,则返回nil
xread进行程序生产
当应用xread进行程序音讯时,须要记住返回的音讯id,同时下次调用xread时,须要将上次返回的音讯id传递进去。xread
读取音讯,齐全忽视生产组,此时Stream
就能够了解为一个一般的list。
9、消费者组相干操作
1、消费者组命令
2、筹备数据
1、创立Stream的名称是 stream-key
2、创立2个音讯,aa和bb
127.0.0.1:6379> xadd stream-key * aa aa"1636362619125-0"127.0.0.1:6379> xadd stream-key * bb bb"1636362623191-0"
3、创立消费者组
1、创立一个从头开始生产的消费者组
xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(示意从头开始生产)
2、创立一个从Stream最新的一个音讯生产的消费者组
xgroup create stream-key g2 $
$
示意从最初一个元素生产,不包含Stream中的最初一个元素,即生产最新的音讯。
4、创立一个从某个音讯之后生产的消费者组
xgroup create stream-key g3 1636362619125-0 #1636362619125-0 这个是上方aa音讯的id的值
1636362619125-0
某个音讯的具体的ID,这个g3
消费者组中的音讯都是大于>
这个id的音讯。
3、从消费者中读取音讯
127.0.0.1:6379> xreadgroup group g1(生产组名) c1(消费者名,主动创立) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未调配给另外的消费者的音讯开始读取)1) 1) "stream-key" 2) 1) 1) "1636362619125-0" 2) 1) "aa" 2) "aa" 2) 1) "1636362623191-0" 2) 1) "bb" 2) "bb"127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >(nil) # 返回 nil 是因为 g2生产组是从最新的一条信息开始读取(创立消费者组时应用了$),须要在另外的窗口执行`xadd`命令,才能够再次读取到音讯127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key > #只读取到一条音讯是因为,在创立消费者组时,指定了aa音讯的id,bb音讯的id大于aa,所以读取进去了。1) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb"127.0.0.1:6379>
4、读取消费者的pending音讯
127.0.0.1:6379> xgroup create stream-key g4 0-0OK127.0.0.1:6379> xinfo consumers stream-key g11) 1) "name" 2) "c1" 3) "pending" 4) (integer) 2 5) "idle" 6) (integer) 88792127.0.0.1:6379> xinfo consumers stream-key g4(empty array)127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-01) 1) "stream-key" 2) 1) 1) "1636362623191-0" 2) 1) "bb" 2) "bb"127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-01) 1) "stream-key" 2) (empty array)127.0.0.1:6379>
5、转移消费者的音讯
127.0.0.1:6379> xpending stream-key g1 - + 10 c11) 1) "1636362619125-0" 2) "c1" 3) (integer) 2686183 4) (integer) 12) 1) "1636362623191-0" 2) "c1" 3) (integer) 102274 4) (integer) 7127.0.0.1:6379> xpending stream-key g1 - + 10 c2(empty array)127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-01) 1) "1636362623191-0" 2) 1) "bb" 2) "bb"127.0.0.1:6379> xpending stream-key g1 - + 10 c21) 1) "1636362623191-0" 2) "c2" 3) (integer) 17616 4) (integer) 8127.0.0.1:6379>
也能够通过xautoclaim
来实现。
6、一些监控命令
1、查看生产组中消费者的pending音讯
127.0.0.1:6379> xpending stream-key g1 - + 10 c21) 1) "1636362623191-0" 2) "c2" 3) (integer) 1247680 4) (integer) 8127.0.0.1:6379>
2、查看生产组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g11) 1) "name" 2) "c1" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 14748642) 1) "name" 2) "c2" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 1290069127.0.0.1:6379>
3、查看生产组信息
127.0.0.1:6379> xinfo groups stream-key1) 1) "name" 2) "g1" 3) "consumers" 4) (integer) 2 5) "pending" 6) (integer) 2 7) "last-delivered-id" 8) "1636362623191-0"2) 1) "name" 2) "g2" 3) "consumers"......
4、查看Stream信息
127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 2 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636362623191-0" 9) "groups"10) (integer) 411) "first-entry"12) 1) "1636362619125-0" 2) 1) "aa" 2) "aa"13) "last-entry"14) 1) "1636362623191-0" 2) 1) "bb" 2) "bb"127.0.0.1:6379>
五、参考文档
1、https://redis.io/topics/streams-intro
2、https://www.runoob.com/redis/redis-stream.html