我是3y,一年CRUD
教训用十年的markdown
程序员👨🏻💻长年被誉为职业八股文选手
前阵子,有个小伙伴找到问我,如果要实现延时发送,那是基于什么来做的。
我看到这个问题之后,略微思考了下,感觉的确也是austin
平台所须要实现的性能。对于前端而言,只有让业务方在创立模板的时候填选屏蔽类型,后端依据这个字段削减一点点细节,这个需要就做完了,简略!
提早音讯如何实现?
提早音讯就是字面上的意思:当接管到音讯之后,我须要隔一段时间进行解决(绝对于立马解决,它隔了一段时间,所以他叫提早音讯)。
在原生的Java有DelayQueue
供咱们去应用,在应用的时候,咱们add
进去的队列的元素须要实现Delayed
接口(同时该接口继承了Comparable
接口,所以咱们DelayQueue
是有序的)
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
从poll
的源码上能够清晰地发现实质上就是在取数的时候判断了下工夫
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
有的人就反驳到:这不是废话吗?必定要判断工夫啊,不判断工夫怎么晓得我要提早的音讯什么时候执行。
明确了这点之后,咱们再来别的计划。因为在生产环境中是不太可能应用JDK原生提早队列的,它是没有长久化的,重启就会导致数据失落。
当austin
我的项目应用内存队列去解耦解决数据曾经有人提出服务器重启的时候该怎么办,我的解决思路就是通过优雅敞开服务器这种伎俩去尽量避免数据失落,而提早队列这种就不能这么干了,咱们等不了这么久的。
略微想想还有什么存储适宜当队列且有长久化机制的呢?
答案不言而喻:Redis
和音讯队列(Kafka
/RocketMQ
/RabbmitMQ
等)
咱们先来看Redis
里提供了一种数据结构叫做zset
,它是可排序的汇合并且Redis原生就反对长久化。有赞的提早队列就是基于通过zset
进行设计和存储的。整体架构如下图:
简略了解这张图就是:将须要提早的音讯搁置Redis,通过Timer
轮询失去可执行的音讯,将可执行的音讯搁置不同的Topic
供业务方自行生产。
更多的设计思路能够参考有赞的技术原文,这里我不再赘述:https://tech.youzan.com/queuing_delay/
通过timer
去轮询zset
查看是否有可执行的音讯是一种思路,也有人通过Redis的过期回调的姿态也能达到提早音讯的成果(把音讯执行的工夫定义为key
过期的工夫,当key
触发了过期回调,那阐明该音讯可执行了)。
说完Redis
,咱们再来看看音讯队列。在austin
我的项目上应用音讯队列是Kafka
,而Kafka
在官网是没有提供提早队列这种机制的。不过RabbmitMQ
和RocketMQ
都有对应的机制,咱们能够简略看看窥探下它们的实现思路。
RabbmitMQ
它的提早队列机制实质上也是通过TTL
(Time To Live 音讯存活的工夫)所实现的,当队列里的元素触发了过期时,会被送往到Dead Letter Exchanges
(死信队列中)。咱们能够将死信队列的元素再次转发,对其进行生产,从而达到提早队列的成果。
毕竟RabbmitMQ
是专门做音讯队列的,所以它对音讯的可靠性会比Redis
更加高(音讯投递的可靠性、至多解决一次的生产语义)
RocketMQ
反对在咱们投递音讯的时候设置提早等级
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
默认反对18个提早等级,别离是:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
当咱们设置了提早等级的音讯之后,RocketMQ
不会把音讯间接投递到对应的topic,而是转发到对应提早等级的队列中。在Broker外部会为每个提早队列起TimerTask
来进行判断是否有音讯达到了工夫。
ScheduleMessageService#start
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
如果到期了,则将音讯从新存储到CommitLog,转发到真正指标的topic
对RocketMQ
提早队列比拟感兴趣的,举荐看这篇文章:https://cloud.tencent.com/developer/article/1581368
实现需求
在后面提到咱们能够利用JDK原生的延时队列,又或是Redis的zset
数据结构或者其过期工夫机制、又或是RabbitMQ
应用TTL
+死信队列机制、又或是RocketMQ
的延时等级队列机制来实现咱们的需要(延时队列)
针对此次需要,下面所讲的延时队列,我都没用到…
austin
我的项目引入的是Kafka
,不太可能去为了延时队列去引入第二种音讯队列(RabbitMQ
在互联网应该用得绝对较少,RocketMQ
须要改变配置文件的提早等级能力反对更丰盛的延时需要)。
如果基于Kafka或者Redis去二次开发延时队列,开发成本还是有不少的,在GitHub也还没捞到我想要的轮子。
于是,我换了一种计划:万物皆扫表
针对这次需要(早晨发的音讯,次日早上发送),就不须要上延时队列,因为austin
曾经接入了分布式定时工作框架了(对应的实现是xxl-job
)
只有把早晨的接管到的音讯扔进Redis list
,而后启个定时工作(每天早上9点)轮询该list
是否有数据,如果有再从新做解决就完事了。
总结
这篇文章次要讲述了如果咱们要应用延时队列,咱们能够有什么计划,他们的设计是怎么样的。在需要侧上看,这个需要就是「延时队列」的场景,但基于现状的零碎架构和开发成本思考,咱们是能够用另类(分布式定时工作框架)的形式去把需要给实现了。
很多时候,咱们看到的零碎很烂,技术栈很烂,发现好多场景都没有用到最佳实际而感到烦恼,在年老的时候都想有重构的心。但实际上每引入一个中间件都是须要付出老本的,毛糙也有毛糙的益处。
只有业务能完满反对,那就是好的计划。想要搞本人想搞的技术,那就做开源,如果有一天我感觉分布式定时工作来实现此次需要不悦目了,我再花工夫来重构才干掉,当初就这么实现吧( // TODO)。
如果你切实是感觉看着糟心,欢送提个pull request
,这样我就不得不把这种实现给干掉了(我对提过来的pull request
都会审慎且用心解决)
都看到这里了,点个赞一点都不过分吧?我是3y,下期见。
austin我的项目源码Gitee链接:gitee.com/austin
austin我的项目源码GitHub链接:github.com/austin
发表回复