乐趣区

关于java:kafka原理剖析5-broker启动及接收请求

1 Kafka broker 启动

(1)kafka 的 broker,代表一个节点,蕴含多个 partition,partition 有可能是 leader 或 follower。入口在 core 包的 Kafka 类,先后启动 KafkaServerStartable 和 KafkaServer , 接着启动了若干组件。每个组件简直都是封装的线程启动,都是对立格调的 startup。

(2)LogManager: 磁盘日志文件操作组件,加载的时候次要负责查看日志目录和加载日志文件,还有几个性能:

a 旧日志段删除,超过工夫阈值或大小进行删除,b 物理刷盘性能,默认操作系统管制刷盘,也提供配置定时工夫刷盘。c 检查点复原 检查点 checkponit 记录最初一次刷盘的 offset,异样敞开后可复原
d 分区目录删除  broker 收到 stopReplica 申请时,删除分区和对应的 segment

(3)SocketServer 网络组件,实现的 react 模式,负责收发申请。

(4)ReplicaManager 正本组件,负责正本治理,包含正本写数据,正本拉数据等。

(5)KafkaController 负责配合 zk,选举 leader 和 isr,broker 变动解决,分区调配解决等。

(6)GroupCoordinator 负责 consummer group 治理,包含 consummer 退出,心跳,来到等。

(7)kafkaRequestHandlerPool 是个线程池,KafkaApis 是用策略模式封装的工具类,能够解决各类型的申请。

2 kafka broker 收发申请的过程

(1)收发申请,外围是 socketServer 组件,随着 kafkaserver 启动一起启动起来,外围是创立了一个 Acceptor 对象和 Processor 数组,看名字也能看进去,这个必须是 reactor 网络模型。

(2)一个 acceptor 就是个线程,蕴含了多个 processor(默认三个)。
acceptor 其实是在 nioSelector 上注册了 accept 事件,一直轮训新的 accept 事件,期待建设连贯,

连贯设置有:非阻塞连贯,禁用 tcpdelay(缩小提早),keepalive 为 true 放弃连贯,

而后把这个新 accept 进去的 channel 丢给本人某个 processor(多个 processor 取模轮着给)。

(3)processor
看下面图,每个 processor 都有一个 selector,还有一个连贯队列。从 Acceptor 外面创立的新连贯,都会放入某一个 process 的连贯队列,而后 processor 的 selector 对本人的连贯队列一直注册事件并解决。收到的申请和解决后的返回,都会放到 requestChannel 列表里。外围就是线程的 run 办法如下:

其中:

 
a  configureNewConnections ( ),这里是拿到新连贯,注册 read 事件。b processNewResponses,  从名字就能看进去,是对申请解决完之后的 response 进行解决,这里有点跳跃,因为解决申请的过程在上面,这里是解决返回,从 requestChannel 列表里拿到 response,通过 sendResponse( ) 把 response 发回到申请方。发送的过程就是先拿到具体的 channel,而后通过 selector 的 send,把 responseshe 晓得 kafkaChannel 的发送对象里,同时注册 op_write 事件,前面就是 networkClient 的网络发送性能了,和 producer 客户端用的是一套。c poll 办法,kafkaselector 的 poll 办法,和 producer 的发送原理统一,通过 pollSelectionKeys,解决有事件达到的连贯,包含 connect 事件、read 事件、write 事件,通过 Selector 本人封装的一些队列存储两头后果,如 compeledReceives 代表承受残缺的申请,completedSends 代表发送实现的申请,stageReceives 是暂存队列等。在 producer 应用的时候都剖析过。d processCompleteReceives 从 selector 里循环遍历下面的 compeledReceives,这里寄存了 read 事件读取好的申请,而后封装成 request 放入
SocketServer 的 requestChannel, 这个 requestchannel 就是个队列,为前面别的线程拿 request 做筹备

e processCompletedSends, 解决发送实现的响应,channel 从新关注 read 事件。

(4)在上图左边,kafkaServer 启动时会启动 requestHandlerPool, 这个蕴含了 KafkaApis 工具类,能够操作各种不同类型申请。这个 pool 是个线程池,从下面 requestChannel 拿申请,应用 KafkaApis 解决,这就是个 策略模式的 handle 处理器,指向了不同办法。

以 prodduce 类型为例,调用 replicaManager.appendMessages 解决完生产者收回的音讯后,应用回调办法把 response 放回到 requestChannel 外面,外围办法:
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
前面网络组件就能够进行 response 的发送了。整个过程就残缺了。

退出移动版