一、背景

最近在看redis这方面的常识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些相似,能够当作一个简略的音讯队列来应用。

二、redis中Stream类型的特点

  1. 是可长久化的,能够保证数据不失落。
  2. 反对音讯的多播、分组生产。
  3. 反对音讯的有序性。

三、Stream的构造

解释:

  1. 消费者组: Consumer Group,即应用 XGROUP CREATE 命令创立的,一个消费者组中能够存在多个消费者,这些消费者之间是竞争关系。

    1. 同一条音讯,只能被这个消费者组中的某个消费者获取。
    2. 多个消费者之间是互相独立的,互不烦扰。
  2. 消费者: Consumer 生产音讯。
  3. last_delivered_id: 这个id保障了在同一个消费者组中,一个音讯只能被一个消费者获取。每当消费者组的某个消费者读取到了这个音讯后,这个last_delivered_id的值会往后挪动一位,保障消费者不会读取到反复的音讯。
  4. pending_ids:记录了消费者读取到的音讯id列表,然而这些音讯可能还没有解决,如果认为某个音讯解决,须要调用ack命令。这样就确保了某个音讯肯定会被执行一次。
  5. 音讯内容:是一个键值对的格局。
  6. Stream 中 音讯的 ID: 默认状况下,ID应用 * ,redis能够主动生成一个,格局为 工夫戳-序列号,也能够本人指定,个别应用默认生成的即可,且后生成的id号要比之前生成的大。

四、Stream的命令

1、XADD 往Stream开端增加音讯

1、命令格局

xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

2、举例

xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条音讯)

1、向流中减少一条数据,

127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中减少一个 username 是zhangsan的数据 *示意主动生成id"1635999858912-0" # 返回的是ID127.0.0.1:6379> keys *1) "stream-key" # 能够看到stream主动创立了127.0.0.1:6379>

2、向流中减少数据,不主动创立流

127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以退出失败(nil)127.0.0.1:6379> keys *(empty array)127.0.0.1:6379>

3、手动指定ID的值

127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是本人传递的1-1,而不是应用*主动生成"1-1" # 返回的是id的值127.0.0.1:6379>

4、设置一个固定大小的Stream

1、准确指定Stream的大小

指定指定Stream的大小比含糊指定Stream的大小会略微多少耗费一些性能。

2、含糊指定Stream的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first"1636001034141-0"127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second"1636001044506-0"127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third"1636001057846-0"127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636001057846-0" 9) "groups"10) (integer) 011) "first-entry"12) 1) "1636001034141-0"    2) 1) "first"       2) "first"13) "last-entry"14) 1) "1636001057846-0"    2) 1) "third"       2) "third"127.0.0.1:6379>

~ 含糊指定流的大小,能够看到指定的是1,实际上曾经到了3.

2、XRANGE查看Stream中的音讯

1、命令格局

xrange key start end [COUNT count]

2、筹备数据

127.0.0.1:6379> multiOK127.0.0.1:6379(TX)> xadd stream-key * username zhangsanQUEUED127.0.0.1:6379(TX)> xadd stream-key * username lisiQUEUED127.0.0.1:6379(TX)> exec1) "1636003481706-0"2) "1636003481706-1"127.0.0.1:6379> xadd stream-key * username wangwu"1636003499055-0"127.0.0.1:6379>

应用redis的事务操作,获取到同一毫秒产生的多条数据,工夫戳一样,序列号不一样

3、举例

1、获取所有的数据(-+的应用)

127.0.0.1:6379> xrange stream-key - +1) 1) "1636003481706-0"   2) 1) "username"      2) "zhangsan"2) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"3) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

-: 示意最小id的值

+:示意最大id的值

2、获取指定id范畴内的数据,闭区间

127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-01) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"2) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

3、获取指定id范畴内的数据,开区间

127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-01) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"127.0.0.1:6379>

(:示意开区间

4、获取某个毫秒后所有的数据

127.0.0.1:6379> xrange stream-key 1636003481706 +1) 1) "1636003481706-0"   2) 1) "username"      2) "zhangsan"2) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"3) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

间接写毫秒不写前面的序列号即可。

5、获取单条数据

127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-01) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

startend的值写的一样即可获取单挑数据。

6、获取固定条数的数据

127.0.0.1:6379> xrange stream-key - + count 11) 1) "1636003481706-0"   2) 1) "username"      2) "zhangsan"127.0.0.1:6379>

应用 count进行限度

3、XREVRANGE反向查看Stream中的音讯

XREVRANGE key end start [COUNT count]

应用形式和XRANGE相似,略。

4、XDEL删除音讯

1、命令格局

xdel key ID [ID ...]

2、筹备数据

127.0.0.1:6379> xadd stream-key * username zhangsan"1636004176924-0"127.0.0.1:6379> xadd stream-key * username lisi"1636004183638-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636004189211-0"127.0.0.1:6379>

3、举例

需要:往Stream中退出3条音讯,而后删除第2条音讯

127.0.0.1:6379> xdel stream-key 1636004183638-0(integer) 1 # 返回的是删除记录的数量127.0.0.1:6379> xrang stream -key - +127.0.0.1:6379> xrange stream-key - +1) 1) "1636004176924-0"   2) 1) "username"      2) "zhangsan"2) 1) "1636004189211-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

留神:

须要留神的是,咱们从Stream中删除一个音讯,这个音讯并不是被真正的删除了,而是被标记为删除,这个时候这个音讯还是占据着内容空间的。如果所有Stream中所有的音讯都被标记删除,这个时候才会回收内存空间。然而这个Stream并不会被删除。

6、XLEN查看Stream中元素的长度

1、命令格局

xlen key

2、举例

查看Stream中元素的长度

127.0.0.1:6379> xadd stream-key * username zhangsan"1636004690578-0"127.0.0.1:6379> xlen stream-key(integer) 1127.0.0.1:6379> xlen not-exists-stream-key(integer) 0127.0.0.1:6379>

留神:

如果xlen前方的key不存在则返回0,否则返回元素的个数。

7、XTRIM对Stream中的元素进行修剪

1、命令格局

xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]

2、筹备数据

127.0.0.1:6379>  xadd stream-key * username zhangsan"1636009745401-0"127.0.0.1:6379> multiOK127.0.0.1:6379(TX)> xadd stream-key * username lisiQUEUED127.0.0.1:6379(TX)> xadd stream-key * username wangwuQUEUED127.0.0.1:6379(TX)> exec1) "1636009763955-0"2) "1636009763955-1"127.0.0.1:6379> xadd stream-key * username zhaoliu"1636009769625-0"127.0.0.1:6379>

3、举例

1、maxlen准确限度

127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最初的2个音讯(integer) 2127.0.0.1:6379> xrange stream-key - + # 能够看到之前退出的2个音讯被删除了1) 1) "1636009763955-1"   2) 1) "username"      2) "wangwu"2) 1) "1636009769625-0"   2) 1) "username"      2) "zhaoliu"127.0.0.1:6379>

上方的意思是,保留stream-key这个Stream中最初的2个音讯。

2、minid含糊限度

minid 是删除比这个id小的数据,本地测试的时候没有测试进去,略。

8、XREAD独立生产音讯

XREAD只是读取音讯,读取完之后并不会删除音讯。 应用XREAD读取音讯,是齐全独立与消费者组的,多个客户端能够同时读取音讯。

1、命令格局

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

2、筹备数据

127.0.0.1:6379> xadd stream-key * username zhangsan"1636011801365-0"127.0.0.1:6379> xadd stream-key * username lisi"1636011806261-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636011810905-0"127.0.0.1:6379>

3、举例

1、获取用户名是wangwu的数据

127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据须要是 > 1636011806261-01) 1) "stream-key"   2) 1) 1) "1636011810905-0"         2) 1) "username"            2) "wangwu"

2、获取2条数据

127.0.0.1:6379> xread count 2 streams stream-key 0-01) 1) "stream-key"   2) 1) 1) "1636011801365-0"         2) 1) "username"            2) "zhangsan"      2) 1) "1636011806261-0"         2) 1) "username"            2) "lisi"127.0.0.1:6379>

count限度单次读取最初的音讯,因为以后读取可能没有这么多。

3、非阻塞读取Stream对尾的数据

即读取队列尾的下一个音讯,在非阻塞模式下始终是nil

127.0.0.1:6379> xread streams stream-key $(nil)

4、阻塞读取Stream对尾的数据

留神:

  1. $示意读取队列最新进来的一个音讯,不是Stream的最初一个音讯。是xread block执行后,再次应用xadd增加音讯后,xread block才会返回。
  2. block 0示意永恒阻塞,当音讯到来时,才接触阻塞。block 1000示意阻塞1000ms,如果1000ms还没有音讯到来,则返回nil
  3. xread进行程序生产 当应用xread进行程序音讯时,须要记住返回的音讯id,同时下次调用xread时,须要将上次返回的音讯id传递进去。
  4. xread读取音讯,齐全忽视生产组,此时Stream就能够了解为一个一般的list。

9、消费者组相干操作

1、消费者组命令

2、筹备数据

1、创立Stream的名称是 stream-key

2、创立2个音讯,aa和bb

127.0.0.1:6379> xadd stream-key * aa aa"1636362619125-0"127.0.0.1:6379> xadd stream-key * bb bb"1636362623191-0"

3、创立消费者组

1、创立一个从头开始生产的消费者组
xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(示意从头开始生产)
2、创立一个从Stream最新的一个音讯生产的消费者组
xgroup create stream-key g2 $

$示意从最初一个元素生产,不包含Stream中的最初一个元素,即生产最新的音讯。

4、创立一个从某个音讯之后生产的消费者组
xgroup create stream-key g3 1636362619125-0  #1636362619125-0 这个是上方aa音讯的id的值

1636362619125-0某个音讯的具体的ID,这个g3消费者组中的音讯都是大于>这个id的音讯。

3、从消费者中读取音讯

127.0.0.1:6379> xreadgroup group g1(生产组名) c1(消费者名,主动创立) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未调配给另外的消费者的音讯开始读取)1) 1) "stream-key"   2) 1) 1) "1636362619125-0"         2) 1) "aa"            2) "aa"      2) 1) "1636362623191-0"         2) 1) "bb"            2) "bb"127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >(nil) # 返回 nil 是因为 g2生产组是从最新的一条信息开始读取(创立消费者组时应用了$),须要在另外的窗口执行`xadd`命令,才能够再次读取到音讯127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只读取到一条音讯是因为,在创立消费者组时,指定了aa音讯的id,bb音讯的id大于aa,所以读取进去了。1) 1) "stream-key"   2) 1) 1) "1636362623191-0"         2) 1) "bb"            2) "bb"127.0.0.1:6379>

4、读取消费者的pending音讯

127.0.0.1:6379> xgroup create stream-key g4 0-0OK127.0.0.1:6379> xinfo consumers stream-key g11) 1) "name"   2) "c1"   3) "pending"   4) (integer) 2   5) "idle"   6) (integer) 88792127.0.0.1:6379> xinfo consumers stream-key g4(empty array)127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-01) 1) "stream-key"   2) 1) 1) "1636362623191-0"         2) 1) "bb"            2) "bb"127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-01) 1) "stream-key"   2) (empty array)127.0.0.1:6379>

5、转移消费者的音讯

127.0.0.1:6379> xpending stream-key g1 - + 10 c11) 1) "1636362619125-0"   2) "c1"   3) (integer) 2686183   4) (integer) 12) 1) "1636362623191-0"   2) "c1"   3) (integer) 102274   4) (integer) 7127.0.0.1:6379> xpending stream-key g1 - + 10 c2(empty array)127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-01) 1) "1636362623191-0"   2) 1) "bb"      2) "bb"127.0.0.1:6379> xpending stream-key g1 - + 10 c21) 1) "1636362623191-0"   2) "c2"   3) (integer) 17616   4) (integer) 8127.0.0.1:6379>

也能够通过xautoclaim来实现。

6、一些监控命令

1、查看生产组中消费者的pending音讯
127.0.0.1:6379> xpending stream-key g1 - + 10 c21) 1) "1636362623191-0"   2) "c2"   3) (integer) 1247680   4) (integer) 8127.0.0.1:6379>
2、查看生产组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g11) 1) "name"   2) "c1"   3) "pending"   4) (integer) 1   5) "idle"   6) (integer) 14748642) 1) "name"   2) "c2"   3) "pending"   4) (integer) 1   5) "idle"   6) (integer) 1290069127.0.0.1:6379>
3、查看生产组信息
127.0.0.1:6379> xinfo groups stream-key1) 1) "name"   2) "g1"   3) "consumers"   4) (integer) 2   5) "pending"   6) (integer) 2   7) "last-delivered-id"   8) "1636362623191-0"2) 1) "name"   2) "g2"   3) "consumers"......
4、查看Stream信息
127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 2 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636362623191-0" 9) "groups"10) (integer) 411) "first-entry"12) 1) "1636362619125-0"    2) 1) "aa"       2) "aa"13) "last-entry"14) 1) "1636362623191-0"    2) 1) "bb"       2) "bb"127.0.0.1:6379>

五、参考文档

1、https://redis.io/topics/streams-intro

2、https://www.runoob.com/redis/redis-stream.html