关于redis:Redis-Stream类型的使用

74次阅读

共计 8898 个字符,预计需要花费 23 分钟才能阅读完成。

一、背景

最近在看 redis 这方面的常识,发现在 redis5 中产生了一种新的数据类型 Stream,它和kafka 的设计有些相似,能够当作一个简略的音讯队列来应用。

二、redis 中 Stream 类型的特点

  1. 是可长久化的,能够保证数据不失落。
  2. 反对音讯的多播、分组生产。
  3. 反对音讯的有序性。

三、Stream 的构造

解释:

  1. 消费者组: Consumer Group, 即应用 XGROUP CREATE 命令创立的,一个消费者组中能够存在多个消费者,这些消费者之间是 竞争 关系。

    1. 同一条音讯,只能被这个消费者组中的某个消费者获取。
    2. 多个消费者之间是互相独立的,互不烦扰。
  2. 消费者: Consumer 生产音讯。
  3. last_delivered_id: 这个 id 保障了在同一个消费者组中,一个音讯只能被一个消费者获取。每当消费者组的某个消费者读取到了这个音讯后,这个 last_delivered_id 的值会往后挪动一位,保障消费者不会读取到反复的音讯。
  4. pending_ids:记录了消费者读取到的音讯 id 列表,然而这些音讯可能还没有解决,如果认为某个音讯解决,须要调用 ack 命令。这样就确保了某个音讯肯定会被执行一次。
  5. 音讯内容:是一个 键值对 的格局。
  6. Stream 中 音讯的 ID: 默认状况下,ID 应用 *,redis 能够主动生成一个,格局为 工夫戳 - 序列号,也能够本人指定,个别应用默认生成的即可,且后生成的 id 号要比之前生成的大。

四、Stream 的命令

1、XADD 往 Stream 开端增加音讯

1、命令格局

xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

2、举例

xadd 命令 返回的是数据的 id, xx-yy (xx 指的是毫秒数,yy 指的是在这个毫秒内的第几条音讯)

1、向流中减少一条数据,

127.0.0.1:6379> xadd stream-key * username zhangsan # 向 stream-key 这个流中减少一个 username 是 zhangsan 的数据 * 示意主动生成 id
"1635999858912-0" # 返回的是 ID
127.0.0.1:6379> keys *
1) "stream-key" # 能够看到 stream 主动创立了
127.0.0.1:6379>

2、向流中减少数据,不主动创立流

127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了 nomkstream 参数,而 not-exists-stream 之前不存在,所以退出失败
(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>

3、手动指定 ID 的值

127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处 id 的值是本人传递的 1 -1, 而不是应用 * 主动生成
"1-1" # 返回的是 id 的值
127.0.0.1:6379>

4、设置一个固定大小的 Stream

1、准确指定 Stream 的大小

指定指定 Stream 的大小比含糊指定 Stream 的大小会略微多少耗费一些性能。

2、含糊指定 Stream 的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
"1636001034141-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second
"1636001044506-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
"1636001057846-0"
127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 3
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636001057846-0"
 9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1636001034141-0"
    2) 1) "first"
       2) "first"
13) "last-entry"
14) 1) "1636001057846-0"
    2) 1) "third"
       2) "third"
127.0.0.1:6379>

~ 含糊指定流的大小,能够看到指定的是 1,实际上曾经到了 3.

2、XRANGE 查看 Stream 中的音讯

1、命令格局

xrange key start end [COUNT count]

2、筹备数据

127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636003499055-0"
127.0.0.1:6379>

应用 redis 的事务操作,获取到同一毫秒产生的多条数据,工夫戳一样,序列号不一样

3、举例

1、获取所有的数据 (-+的应用)

127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

-: 示意最小 id 的值

+:示意最大 id 的值

2、获取指定 id 范畴内的数据,闭区间

127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
2) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

3、获取指定 id 范畴内的数据,开区间

127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
127.0.0.1:6379>

(:示意开区间

4、获取某个毫秒后所有的数据

127.0.0.1:6379> xrange stream-key 1636003481706 +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

间接写 毫秒 不写前面的序列号即可。

5、获取单条数据

127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
1) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

startend 的值写的一样即可获取单挑数据。

6、获取固定条数的数据

127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
127.0.0.1:6379>

应用 count进行限度

3、XREVRANGE 反向查看 Stream 中的音讯

XREVRANGE key end start [COUNT count]

应用形式和 XRANGE 相似,略。

4、XDEL 删除音讯

1、命令格局

xdel key ID [ID ...]

2、筹备数据

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636004189211-0"
127.0.0.1:6379>

3、举例

需要:往 Stream 中退出 3 条音讯,而后删除第 2 条音讯

127.0.0.1:6379> xdel stream-key 1636004183638-0
(integer) 1 # 返回的是删除记录的数量
127.0.0.1:6379> xrang stream -key - +
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636004176924-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636004189211-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

留神:

须要留神的是,咱们从 Stream 中删除一个音讯,这个音讯并不是被真正的删除了,而是被 标记为删除,这个时候这个音讯还是占据着内容空间的。如果所有 Stream 中所有的音讯都被标记删除,这个时候才会回收内存空间。然而这个 Stream 并不会被删除。

6、XLEN 查看 Stream 中元素的长度

1、命令格局

xlen key

2、举例

查看 Stream 中元素的长度

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>

留神:

如果 xlen 前方的 key 不存在则返回 0,否则返回元素的个数。

7、XTRIM 对 Stream 中的元素进行修剪

1、命令格局

xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]

2、筹备数据

127.0.0.1:6379>  xadd stream-key * username zhangsan
"1636009745401-0"
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username wangwu
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636009763955-0"
2) "1636009763955-1"
127.0.0.1:6379> xadd stream-key * username zhaoliu
"1636009769625-0"
127.0.0.1:6379>

3、举例

1、maxlen 准确限度

127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最初的 2 个音讯
(integer) 2
127.0.0.1:6379> xrange stream-key - + # 能够看到之前退出的 2 个音讯被删除了
1) 1) "1636009763955-1"
   2) 1) "username"
      2) "wangwu"
2) 1) "1636009769625-0"
   2) 1) "username"
      2) "zhaoliu"
127.0.0.1:6379>

上方的意思是,保留 stream-key 这个 Stream 中最初的 2 个音讯。

2、minid 含糊限度

minid 是删除比这个 id 小的数据,本地测试的时候 没有测试进去,略。

8、XREAD 独立生产音讯

XREAD只是读取音讯,读取完之后并不会删除音讯。应用 XREAD 读取音讯,是齐全独立与消费者组的,多个客户端能够同时读取音讯。

1、命令格局

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

2、筹备数据

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636011801365-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636011806261-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636011810905-0"
127.0.0.1:6379>

3、举例

1、获取用户名是 wangwu 的数据

127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是 lisi 的 id,即读取到的数据须要是 > 1636011806261-0
1) 1) "stream-key"
   2) 1) 1) "1636011810905-0"
         2) 1) "username"
            2) "wangwu"

2、获取 2 条数据

127.0.0.1:6379> xread count 2 streams stream-key 0-0
1) 1) "stream-key"
   2) 1) 1) "1636011801365-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636011806261-0"
         2) 1) "username"
            2) "lisi"
127.0.0.1:6379>

count限度单次读取最初的音讯,因为以后读取可能没有这么多。

3、非阻塞读取 Stream 对尾的数据

即读取队列尾的下一个音讯,在非阻塞模式下始终是nil

127.0.0.1:6379> xread streams stream-key $
(nil)

4、阻塞读取 Stream 对尾的数据

留神:

  1. $示意读取队列最新进来的一个音讯,不是 Stream 的最初一个音讯。是 xread block 执行后,再次应用 xadd 增加音讯后,xread block才会返回。
  2. block 0示意永恒阻塞,当音讯到来时,才接触阻塞。block 1000示意阻塞 1000ms,如果 1000ms 还没有音讯到来,则返回nil
  3. xread 进行程序生产 当应用 xread 进行程序音讯时,须要记住返回的音讯 id,同时下次调用 xread 时,须要将上次返回的音讯 id 传递进去。
  4. xread读取音讯,齐全忽视生产组,此时 Stream 就能够了解为一个一般的 list。

9、消费者组相干操作

1、消费者组命令

2、筹备数据

1、创立 Stream 的名称是 stream-key

2、创立 2 个音讯,aa 和 bb

127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"

3、创立消费者组

1、创立一个从头开始生产的消费者组
xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(示意从头开始生产)
2、创立一个从 Stream 最新的一个音讯生产的消费者组
xgroup create stream-key g2 $

$示意从最初一个元素生产,不包含 Stream 中的最初一个元素,即生产最新的音讯。

4、创立一个从某个音讯之后生产的消费者组
xgroup create stream-key g3 1636362619125-0  #1636362619125-0 这个是上方 aa 音讯的 id 的值

1636362619125-0某个音讯的具体的 ID, 这个 g3 消费者组中的音讯都是 大于 >这个 id 的音讯。

3、从消费者中读取音讯

127.0.0.1:6379> xreadgroup group g1(生产组名) c1(消费者名,主动创立) count 3(读取 3 条) streams stream-key(Stream 名) >(从该消费者组中还未调配给另外的消费者的音讯开始读取)
1) 1) "stream-key"
   2) 1) 1) "1636362619125-0"
         2) 1) "aa"
            2) "aa"
      2) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
(nil) # 返回 nil 是因为 g2 生产组是从最新的一条信息开始读取(创立消费者组时应用了 $),须要在另外的窗口执行 `xadd` 命令,才能够再次读取到音讯
127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只读取到一条音讯是因为,在创立消费者组时,指定了 aa 音讯的 id,bb 音讯的 id 大于 aa, 所以读取进去了。1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379>

4、读取消费者的 pending 音讯

127.0.0.1:6379> xgroup create stream-key g4 0-0
OK
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 88792
127.0.0.1:6379> xinfo consumers stream-key g4
(empty array)
127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
1) 1) "stream-key"
   2) (empty array)
127.0.0.1:6379>

5、转移消费者的音讯

127.0.0.1:6379> xpending stream-key g1 - + 10 c1
1) 1) "1636362619125-0"
   2) "c1"
   3) (integer) 2686183
   4) (integer) 1
2) 1) "1636362623191-0"
   2) "c1"
   3) (integer) 102274
   4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
1) 1) "1636362623191-0"
   2) 1) "bb"
      2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 17616
   4) (integer) 8
127.0.0.1:6379>

也能够通过 xautoclaim 来实现。

6、一些监控命令

1、查看生产组中消费者的 pending 音讯
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 1247680
   4) (integer) 8
127.0.0.1:6379>
2、查看生产组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1474864
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1290069
127.0.0.1:6379>
3、查看生产组信息
127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1636362623191-0"
2) 1) "name"
   2) "g2"
   3) "consumers"
......
4、查看 Stream 信息
127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636362623191-0"
 9) "groups"
10) (integer) 4
11) "first-entry"
12) 1) "1636362619125-0"
    2) 1) "aa"
       2) "aa"
13) "last-entry"
14) 1) "1636362623191-0"
    2) 1) "bb"
       2) "bb"
127.0.0.1:6379>

五、参考文档

1、https://redis.io/topics/streams-intro

2、https://www.runoob.com/redis/redis-stream.html

正文完
 0