关于java:延时队列我在项目里是怎么实现的

5次阅读

共计 3084 个字符,预计需要花费 8 分钟才能阅读完成。

我是 3y,一年 CRUD 教训用十年的 markdown 程序员👨🏻‍💻长年被誉为职业八股文选手

前阵子,有个小伙伴找到问我,如果要实现 延时发送,那是基于什么来做的。

我看到这个问题之后,略微思考了下,感觉的确也是 austin 平台所须要实现的性能。对于前端而言,只有让业务方在创立模板的时候填选屏蔽类型,后端依据这个字段削减一点点细节,这个需要就做完了,简略

提早音讯如何实现?

提早音讯就是字面上的意思:当接管到音讯之后,我须要 隔一段时间 进行解决(绝对于立马解决,它隔了一段时间,所以他叫提早音讯)。

在原生的 Java 有 DelayQueue 供咱们去应用,在应用的时候,咱们 add 进去的队列的元素须要实现 Delayed 接口(同时该接口继承了 Comparable 接口,所以咱们 DelayQueue有序 的)

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

poll 的源码上能够清晰地发现实质上就是在 取数的时候判断了下工夫

long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
  return q.poll();

有的人就反驳到:这不是废话吗?必定要判断工夫啊,不判断工夫怎么晓得我要提早的音讯什么时候执行

明确了这点之后,咱们再来别的计划。因为在生产环境中是不太可能应用 JDK 原生提早队列的,它是 没有长久化 的,重启就会导致数据失落。

austin 我的项目应用内存队列去解耦解决数据曾经有人提出服务器重启的时候该怎么办,我的解决思路就是通过 优雅敞开服务器 这种伎俩去尽量避免数据失落,而提早队列这种就不能这么干了,咱们等不了这么久的。

略微想想还有什么 存储 适宜当队列且有长久化机制的呢?

答案不言而喻:Redis和音讯队列 (Kafka/RocketMQ/RabbmitMQ 等)

咱们先来看 Redis 里提供了一种数据结构叫做 zset,它是 可排序 的汇合并且 Redis 原生就反对长久化。有赞 的提早队列就是基于通过 zset 进行设计和存储的。整体架构如下图:

简略了解这张图就是:将须要提早的音讯搁置 Redis,通过 Timer 轮询失去可执行的音讯,将可执行的音讯搁置不同的 Topic 供业务方自行生产。

更多的设计思路能够参考有赞的技术原文,这里我不再赘述:https://tech.youzan.com/queuing_delay/

通过 timer 去轮询 zset 查看是否有可执行的音讯是一种思路,也有人通过 Redis 的 过期回调 的姿态也能达到提早音讯的成果(把音讯执行的工夫定义为 key 过期的工夫,当 key 触发了过期回调,那阐明该音讯可执行了)。

说完 Redis,咱们再来看看音讯队列。在austin 我的项目上应用音讯队列是 Kafka,而Kafka 在官网是没有提供提早队列这种机制的。不过 RabbmitMQRocketMQ都有对应的机制,咱们能够简略看看窥探下它们的实现思路。

RabbmitMQ它的提早队列机制实质上也是通过TTL(Time To Live 音讯存活的工夫)所实现的,当队列里的元素触发了过期时,会被送往到Dead Letter Exchanges(死信队列中)。咱们能够将死信队列的元素再次转发,对其进行生产,从而达到提早队列的成果。

毕竟 RabbmitMQ 是专门做音讯队列的,所以它对音讯的 可靠性 会比 Redis 更加高(音讯投递的可靠性、至多解决一次的生产语义)

RocketMQ反对在咱们投递音讯的时候设置 提早等级

Message message = new Message("TestTopic", ("Hello scheduled message" + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);

默认反对 18 个提早等级,别离是:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

当咱们设置了提早等级的音讯之后,RocketMQ不会把音讯间接投递到对应的 topic,而是 转发到 对应提早等级的队列中。在 Broker 外部会为每个提早队列起 TimerTask 来进行判断是否有音讯达到了工夫。

ScheduleMessageService#start

for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}  

如果到期了,则将音讯从新存储到 CommitLog,转发到真正指标的topic

RocketMQ 提早队列比拟感兴趣的,举荐看这篇文章:https://cloud.tencent.com/developer/article/1581368

实现需求

在后面提到咱们能够利用 JDK 原生的延时队列,又或是 Redis 的 zset 数据结构或者其过期工夫机制、又或是 RabbitMQ 应用 TTL+ 死信队列机制、又或是RocketMQ 的延时等级队列机制来实现咱们的 需要(延时队列)

针对此次需要,下面所讲的延时队列,我都没用到 …

austin我的项目引入的是 Kafka,不太可能去为了延时队列去引入第二种音讯队列(RabbitMQ 在互联网应该用得绝对较少,RocketMQ须要改变配置文件的提早等级能力反对更丰盛的延时需要)。

如果基于 Kafka 或者 Redis 去 二次开发 延时队列,开发成本还是有不少的,在 GitHub 也还没捞到我想要的轮子。

于是,我换了一种计划:万物皆扫表

针对这次需要(早晨发的音讯,次日早上发送),就不须要上延时队列,因为 austin 曾经接入了分布式定时工作框架了(对应的实现是xxl-job

只有把早晨的接管到的音讯扔进 Redis list,而后启个定时工作(每天早上 9 点)轮询该list 是否有数据,如果有再从新做解决就完事了。

总结

这篇文章次要讲述了 如果咱们要应用延时队列,咱们能够有什么计划,他们的设计是怎么样的 。在需要侧上看,这个需要就是「延时队列」的场景,但基于现状的零碎架构和开发成本思考,咱们是能够用 另类(分布式定时工作框架)的形式去把需要给实现了。

很多时候,咱们看到的零碎很烂,技术栈很烂,发现好多场景都没有用到 最佳实际 而感到烦恼,在 年老的时候 都想有重构的心。但实际上每引入一个中间件都是须要付出老本的,毛糙也有毛糙的益处。

只有业务能完满反对,那就是好的计划。想要搞本人想搞的技术,那就做开源,如果有一天我感觉分布式定时工作来实现此次需要不悦目了,我再花工夫来重构才干掉,当初就这么实现吧(// TODO)。

如果你切实是感觉看着糟心,欢送提个 pull request,这样我就不得不把这种实现给干掉了(我对提过来的pull request 都会审慎且 用心 解决)

都看到这里了,点个赞一点都不过分吧?我是 3y,下期见。

austin 我的项目 源码Gitee 链接:gitee.com/austin

austin 我的项目源码 GitHub 链接:github.com/austin

正文完
 0