关于java:跟Kafka学技术系列之时间轮

8次阅读

共计 6683 个字符,预计需要花费 17 分钟才能阅读完成。

写在后面

kafka 是一个分布式消息中间件,其高可用高吞吐的特点是大数据畛域首选的消息中间件,Kafka 是分布式音讯队列的程序读写文件分段组织串联起来思维的鼻祖,包含 RocketMq 这些音讯队列都是借鉴了 Kafka 晚期的架构和设计思路革新而来,所以在架构设计层面,Kafka 有十分多值得借鉴的中央。本文是作者介绍 Kafka 优良架构设计文章中的一篇,文中的代码和流程图均是 base on 0.10.2.0 版本。

引出环形队列和提早队列

从 2 个面试题说起,第 1 个问题,如果一台机器上有 10w 个定时工作,如何做到高效触发?

具体场景是:

有一个 APP 实时音讯通道零碎,对每个用户会保护一个 APP 到服务器的 TCP 连贯,用来实时收发音讯,对这个 TCP 连贯,有这样一个需要:“如果间断 30s 没有申请包(例如登录,音讯,keepalive 包),服务端就要将这个用户的状态置为离线”。

其中,单机 TCP 同时在线量约在 10w 级别,keepalive 申请包较扩散大略 30s 一次,吞吐量约在 3000qps。

怎么做?

罕用计划应用 time 定时工作,每秒扫描一次所有连贯的汇合 Map<uid, last_packet_time>,把连接时间(每次有新的申请更新对应连贯的连接时间)比以后工夫的差值大 30s 的连贯找进去解决。

另一种计划,应用环形队列法:

三个重要的数据结构:

1)30s 超时,就创立一个 index 从 0 到 30 的环形队列(实质是个数组)

2)环上每一个 slot 是一个 Set<uid>,工作汇合

3)同时还有一个 Map<uid, index>,记录 uid 落在环上的哪个 slot 里

这样当有某用户 uid 有申请包达到时:

1)从 Map 构造中,查找出这个 uid 存储在哪一个 slot 里

2)从这个 slot 的 Set 构造中,删除这个 uid

3)将 uid 重新加入到新的 slot 中,具体是哪一个 slot 呢 => Current Index 指针所指向的上一个 slot,因为这个 slot,会被 timer 在 30s 之后扫描到

4)更新 Map,这个 uid 对应 slot 的 index 值

哪些元素会被超时掉呢?

Current Index 每秒种挪动一个 slot,这个 slot 对应的 Set<uid> 中所有 uid 都应该被个体超时!如果最近 30s 有申请包来到,肯定被放到 Current Index 的前一个 slot 了,Current Index 所在的 slot 对应 Set 中所有元素,都是最近 30s 没有申请包来到的。

所以,当没有超时时,Current Index 扫到的每一个 slot 的 Set 中应该都没有元素。

两种计划比照:

计划一每次都要轮询所有数据,而计划二应用环形队列只须要轮询这一刻须要过期的数据,如果没有数据过期则没有数据要解决,并且是批量超时,并且因为是环形构造更加节约空间,这很适宜高性能场景。

第二个问题:在开发过程中有提早肯定工夫的工作要执行,怎么做?

如果不反复造轮子的话,咱们的抉择当然是提早队列或者 Timer。

提早队列和在 Timer 中增 加延时工作采纳数组示意的最小堆的数据结构实现,每次放入新元素和移除队首元素工夫复杂度为 O(nlog(n))。

工夫轮

计划二所采纳的环形队列,就是工夫轮的底层数据结构,它可能让须要解决的数据(工作的形象)集中,在 Kafka 中存在大量的提早操作,比方提早生产、提早拉取以及提早删除等。Kafka 并没有应用 JDK 自带的 Timer 或者 DelayQueue 来实现提早的性能,而是基于工夫轮自定义了一个用于实现提早性能的定时器(SystemTimer)。JDK 的 Timer 和 DelayQueue 插入和删除操作的均匀工夫复杂度为 O(nlog(n)),并不能满足 Kafka 的高性能要求,而基于工夫轮能够将插入和删除操作的工夫复杂度都降为 O(1)。工夫轮的利用并非 Kafka 独有,其利用场景还有很多,在 Netty、Akka、Quartz、Zookeeper 等组件中都存在工夫轮的踪影。

工夫轮的数据结构

参考下图,Kafka 中的工夫轮(TimingWheel)是一个存储定时工作的环形队列,底层采纳数组实现,数组中的每个元素能够寄存一个定时工作列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项示意的都是定时工作项(TimerTaskEntry),其中封装了真正的定时工作 TimerTask。在 Kafka 源码中对这个 TimeTaskList 是用一个名称为 buckets 的数组示意的,所以前面介绍中可能 TimerTaskList 也会被称为 bucket。

工夫轮相干名词解释

tickMs:工夫轮由多个工夫格组成,每个工夫格就是 tickMs,它代表以后工夫轮的根本时间跨度。

wheelSize:代表每一层工夫轮的格数

interval:以后工夫轮的总体时间跨度,interval=tickMs × wheelSize

startMs:结构当层工夫轮时候的以后工夫,第一层的工夫轮的 startMs 是 TimeUnit.NANOSECONDS.toMillis(nanoseconds()), 下层工夫轮的 startMs 为上层工夫轮的 currentTime。

currentTime:示意工夫轮以后所处的工夫,currentTime 是 tickMs 的整数倍(通过 currentTime=startMs – (startMs % tickMs 来保正 currentTime 肯定是 tickMs 的整数倍),这个运算类比钟表中分钟里 65 秒分针指针指向的还是 1 分钟)。currentTime 能够将整个工夫轮划分为到期局部和未到期局部,currentTime 以后指向的工夫格也属于到期局部,示意刚好到期,须要解决此工夫格所对应的 TimerTaskList 的所有工作。

工夫轮中的工作寄存

若工夫轮的 tickMs=1ms,wheelSize=20,那么能够计算得出 interval 为 20ms。初始状况下表盘指针 currentTime 指向工夫格 0,此时有一个定时为 2ms 的工作插入进来会寄存到工夫格为 2 的 TimerTaskList 中。随着工夫的一直推移,指针 currentTime 一直向前推动,过了 2ms 之后,当达到工夫格 2 时,就须要将工夫格 2 所对应的 TimeTaskList 中的工作做相应的到期操作。此时若又有一个定时为 8ms 的工作插入进来,则会寄存到工夫格 10 中,currentTime 再过 8ms 后会指向工夫格 10。如果同时有一个定时为 19ms 的工作插入进来怎么办?新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入到本来曾经到期的工夫格 1 中。总之,整个工夫轮的总体跨度是不变的,随着指针 currentTime 的一直推动,以后工夫轮所能解决的时间段也在一直后移,总体工夫范畴在 currentTime 和 currentTime+interval 之间。

工夫轮的升降级

如果此时有个定时为 350ms 的工作该如何解决?间接裁减 wheelSize 的大小么?Kafka 中不乏几万甚至几十万毫秒的定时工作,这个 wheelSize 的裁减没有底线,就算将所有的定时工作的到期工夫都设定一个下限,比方 100 万毫秒,那么这个 wheelSize 为 100 万毫秒的工夫轮不仅占用很大的内存空间,而且效率也会拉低。Kafka 为此引入了层级工夫轮的概念,当工作的到期工夫超过了以后工夫轮所示意的工夫范畴时,就会尝试增加到下层工夫轮中

参考上图,复用之前的案例,第一层的工夫轮 tickMs=1ms, wheelSize=20, interval=20ms。第二层的工夫轮的 tickMs 为第一层工夫轮的 interval,即为 20ms。每一层工夫轮的 wheelSize 是固定的,都是 20,那么第二层的工夫轮的总体时间跨度 interval 为 400ms。以此类推,这个 400ms 也是第三层的 tickMs 的大小,第三层的工夫轮的总体时间跨度为 8000ms。
方才提到的 350ms 的工作,不会插入到第一层工夫轮,会插入到 interval=20*20 的第二层工夫轮中,具体插入到工夫轮的哪个 bucket 呢?先用 350/tickMs(20)=virtualId(17),而后 virtualId(17) %wheelSize (20) = 17,所以 350 会放在第 17 个 bucket。如果此时有一个 450ms 后执行的工作,那么会放在第三层工夫轮中,依照方才的计算公式,会放在第 0 个 bucket。第 0 个 bucket 里会蕴含

[400,800)ms 的工作。随着工夫流逝,当工夫过来了 400ms,那么 450ms 后就要执行的工作还剩下 50ms 的工夫能力执行,此时有一个工夫轮降级的操作,将 50ms 工作从新提交到层级工夫轮中,那么此时 50ms 的工作依据公式会放入第二个工夫轮的第 2 个 bucket 中,此 bucket 的工夫范畴为 [40,60)ms,而后再通过 40ms,这个 50ms 的工作又会被监控到,此时间隔工作执行还有 10ms,同样将 10ms 的工作提交到层级工夫轮,此时会退出到第一层工夫轮的第 10 个 bucket,所以再通过 10ms 后,此工作到期,最终执行。

整个工夫轮的降级降级操作是不是很相似于咱们的时钟?第一层工夫轮 tickMs=1s, wheelSize=60,interval=1min,此为秒钟;第二层 tickMs=1min,wheelSize=60,interval=1hour,此为分钟;第三层 tickMs=1hour,wheelSize 为 12,interval 为 12hours,此为时钟。而钟表的指针就对应程序中的 currentTime,这个前面剖析代码时候会讲到(对这个的了解也是工夫轮了解的重点和难点)。

Kafka 中工作增加和驱动工夫轮滚动的外围流程:

重点代码介绍

这是往 SystenTimer 中增加一个工作

// 在 Systemtimer 中增加一个工作,工作被包装为一个 TimerTaskEntry
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
// 先判断是否能够增加进工夫轮中,如果不能够增加进去代表工作曾经过期或者工作被勾销,留神这里的 timingWheel 持有上一层工夫轮的援用,所以可能存在递归调用
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
     // 过期工作间接线程池异步执行掉
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
} 

timingWheel 增加工作,递归增加直到增加该工作进适合的工夫轮的 bucket 中

def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs
  // 工作勾销
  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // 工作过期后会被执行
    false
  } else if (expiration < currentTime + interval) {// 工作过期工夫比以后工夫轮工夫加周期小阐明工作过期工夫在本工夫轮周期内
    val virtualId = expiration / tickMs
    // 找到工作对应本工夫轮的 bucket
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)
    // Set the bucket expiration time
   // 只有本 bucket 内的工作都过期后才会 bucket.setExpiration 返回 true 此时将 bucket 放入提早队列
    if (bucket.setExpiration(virtualId * tickMs)) {
     //bucket 是一个 TimerTaskList,它实现了 java.util.concurrent.Delayed 接口,外面是一个多任务组成的链表,图 2 有阐明
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer
    // 工作的过期工夫不在本工夫轮周期内阐明须要降级工夫轮,如果不存在则结构上一层工夫轮,持续用上一层工夫轮增加工作
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
} 

在本层级工夫轮里增加上一层工夫轮里的过程,留神的是在下一层工夫轮的 interval 为上一层工夫轮的 tickMs

private[this] def addOverflowWheel(): Unit = {
  synchronized {if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
} 

驱动工夫轮滚动过程:

// 留神这里会存在一个递归,始终驱动工夫轮的指针滚动直到工夫有余于驱动下层的工夫轮滚动。def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {
   // 把以后工夫打平为工夫轮 tickMs 的整数倍
    currentTime = timeMs - (timeMs % tickMs)
    // Try to advance the clock of the overflow wheel if present
    // 驱动下层工夫轮,这里的传给下层的 currentTime 工夫是本层工夫轮打平过的,然而在下层工夫轮还是会持续打平
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
} 

这里是驱动源代码:

// 循环 bucket 外面的工作列表,一个个从新增加进工夫轮,对符合条件的工夫轮进行升降级或者执行工作
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
 
/*
 * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
 * waits up to timeoutMs before giving up.
 */
def advanceClock(timeoutMs: Long): Boolean = {var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {writeLock.lock()
    try {while (bucket != null) {
        // 驱动工夫轮
        timingWheel.advanceClock(bucket.getExpiration())
       // 循环 buckek 也就是工作列表,工作列表一个个持续增加进工夫轮以此来降级或者降级工夫轮,把过期工作找进去执行
        bucket.flush(reinsert)
       // 循环
        // 这里就是从提早队列取出 bucket,bucket 是有延迟时间的,取出代表该 bucket 过期,咱们通过 bucket 能取到 bucket 蕴含的工作列表
        bucket = delayQueue.poll()}
    } finally {writeLock.unlock()
    }
    true
  } else {false}
} 

总结

kafka 的提早队列应用工夫轮实现,可能反对大量工作的高效触发,然而在 kafka 提早队列实现计划里还是看到了 delayQueue 的影子,应用 delayQueue 是对工夫轮外面的 bucket 放入提早队列,以此来推动工夫轮滚动,然而基于将插入和删除操作则放入工夫轮中,将这些操作的工夫复杂度都降为 O(1),晋升效率。Kafka 对性能的极致谋求让它把最合适的组件放在最适宜的地位。

正文完
 0