乐趣区

关于redis:别再用-Redis-List-实现消息队列了Stream-专为队列而生

应用 Redis 的 List 实现音讯队列有很多局限性,比方:

没有良好的 ACK 机制;
没有 ConsumerGroup 生产组概念;
音讯沉积。
List 是线性构造,想要查问指定数据须要遍历整个列表;
Stream 是 Redis 5.0 引入的一种专门为音讯队列设计的数据类型,Stream 是一个蕴含 0 个或者多个元素的有序队列,这些元素依据 ID 的大小进行有序排列。

它实现了大部分音讯队列的性能:

音讯 ID 系列化生成;
音讯遍历;
音讯的阻塞和非阻塞读;
Consumer Groups 生产组;
ACK 确认机制。
反对多播。
提供了很多音讯队列操作命令,并且借鉴 Kafka 的 Consumer Groups 的概念,提供了生产组性能。

同时提供了音讯的长久化和主从复制机制,客户端能够拜访任何时刻的数据,并且能记住每一个客户端的拜访地位,从而保障音讯不失落。

废话少说,先来看下如何应用,官网文档详见:redis.io/topics/stre…

XADD:插入音讯
「云岚宗众弟子听命,击杀萧炎!」

当云山最初一字落下,那洋溢的紧绷氛围,登时宣告破碎,悬浮半空的泛滥云岚宗长老背地双翼一振,便是咻咻的划过天际,追杀萧炎。

云山应用以下指令向队列中插入「追杀萧炎」命令,让长老率领子弟去执行。

XADD 云岚宗 * task kill name 萧炎
“1645936602161-0”
复制代码
Stream 中的每个元素由键值对的模式组成,不同元素能够蕴含不同数量的键值对。

该命令的语法如下:

XADD streamName id field value [field value ...]
复制代码

音讯队列名称前面的「*」,示意让 Redis 为插入的音讯主动生成惟一 ID,当然也能够本人定义。

音讯 ID 由两局部组成:

以后毫秒内的工夫戳;
程序编号。从 0 为起始值,用于辨别同一时间内产生的多个命令。
通过将元素 ID 与工夫进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将流变成了一种只执行追加操作(append only)的数据结构。

这种个性对于应用流实现音讯队列和事件零碎的用户来说是十分重要的:

用户能够确信,新的音讯和事件只会呈现在已有音讯和事件之后,就像事实世界里新事件总是产生在已有事件之后一样,一切都是有序进行的。

XREAD:读取音讯
云凌老狗应用如下指令接管云山的命令:

XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
   2) 1) 1) "1645936602161-0"
         2) 1) "task"
            2) "kill"
            3) "name"
            4) "萧炎" # 萧炎
复制代码

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

该指令能够同时对多个流进行读取,每个心法对应含意如下:

COUNT:示意每个流中最多读取的元素个数;
BLOCK:阻塞读取,当音讯队列没有音讯的时候,则阻塞期待,0 示意有限期待,单位是毫秒。
ID:音讯 ID,在读取音讯的时候能够指定 ID,并从这个 ID 的下一条音讯开始读取,0-0 则示意从第一个元素开始读取。
如果想应用 XREAD 进行程序生产,每次读取后要记住返回的音讯 ID,下次调用 XREAD 就将上一次返回的音讯 ID 作为参数传递到下一次调用就能够持续生产后续的音讯了。

云韵宗主,我明天刚到云岚宗,历史的音讯就不接了,只想接管我应用 XREAD 阻塞期待的那一刻开始通过 XADD 公布的音讯要咋整?

运行「」心法即可,心法的最初「」心法即可,心法的最初「」心法即可,心法的最初「」符号示意读取最新的阻塞音讯,读取不到则始终死等。

期待过程中,其余长老向队列追加音讯,则会立刻读取到。

XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 $
复制代码
这么容易就实现音讯队列了么?说好的 ACK 机制呢?

这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当从新执行 XREAD COUNT 2 BLOCK 0 STREAMS 云岚宗 0-0 指令的时候又会从新读取到。

所以咱们还须要 ACK 机制,

接下来,咱们来一个真正的音讯队列。

ConsumerGroup
Redis Stream 的 ConsumerGroup(消费者组)容许用户将一个流从逻辑上划分为多个不同的流,并让 ConsumerGroup 的消费者去解决。

它是一个弱小的反对多播的可长久化的音讯队列。Redis Stream 借鉴了 Kafka 的设计。

Stream 的高可用是建设主从复制根底上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是能够反对高可用的。

Redis Stream 的构造如上图所示。有一个音讯链表,每个音讯都有一个惟一的 ID 和对应的内容;
音讯长久化;
每个生产组的状态是独立的,不不影响,同一份的 Stream 音讯会被所有的生产组生产;
一个生产组能够有多个消费者组成,消费者之间是竞争关系,任意一个消费者读取了音讯都会使 last_deliverd_id 往前挪动;
每个消费者有一个 pending_ids 变量,用于记录以后消费者读取了然而还没 ack 的音讯。它用来保障音讯至多被客户端生产了一次。
生产组实现的音讯队列次要波及以下三个指令:

XGROUP 用于创立、销毁和治理消费者组。
XREADGROUP 用于通过消费者组从流中读取。
XACK 是容许消费者将待处理音讯标记为已正确处理的命令。
创立生产组
Stream 通过 XGROUP CREATE 指令创立生产组 (Consumer Group),须要传递起始音讯 ID 参数用来初始化 last_delivered_id 变量。

咱们应用 XADD 往 bossStream 队列插入一些音讯:

XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40
复制代码

如下指令,为音讯队列名为 bossStream 创立「青龙门」和「六扇门」两个生产组。

# 语法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龙门 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇门 0-0 MKSTREAM
复制代码

stream:指定队列的名字;
group:指定生产组名字;
start_id:指定生产组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取音讯,0-0 从第一条开始读取,$ 示意从最初一条向后开始读取,只接管新音讯。
MKSTREAM:默认状况下,XGROUP CREATE 命令在指标流不存在时返回谬误。能够应用可选 MKSTREAM 子命令作为 之后的最初一个参数来主动创立流。
读取音讯
让「青龙门」生产组的 consumer1 从 bossStream 阻塞读取一条音讯:

XREADGROUP GROUP 青龙门 consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957821396-0"
         2) 1) "name"
            2) "zhangsan"
            3) "age"
            4) "26"
复制代码

语法如下:

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
复制代码

[] 内的示意可选参数,该命令与 XREAD 大同小异,区别在于新增 GROUP groupName consumerName 选项。

该选项的两个参数别离用于指定被读取的消费者组以及负责解决音讯的消费者。

其中:

:命令的最初参数 >,示意从尚未被生产的音讯开始读取;
BLOCK:阻塞读取;
敲黑板了

如果音讯队列中的音讯被生产组的一个消费者生产了,这条音讯就不会再被这个生产组的其余消费者读取到。

比方 consumer2 执行读取操作:

XREADGROUP GROUP 青龙门 consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957838700-0"
         2) 1) "name"
            2) "lisi"
            3) "age"
            4) "2"
复制代码

consumer2 不能再读取到 zhangsan 了,而是读取下一条 lisi 因为这条音讯曾经被 consumer1 读取了。

应用消费者的另一个目标能够让组内的多个消费者分担读取音讯,也就是每个消费者读取局部音讯,从而实现平衡负载。

比方一个生产组有三个消费者 C1、C2、C3 和一个蕴含音讯 1、2、3、4、5、6、7 的流:

XPENDING 查看已读未确认音讯
为了保障消费者在生产的时候产生故障或者宕机重启后仍然能够读取音讯,Stream 外部有一个队列(pending List)保留每个消费者读取然而还没有执行 ACK 的音讯。

如果消费者应用了 XREADGROUP GROUP groupName consumerName 读取音讯,然而没有给 Stream 发送 XACK 命令,音讯仍然保留。

比方查看 bossStream 中的 生产组「青龙门」中各个消费者已读取未确认的音讯信息:

XPENDING bossStream 青龙门
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
复制代码

1)未确认音讯条数;
2) ~ 3)青龙门中所有消费者读取的音讯最小和最大 ID;
查看 consumer1 读取了哪些数据,应用以下命令:

XPENDING bossStream 青龙门 - + 10 consumer1
1) 1) "1645957821396-0"
   2) "consumer1"
   3) (integer) 3758384
   4) (integer) 1
复制代码

ACK 确认
所以当接管到音讯并且生产胜利当前,咱们须要手动 ACK 告诉 Streams,这条音讯就会被删除了。命令如下:

XACK bossStream 青龙门 1645957821396-0 1645957838700-0
(integer) 2
复制代码

语法如下:

XACK key group-key ID [ID …]

生产确认减少了音讯的可靠性,个别在业务解决实现之后,须要执行 ack 确认音讯曾经被生产实现,整个流程的执行如下图所示:

应用 Redisson 实战
应用 maven 增加依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>
复制代码

增加 Redis 配置,码哥的 Redis 没有配置明码,大家依据理论状况配置即可。

spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false
复制代码
@Slf4j
@Service
public class QueueService {
    @Autowired
    private RedissonClient redissonClient;
  
    /**
     * 发送音讯到队列
     *
     * @param message
     */
    public void sendMessage(String message) {RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.add("speed", "19");
        stream.add("velocity", "39%");
        stream.add("temperature", "10C");
    }
  
    /**
     * 消费者生产音讯
     *
     * @param message
     */
    public void consumerMessage(String message) {RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.createGroup("sensors_data", StreamMessageId.ALL);
        Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {Map<String, String> msg = entry.getValue();
          System.out.println(msg);
          stream.ack("sensors_data", entry.getKey());
        }
    }
}
复制代码

最初
如果你感觉此文对你有一丁点帮忙,点个赞。或者能够退出我的开发交换群:1025263163 互相学习,咱们会有业余的技术答疑解惑

如果你感觉这篇文章对你有点用的话,麻烦请给咱们的开源我的项目点点 star:http://github.crmeb.net/u/defu 不胜感激!

PHP 学习手册:https://doc.crmeb.com
技术交换论坛:https://q.crmeb.com

退出移动版