写在后面

kafka是一个分布式消息中间件,其高可用高吞吐的特点是大数据畛域首选的消息中间件,Kafka是分布式音讯队列的程序读写文件分段组织串联起来思维的鼻祖,包含RocketMq这些音讯队列都是借鉴了Kafka晚期的架构和设计思路革新而来,所以在架构设计层面,Kafka有十分多值得借鉴的中央。本文是作者介绍Kafka优良架构设计文章中的一篇,文中的代码和流程图均是base on 0.10.2.0版本。

引出环形队列和提早队列

从2个面试题说起,第1个问题,如果一台机器上有10w个定时工作,如何做到高效触发?

具体场景是:

有一个APP实时音讯通道零碎,对每个用户会保护一个APP到服务器的TCP连贯,用来实时收发音讯,对这个TCP连贯,有这样一个需要:“如果间断30s没有申请包(例如登录,音讯,keepalive包),服务端就要将这个用户的状态置为离线”。

其中,单机TCP同时在线量约在10w级别,keepalive申请包较扩散大略30s一次,吞吐量约在3000qps。

怎么做?

罕用计划应用time定时工作,每秒扫描一次所有连贯的汇合Map<uid, last_packet_time>,把连接时间(每次有新的申请更新对应连贯的连接时间)比以后工夫的差值大30s的连贯找进去解决。

另一种计划,应用环形队列法:

三个重要的数据结构:

1)30s超时,就创立一个index从0到30的环形队列(实质是个数组)

2)环上每一个slot是一个Set<uid>,工作汇合

3)同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里

这样当有某用户uid有申请包达到时:

1)从Map构造中,查找出这个uid存储在哪一个slot里

2)从这个slot的Set构造中,删除这个uid

3)将uid重新加入到新的slot中,具体是哪一个slot呢 => Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到

4)更新Map,这个uid对应slot的index值

哪些元素会被超时掉呢?

Current Index每秒种挪动一个slot,这个slot对应的Set<uid>中所有uid都应该被个体超时!如果最近30s有申请包来到,肯定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有申请包来到的。

所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。

两种计划比照:

计划一每次都要轮询所有数据,而计划二应用环形队列只须要轮询这一刻须要过期的数据,如果没有数据过期则没有数据要解决,并且是批量超时,并且因为是环形构造更加节约空间,这很适宜高性能场景。

第二个问题:在开发过程中有提早肯定工夫的工作要执行,怎么做?

如果不反复造轮子的话,咱们的抉择当然是提早队列或者Timer。

提早队列和在Timer中增 加延时工作采纳数组示意的最小堆的数据结构实现,每次放入新元素和移除队首元素工夫复杂度为O(nlog(n))。

工夫轮

计划二所采纳的环形队列,就是工夫轮的底层数据结构,它可能让须要解决的数据(工作的形象)集中,在Kafka中存在大量的提早操作,比方提早生产、提早拉取以及提早删除等。Kafka并没有应用JDK自带的Timer或者DelayQueue来实现提早的性能,而是基于工夫轮自定义了一个用于实现提早性能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的均匀工夫复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于工夫轮能够将插入和删除操作的工夫复杂度都降为O(1)。工夫轮的利用并非Kafka独有,其利用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在工夫轮的踪影。

工夫轮的数据结构

参考下图,Kafka中的工夫轮(TimingWheel)是一个存储定时工作的环形队列,底层采纳数组实现,数组中的每个元素能够寄存一个定时工作列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项示意的都是定时工作项(TimerTaskEntry),其中封装了真正的定时工作TimerTask。在Kafka源码中对这个TimeTaskList是用一个名称为buckets的数组示意的,所以前面介绍中可能TimerTaskList也会被称为bucket。

工夫轮相干名词解释

tickMs:工夫轮由多个工夫格组成,每个工夫格就是tickMs,它代表以后工夫轮的根本时间跨度。

wheelSize:代表每一层工夫轮的格数

interval:以后工夫轮的总体时间跨度,interval=tickMs × wheelSize

startMs:结构当层工夫轮时候的以后工夫,第一层的工夫轮的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),下层工夫轮的startMs为上层工夫轮的currentTime。

currentTime:示意工夫轮以后所处的工夫,currentTime是tickMs的整数倍(通过currentTime=startMs - (startMs % tickMs来保正currentTime肯定是tickMs的整数倍),这个运算类比钟表中分钟里65秒分针指针指向的还是1分钟)。currentTime能够将整个工夫轮划分为到期局部和未到期局部,currentTime以后指向的工夫格也属于到期局部,示意刚好到期,须要解决此工夫格所对应的TimerTaskList的所有工作。

工夫轮中的工作寄存

若工夫轮的tickMs=1ms,wheelSize=20,那么能够计算得出interval为20ms。初始状况下表盘指针currentTime指向工夫格0,此时有一个定时为2ms的工作插入进来会寄存到工夫格为2的TimerTaskList中。随着工夫的一直推移,指针currentTime一直向前推动,过了2ms之后,当达到工夫格2时,就须要将工夫格2所对应的TimeTaskList中的工作做相应的到期操作。此时若又有一个定时为8ms的工作插入进来,则会寄存到工夫格10中,currentTime再过8ms后会指向工夫格10。如果同时有一个定时为19ms的工作插入进来怎么办?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入到本来曾经到期的工夫格1中。总之,整个工夫轮的总体跨度是不变的,随着指针currentTime的一直推动,以后工夫轮所能解决的时间段也在一直后移,总体工夫范畴在currentTime和currentTime+interval之间。

工夫轮的升降级

如果此时有个定时为350ms的工作该如何解决?间接裁减wheelSize的大小么?Kafka中不乏几万甚至几十万毫秒的定时工作,这个wheelSize的裁减没有底线,就算将所有的定时工作的到期工夫都设定一个下限,比方100万毫秒,那么这个wheelSize为100万毫秒的工夫轮不仅占用很大的内存空间,而且效率也会拉低。Kafka为此引入了层级工夫轮的概念,当工作的到期工夫超过了以后工夫轮所示意的工夫范畴时,就会尝试增加到下层工夫轮中

参考上图,复用之前的案例,第一层的工夫轮tickMs=1ms, wheelSize=20, interval=20ms。第二层的工夫轮的tickMs为第一层工夫轮的interval,即为20ms。每一层工夫轮的wheelSize是固定的,都是20,那么第二层的工夫轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的工夫轮的总体时间跨度为8000ms。
方才提到的350ms的工作,不会插入到第一层工夫轮,会插入到interval=20*20的第二层工夫轮中,具体插入到工夫轮的哪个bucket呢?先用350/tickMs(20)=virtualId(17),而后virtualId(17) %wheelSize (20) = 17,所以350会放在第17个bucket。如果此时有一个450ms后执行的工作,那么会放在第三层工夫轮中,依照方才的计算公式,会放在第0个bucket。第0个bucket里会蕴含

[400,800)ms的工作。随着工夫流逝,当工夫过来了400ms,那么450ms后就要执行的工作还剩下50ms的工夫能力执行,此时有一个工夫轮降级的操作,将50ms工作从新提交到层级工夫轮中,那么此时50ms的工作依据公式会放入第二个工夫轮的第2个bucket中,此bucket的工夫范畴为[40,60)ms,而后再通过40ms,这个50ms的工作又会被监控到,此时间隔工作执行还有10ms,同样将10ms的工作提交到层级工夫轮,此时会退出到第一层工夫轮的第10个bucket,所以再通过10ms后,此工作到期,最终执行。

整个工夫轮的降级降级操作是不是很相似于咱们的时钟? 第一层工夫轮tickMs=1s, wheelSize=60,interval=1min,此为秒钟;第二层tickMs=1min,wheelSize=60,interval=1hour,此为分钟;第三层tickMs=1hour,wheelSize为12,interval为12hours,此为时钟。而钟表的指针就对应程序中的currentTime,这个前面剖析代码时候会讲到(对这个的了解也是工夫轮了解的重点和难点)。

Kafka中工作增加和驱动工夫轮滚动的外围流程:

重点代码介绍

这是往SystenTimer中增加一个工作

//在Systemtimer中增加一个工作,工作被包装为一个TimerTaskEntryprivate def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {//先判断是否能够增加进工夫轮中,如果不能够增加进去代表工作曾经过期或者工作被勾销,留神这里的timingWheel持有上一层工夫轮的援用,所以可能存在递归调用  if (!timingWheel.add(timerTaskEntry)) {    // Already expired or cancelled    if (!timerTaskEntry.cancelled)     //过期工作间接线程池异步执行掉      taskExecutor.submit(timerTaskEntry.timerTask)  }} 

timingWheel增加工作,递归增加直到增加该工作进适合的工夫轮的bucket中

def add(timerTaskEntry: TimerTaskEntry): Boolean = {  val expiration = timerTaskEntry.expirationMs  //工作勾销  if (timerTaskEntry.cancelled) {    // Cancelled    false  } else if (expiration < currentTime + tickMs) {    // 工作过期后会被执行    false  } else if (expiration < currentTime + interval) {//工作过期工夫比以后工夫轮工夫加周期小阐明工作过期工夫在本工夫轮周期内    val virtualId = expiration / tickMs    //找到工作对应本工夫轮的bucket    val bucket = buckets((virtualId % wheelSize.toLong).toInt)    bucket.add(timerTaskEntry)    // Set the bucket expiration time   //只有本bucket内的工作都过期后才会bucket.setExpiration返回true此时将bucket放入提早队列    if (bucket.setExpiration(virtualId * tickMs)) {     //bucket是一个TimerTaskList,它实现了java.util.concurrent.Delayed接口,外面是一个多任务组成的链表,图2有阐明      queue.offer(bucket)    }    true  } else {    // Out of the interval. Put it into the parent timer    //工作的过期工夫不在本工夫轮周期内阐明须要降级工夫轮,如果不存在则结构上一层工夫轮,持续用上一层工夫轮增加工作    if (overflowWheel == null) addOverflowWheel()    overflowWheel.add(timerTaskEntry)  }} 

在本层级工夫轮里增加上一层工夫轮里的过程,留神的是在下一层工夫轮的interval为上一层工夫轮的tickMs

private[this] def addOverflowWheel(): Unit = {  synchronized {    if (overflowWheel == null) {      overflowWheel = new TimingWheel(        tickMs = interval,        wheelSize = wheelSize,        startMs = currentTime,        taskCounter = taskCounter,        queue      )    }  }} 

驱动工夫轮滚动过程:

//留神这里会存在一个递归,始终驱动工夫轮的指针滚动直到工夫有余于驱动下层的工夫轮滚动。def advanceClock(timeMs: Long): Unit = {  if (timeMs >= currentTime + tickMs) {   //把以后工夫打平为工夫轮tickMs的整数倍    currentTime = timeMs - (timeMs % tickMs)    // Try to advance the clock of the overflow wheel if present    //驱动下层工夫轮,这里的传给下层的currentTime工夫是本层工夫轮打平过的,然而在下层工夫轮还是会持续打平    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)  }} 

这里是驱动源代码:

//循环bucket外面的工作列表,一个个从新增加进工夫轮,对符合条件的工夫轮进行升降级或者执行工作private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry) /* * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called, * waits up to timeoutMs before giving up. */def advanceClock(timeoutMs: Long): Boolean = {  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)  if (bucket != null) {    writeLock.lock()    try {      while (bucket != null) {        //驱动工夫轮        timingWheel.advanceClock(bucket.getExpiration())       //循环buckek也就是工作列表,工作列表一个个持续增加进工夫轮以此来降级或者降级工夫轮,把过期工作找进去执行        bucket.flush(reinsert)       //循环        //这里就是从提早队列取出bucket,bucket是有延迟时间的,取出代表该bucket过期,咱们通过bucket能取到bucket蕴含的工作列表        bucket = delayQueue.poll()      }    } finally {      writeLock.unlock()    }    true  } else {    false  }} 

总结

kafka的提早队列应用工夫轮实现,可能反对大量工作的高效触发,然而在kafka提早队列实现计划里还是看到了delayQueue的影子,应用delayQueue是对工夫轮外面的bucket放入提早队列,以此来推动工夫轮滚动,然而基于将插入和删除操作则放入工夫轮中,将这些操作的工夫复杂度都降为O(1),晋升效率。Kafka对性能的极致谋求让它把最合适的组件放在最适宜的地位。