1. 为什么须要工夫轮?
在 Dubbo 中,为加强零碎的容错能力,会有相应的监听判断解决机制。比方 RPC 调用的超时机制的实现,消费者判断 RPC 调用是否超时,如果超时会将超时后果返回给应用层。
在 Dubbo 最开始的实现中,是将所有的返回后果(DefaultFuture)都放入一个汇合中,并且通过一个定时工作,每隔肯定工夫距离就扫描所有的 future,一一判断是否超时。
这样的实现形式尽管比较简单,然而存在一个问题就是会有很多无意义的遍历操作开销。比方一个 RPC 调用的超时工夫是 10 秒,而设置的超时断定的定时工作是 2 秒执行一次,那么可能会有 4 次左右无意义的循环检测判断操作。
为了解决上述场景中的相似问题,Dubbo 借鉴 Netty,引入了工夫轮算法,缩小无意义的轮询判断操作。
2. 工夫轮原理
对于以上问题,目标是要缩小额定的扫描操作就能够了。比如说一个定时工作是在 5 秒之后执行,那么在 4.9 秒之后才扫描这个定时工作,这样就能够极大缩小 CPU 开销。这时咱们就能够利用时钟轮的机制了。
时钟轮的本质上是参考了生存中的时钟跳动的原理,那么具体是如何实现呢?
在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度;而时钟轮就相当于指针跳动的一个周期,咱们能够将每个工作放到对应的时间槽位上。
如果时钟轮有 10 个槽位,而时钟轮一轮的周期是 10 秒,那么咱们每个槽位的单位工夫就是 1 秒,而下一层工夫轮的周期就是 100 秒,每个槽位的单位工夫也就是 10 秒,这就好比秒针与分针,在秒针周期下,刻度单位为秒,在分针周期下,刻度为分。
假如当初咱们有 3 个工作,别离是工作 A(0.9 秒之后执行)、工作 B(2.1 秒后执行)与工作 C(12.1 秒之后执行),咱们将这 3 个工作增加到时钟轮中,工作 A 被放到第 0 槽位,工作 B 被放到第 2 槽位,工作 C 被放到下一层工夫轮的第 2 个槽位,如下图所示:
通过这个场景咱们能够理解到,时钟轮的扫描周期仍是最小单位 1 秒,然而搁置其中的工作并没有重复扫描,每个工作会按要求只扫描执行一次,这样就可能很好的解决 CPU 节约的问题。
这样可能会呈现一个问题,如果一直叠加时钟轮,有限增长,效率是会出现降落,那么该如何解决?
针对于设定三个时钟轮,小时轮,分钟轮,秒级轮。
3. Dubbo 源码分析
次要是通过 Timer,Timeout,TimerTask 几个接口定义了一个定时器的模型,再通过 HashedWheelTimer 这个类实现了一个工夫轮定时器(默认的时间槽的数量是 512,能够自定义这个值)。它对外提供了简略易用的接口,只须要调用 newTimeout 接口,就能够实现对只需执行一次工作的调度。通过该定时器,Dubbo 在响应的场景中实现了高效的任务调度。
工夫轮外围类 HashedWheelTimer 构造:
4. 工夫轮在 RPC 的利用
-
调用超时与重试解决 :下面所讲的客户端调用超时的解决,就能够利用到时钟轮,咱们每发一次申请,都创立一个解决申请超时的定时工作放到时钟轮里,在高并发、高访问量的状况下,时钟轮每次只轮询一个时间槽位中的工作,这样会节俭大量的 CPU。
源码 FailbackRegistry,代码片段:
// 构造方法 public FailbackRegistry(URL url) {super(url); this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD); // since the retry task will not be very much. 128 ticks is enough. // 重试器的时间槽数量,设定为 128 retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); } // 失败工夫工作注册器 private void addFailedRegistered(URL url) {FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) {return;} FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. // 旧工作不存在,则搁置工夫轮,开启新一个工作 retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } }
-
定时心跳检测 :RPC 框架调用端定时向服务端发送的心跳检测,来保护连贯状态,咱们能够将心跳的逻辑封装为一个心跳工作,放到时钟轮里。心跳是要定时反复执行的,而时钟轮中的工作执行一遍就被移除了,对于这种须要反复执行的定时工作咱们该如何解决呢?咱们在定时工作逻辑完结的最初,再加上一段逻辑,重设这个工作的执行工夫,把它从新丢回到时钟轮里。这样就能够实现循环执行。
源码 HeaderExchangeServer 代码片段:
... // 建设心跳工夫轮,槽位数默认为 128 private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL); ... // 启动心跳工作检测 private void startIdleCheckTask(URL url) {if (!server.canHandleIdle()) {AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); int idleTimeout = getIdleTimeout(url); long idleTimeoutTick = calculateLeastDuration(idleTimeout); CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); this.closeTimerTask = closeTimerTask; // init task and start timer. // 开启心跳检测工作 IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } } ...
连贯检测,会一直执行,退出工夫轮中。
AbstractTimerTask 源码:
@Override public void run(Timeout timeout) throws Exception {Collection<Channel> c = channelProvider.getChannels(); for (Channel channel : c) {if (channel.isClosed()) {continue;} // 调用心跳检测工作 doTask(channel); } // 从新放入工夫轮中 reput(timeout, tick); }
还能够参考 HeartbeatTimerTask、ReconnectTimerTask 源码实现。
本文由 mirson 创作,心愿对大家有所帮忙, 谢谢!