乐趣区

Kafka源码系列之Broker的IO服务及业务处理

一、kafka 角色

  Kafka 源码系列主要是以 kafka 0.8.2.2 源码为例。以看 spark 等源码的经验总结除了一个重要的看源码的思路:先了解部件角色和功能角色,然后逐个功能请求序列画图分析,最后再汇总。那么,下面再啰嗦一下,kafka 的角色。kafka 在生产中的使用,如下图所示。从图中可以看到其主要角色:1、Zookeeper:Broker 需要通过 ZooKeeper 记录集群的所有 Broker,controller 等信息,记录 Consumer 的消费消息的偏移量等信息。2、Broker: 主要负责管理数据,处理数据的生产、消费请求及副本的同步等信息。3、Topic: 标识一个类别的消息。4、Partition: 针对 topic 进行了进一步细分,增加并发度。牵涉到副本及 leader 选举。5、Producer: 主要与 Broker 进行交互,来生产消息到 broker。6、Consumer: 主要是从 Broker 上获取消息,将自己的消费偏移等信息记录与 zookeeper。从各个角色的功能来看,我们整个数据服务请求的中心就是 Broker,自然也是由 Broker 来负责各种事件处理及应答各个部件的。

二、Broker 请求及应答机制的实现

  在 JAVA 的网络 IO 模型彻底讲解的那篇文章里,已经彻底讲解了 Java 的各种网络 IO 实现的机制及优缺点。其实,kafka 的 Broker 就是通过 JAVA 的 NIO 来实现监听和请求处理及应答的。主要牵涉到的类:1)、KafkaServer
      该类代表了一个 kafka Broker 的生命周期,处理 kafka 启动或者停止所需要的所有功能。2)、SocketServer
      一个 NIO 服务中心。线程模型是,1 个 Acceptor 线程,用来处理新的链接请求,N 个加工 Processor 线程。每个线程拥有一个他们自己的 selector,主要负责 IO 请求及应答。3)、KafkaRequestHandler
      实际会在 KafkaRequestHandlerPool 中创建多个对象,负责加工处理 request 线程。会创建 M 个处理 Handler 线程。负责处理 request 请求,将 responses 重新写会加工线程 Processor,以便于其写回给客户端。4)、RequestChannel
      该类主要是封装了 requestQueue,responseQueues,responseListeners,便于个各类中同时引用并作出自己的处理。5)、KafkaApis

Kafka 多样请求的逻辑处理程序。
具体如图二所示:

下面讲解 1,2,3,4,5,具体含义:

  1、SocketServer.startup(),会启动一个后台线程,该线程会持有一个 acceptor,负责接收新的链接请求,并轮训所有的 Processor,将新的链接请求加入 Processor 对象的成员变量 ConcurrentLinkedQueue 里,Processor 会在其 run 方法里面处理。

[Scala] 纯文本查看 复制代码
?

// start accepting connections
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize,quotas)
Utils.newThread(“kafka-socket-acceptor”, acceptor, false).start()
acceptor.awaitStartup

  Processor 池的初始化 

[Scala] 纯文本查看 复制代码
?

for(i <- 0 until numProcessorThreads) {
processors(i) = new Processor(i,

                            time, 
                            maxRequestSize, 
                            aggregateIdleMeter,
                            newMeter("IdlePercent","percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                            numProcessorThreads, 
                            requestChannel,
                            quotas,
                            connectionsMaxIdleMs)

Utils.newThread(“kafka-network-thread-%d-%d”.format(port, i), processors(i),false).start()
}

accepttor 轮训 Processor

[Scala] 纯文本查看 复制代码

val ready = selector.select(500)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {

var key: SelectionKey = null
try {
  key = iter.next
  iter.remove()
  if(key.isAcceptable)
     accept(key, processors(currentProcessor))
  else
     throw new IllegalStateException("Unrecognized key state for acceptor thread.")

  // round robin to the next processor thread
  currentProcessor = (currentProcessor + 1) % processors.length
} catch {case e: Throwable => error("Error while accepting connection", e)
}
  2、Processor 的 run 方法里面,会针对可读事件调用 read 方法里将 request 请求信息通过 requestChannel.sendRequest(req) 添加到 RequestChannel 的成员变量里面。

[Scala] 纯文本查看 复制代码
?
1
requestQueue = new ArrayBlockingQueueRequestChannel.Request

  3、在 KafkaServer 的 startup 方法里面构建 KafkaRequestHandlerPool 对象的时候,会构建若干 handler 线程。

[Scala] 纯文本查看 复制代码
?

for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads,requestChannel, apis)
threads(i) = Utils.daemonThread(“kafka-request-handler-” + i, runnables(i))
threads(i).start()
}

  在 KafakRequestHandler 的方法里面会对我们的 request 进行处理 

[Scala] 纯文本查看 复制代码
?
1
2
req = requestChannel.receiveRequest(300)
apis.handle(req)

  实际上,是通过 KafkaApis 对象的 handle 方法进行各种逻辑的处理的。

[Scala] 纯文本查看 复制代码
?

def handle(request: RequestChannel.Request) {
try{

trace("Handling request:" + request.requestObj + "from client:" + request.remoteAddress)
request.requestId match {case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
  case RequestKeys.FetchKey => handleFetchRequest(request)
  case RequestKeys.OffsetsKey => handleOffsetRequest(request)
  case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
  case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
  case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
  case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
  case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
  case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
  case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
  case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
  case requestId => throw new KafkaException("Unknown api code" + requestId)
}

} catch {

case e: Throwable =>
  request.requestObj.handleError(e, requestChannel, request)
  error("error when handling request %s".format(request.requestObj), e)

} finallyrequest.apiLocalCompleteTimeMs = SystemTime.milliseconds}

  4、在每一种请求处理结束之后会产生对应的 response

[Scala] 纯文本查看 复制代码
?
1
requestChannel.sendResponse(new RequestChannel.Response(request, newBoundedByteBufferSend(response)))

  并将 response 存储到 RequestChannel 的 responseQueues 存储。5、最终,由我们的 Processor 在其 run 方法里面,取出 RequestChannel 的 responseQueues 存储的时间,匹配到写事件,然后通过其 write 方法对具体的 request 进行应答。

[Scala] 纯文本查看 复制代码
?

else if(key.isWritable)
write(key)

三、总结

  这是一个典型的 Reactor 多线程模型,并且实现了 IO 线程和业务线程进行隔离。这样做的优点有以下几种:
  1、充分利用资源
      可以充分利用 CPU 资源,增加并发度,使业务响应速度加快。2、故障隔离:
     业务处理线程,无论是处理耗时,还是发生阻塞,都不会影响 IO 请求线程。保证服务器能在某些业务线程出故障的情况下,正常进行 IO 请求应答。3、可维护性
     职责单一,可维护性高,方便定位问题。
退出移动版