关于java:RPC实现原理之核心技术时间轮

9次阅读

共计 3240 个字符,预计需要花费 9 分钟才能阅读完成。

1. 为什么须要工夫轮?

在 Dubbo 中,为加强零碎的容错能力,会有相应的监听判断解决机制。比方 RPC 调用的超时机制的实现,消费者判断 RPC 调用是否超时,如果超时会将超时后果返回给应用层。

在 Dubbo 最开始的实现中,是将所有的返回后果(DefaultFuture)都放入一个汇合中,并且通过一个定时工作,每隔肯定工夫距离就扫描所有的 future,一一判断是否超时。

这样的实现形式尽管比较简单,然而存在一个问题就是会有很多无意义的遍历操作开销。比方一个 RPC 调用的超时工夫是 10 秒,而设置的超时断定的定时工作是 2 秒执行一次,那么可能会有 4 次左右无意义的循环检测判断操作。

为了解决上述场景中的相似问题,Dubbo 借鉴 Netty,引入了工夫轮算法,缩小无意义的轮询判断操作。

2. 工夫轮原理

对于以上问题,目标是要缩小额定的扫描操作就能够了。比如说一个定时工作是在 5 秒之后执行,那么在 4.9 秒之后才扫描这个定时工作,这样就能够极大缩小 CPU 开销。这时咱们就能够利用时钟轮的机制了。

时钟轮的本质上是参考了生存中的时钟跳动的原理,那么具体是如何实现呢?

在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度;而时钟轮就相当于指针跳动的一个周期,咱们能够将每个工作放到对应的时间槽位上。

如果时钟轮有 10 个槽位,而时钟轮一轮的周期是 10 秒,那么咱们每个槽位的单位工夫就是 1 秒,而下一层工夫轮的周期就是 100 秒,每个槽位的单位工夫也就是 10 秒,这就好比秒针与分针,在秒针周期下,刻度单位为秒,在分针周期下,刻度为分。

假如当初咱们有 3 个工作,别离是工作 A(0.9 秒之后执行)、工作 B(2.1 秒后执行)与工作 C(12.1 秒之后执行),咱们将这 3 个工作增加到时钟轮中,工作 A 被放到第 0 槽位,工作 B 被放到第 2 槽位,工作 C 被放到下一层工夫轮的第 2 个槽位,如下图所示:

通过这个场景咱们能够理解到,时钟轮的扫描周期仍是最小单位 1 秒,然而搁置其中的工作并没有重复扫描,每个工作会按要求只扫描执行一次,这样就可能很好的解决 CPU 节约的问题。

这样可能会呈现一个问题,如果一直叠加时钟轮,有限增长,效率是会出现降落,那么该如何解决?

针对于设定三个时钟轮,小时轮,分钟轮,秒级轮。

3. Dubbo 源码分析

次要是通过 Timer,Timeout,TimerTask 几个接口定义了一个定时器的模型,再通过 HashedWheelTimer 这个类实现了一个工夫轮定时器(默认的时间槽的数量是 512,能够自定义这个值)。它对外提供了简略易用的接口,只须要调用 newTimeout 接口,就能够实现对只需执行一次工作的调度。通过该定时器,Dubbo 在响应的场景中实现了高效的任务调度。

工夫轮外围类 HashedWheelTimer 构造:

4. 工夫轮在 RPC 的利用

  1. 调用超时与重试解决 :下面所讲的客户端调用超时的解决,就能够利用到时钟轮,咱们每发一次申请,都创立一个解决申请超时的定时工作放到时钟轮里,在高并发、高访问量的状况下,时钟轮每次只轮询一个时间槽位中的工作,这样会节俭大量的 CPU。

    源码 FailbackRegistry,代码片段:

        // 构造方法
        public FailbackRegistry(URL url) {super(url);
                this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);    
                // since the retry task will not be very much. 128 ticks is enough.
                // 重试器的时间槽数量,设定为 128
                retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
            }
        
        // 失败工夫工作注册器
        private void addFailedRegistered(URL url) {FailedRegisteredTask oldOne = failedRegistered.get(url);
                if (oldOne != null) {return;}
                FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
                oldOne = failedRegistered.putIfAbsent(url, newTask);
                if (oldOne == null) {
                    // never has a retry task. then start a new task for retry.
                    // 旧工作不存在,则搁置工夫轮,开启新一个工作
                    retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
               }
            }
  2. 定时心跳检测 :RPC 框架调用端定时向服务端发送的心跳检测,来保护连贯状态,咱们能够将心跳的逻辑封装为一个心跳工作,放到时钟轮里。心跳是要定时反复执行的,而时钟轮中的工作执行一遍就被移除了,对于这种须要反复执行的定时工作咱们该如何解决呢?咱们在定时工作逻辑完结的最初,再加上一段逻辑,重设这个工作的执行工夫,把它从新丢回到时钟轮里。这样就能够实现循环执行。

    源码 HeaderExchangeServer 代码片段:

    ...
        // 建设心跳工夫轮,槽位数默认为 128
        private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
                    TimeUnit.SECONDS, TICKS_PER_WHEEL);
        ...
            // 启动心跳工作检测
            private void startIdleCheckTask(URL url) {if (!server.canHandleIdle()) {AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
                    int idleTimeout = getIdleTimeout(url);
                    long idleTimeoutTick = calculateLeastDuration(idleTimeout);
                    CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
                    this.closeTimerTask = closeTimerTask;
        
                    // init task and start timer.
                    // 开启心跳检测工作
                    IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
                }
            }
        ...

    连贯检测,会一直执行,退出工夫轮中。

    AbstractTimerTask 源码:
    @Override
    public void run(Timeout timeout) throws Exception {Collection<Channel> c = channelProvider.getChannels();
        for (Channel channel : c) {if (channel.isClosed()) {continue;}
            // 调用心跳检测工作
            doTask(channel);
        }
        // 从新放入工夫轮中
        reput(timeout, tick);
    }

    还能够参考 HeartbeatTimerTask、ReconnectTimerTask 源码实现。

本文由 mirson 创作,心愿对大家有所帮忙, 谢谢!

正文完
 0