乐趣区

关于后端:RabbitMQRocketMQKafka延迟队列实现

提早队列在理论我的项目中有十分多的利用场景,最常见的比方订单未领取,超时勾销订单,在创立订单的时候发送一条提早音讯,达到延迟时间之后消费者收到音讯,如果订单没有领取的话,那么就勾销订单。
那么,明天咱们须要来谈的问题就是 RabbitMQ、RocketMQ、Kafka 中别离是怎么实现延时队列的,以及他们对应的实现原理是什么?
RabbitMQ
RabbitMQ 自身并不存在提早队列的概念,在 RabbitMQ 中是通过 DLX 死信交换机和 TTL 音讯过期来实现提早队列的。
TTL(Time to Live)过期工夫
有两种形式能够设置 TTL。

通过队列属性设置,这样的话队列中的所有音讯都会领有雷同的过期工夫
对音讯独自设置过期工夫,这样每条音讯的过期工夫都能够不同

那么如果同时设置呢?这样将会以两个工夫中较小的值为准。
针对队列的形式通过参数 x -message-ttl 来设置。
Map<String, Object> args = new HashMap<String, Object>();
args.put(“x-message-ttl”, 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
复制代码
针对音讯的形式通过 setExpiration 来设置。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration(“60000”);
channel.basicPublish(exchangeName, routingKey, mandatory, properties, “message”.getBytes());
复制代码
DLX(Dead Letter Exchange)死信交换机
一个音讯要成为死信音讯有 3 种状况:

音讯被回绝,比方调用 reject 办法,并且须要设置 requeue 为 false
音讯过期
队列达到最大长度

能够通过参数 dead-letter-exchange 设置死信交换机,也能够通过参数 dead-letter- exchange 指定 RoutingKey(未指定则应用原队列的 RoutingKey)。
Map<String, Object> args = new HashMap<String, Object>();
args.put(“x-dead-letter-exchange”, “exchange.dlx”);
args.put(“x-dead-letter-routing-key”, “routingkey”);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
复制代码
原理
当咱们对音讯设置了 TTL 和 DLX 之后,当音讯失常发送,通过 Exchange 达到 Queue 之后,因为设置了 TTL 过期工夫,并且音讯没有被生产(订阅的是死信队列),达到过期工夫之后,音讯就转移到与之绑定的 DLX 死信队列之中。
这样的话,就相当于通过 DLX 和 TTL 间接实现了提早音讯的性能,理论应用中咱们能够依据不同的提早级别绑定设置不同延迟时间的队列来达到实现不同延迟时间的成果。

RocketMQ
RocketMQ 和 RabbitMQ 不同,它自身就有提早队列的性能,然而开源版本只能反对固定延迟时间的音讯,不反对任意工夫精度的音讯(这个如同只有阿里云版本的能够)。
他的默认工夫距离分为 18 个级别,基本上也能满足大部分场景的须要了。
默认提早级别:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。
应用起来也十分的简略,间接通过 setDelayTimeLevel 设置提早级别即可。
setDelayTimeLevel(level)
复制代码
原理
实现原理说起来比较简单,Broker 会依据不同的提早级别创立出多个不同级别的队列,当咱们发送提早音讯的时候,依据不同的提早级别发送到不同的队列中,同时在 Broker 外部通过一个定时器去轮询这些队列(RocketMQ 会为每个提早级别别离创立一个定时工作),如果音讯达到发送工夫,那么就间接把音讯发送到指 topic 队列中。
RocketMQ 这种实现形式是放在服务端去做的,同时有个益处就是雷同延迟时间的音讯是能够保障有序性的。
谈到这里就顺便提一下对于音讯生产重试的原理,这个实质上来说其实是一样的,对于生产失败须要重试的音讯实际上都会被丢到提早队列的 topic 里,到期后再转发到真正的 topic 中。

Kafka
对于 Kafka 来说,原生并不反对提早队列的性能,须要咱们手动去实现,这里我依据 RocketMQ 的设计提供一个实现思路。
这个设计,咱们也不反对任意工夫精度的提早音讯,只反对固定级别的提早,因为对于大部分提早音讯的场景来说足够应用了。
只创立一个 topic,然而针对该 topic 创立 18 个 partition,每个 partition 对应不同的提早级别,这样做和 RocketMQ 一样有个益处就是能达到雷同延迟时间的音讯达到有序性。
原理

首先创立一个独自针对提早队列的 topic,同时创立 18 个 partition 针对不同的提早级别

发送音讯的时候依据提早参数发送到提早 topic 对应的 partition,对应的 key 为延迟时间,同时把原 topic 保留到 header 中

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(“delay_topic”, delayPartition, delayTime, data);
producerRecord.headers().add(“origin_topic”, topic.getBytes(StandardCharsets.UTF_8));
复制代码

内嵌的 consumer 独自设置一个 ConsumerGroup 去生产提早 topic 音讯,生产到音讯之后如果没有达到延迟时间那么就进行 pause,而后 seek 到以后 ConsumerRecord 的 offset 地位,同时应用定时器去轮询提早的 TopicPartition,达到延迟时间之后进行 resume

如果达到了延迟时间,那么就获取到 header 中的实在 topic,间接转发

这里为什么要进行 pause 和 resume 呢?因为如果不这样的话,如果超时未生产达到 max.poll.interval.ms 最大工夫(默认 300s),那么将会触发 Rebalance。

.markdown-body pre,.markdown-body pre>code.hljs{color:#333;background:#f8f8f8}.hljs-comment,.hljs-quote{color:#998;font-style:italic}.hljs-keyword,.hljs-selector-tag,.hljs-subst{color:#333;font-weight:700}.hljs-literal,.hljs-number,.hljs-tag .hljs-attr,.hljs-template-variable,.hljs-variable{color:teal}.hljs-doctag,.hljs-string{color:#d14}.hljs-section,.hljs-selector-id,.hljs-title{color:#900;font-weight:700}.hljs-subst{font-weight:400}.hljs-class .hljs-title,.hljs-type{color:#458;font-weight:700}.hljs-attribute,.hljs-name,.hljs-tag{color:navy;font-weight:400}.hljs-link,.hljs-regexp{color:#009926}.hljs-bullet,.hljs-symbol{color:#990073}.hljs-built_in,.hljs-builtin-name{color:#0086b3}.hljs-meta{color:#999;font-weight:700}.hljs-deletion{background:#fdd}.hljs-addition{background:#dfd}.hljs-emphasis{font-style:italic}.hljs-strong{font-weight:700}

退出移动版