1 Kafka broker 启动
(1)kafka的broker,代表一个节点,蕴含多个partition,partition有可能是leader或follower。入口在core包的Kafka类,先后启动KafkaServerStartable 和 KafkaServer , 接着启动了若干组件。每个组件简直都是封装的线程启动,都是对立格调的startup。
(2)LogManager: 磁盘日志文件操作组件,加载的时候次要负责查看日志目录和加载日志文件,还有几个性能:
a 旧日志段删除 ,超过工夫阈值或大小进行删除,b 物理刷盘性能 ,默认操作系统管制刷盘,也提供配置定时工夫刷盘。c 检查点复原 检查点checkponit记录最初一次刷盘的offset,异样敞开后可复原d 分区目录删除 broker收到stopReplica申请时,删除分区和对应的segment
(3)SocketServer 网络组件,实现的react模式,负责收发申请。
(4)ReplicaManager 正本组件,负责正本治理,包含正本写数据,正本拉数据等。
(5)KafkaController 负责配合zk,选举leader和isr,broker变动解决,分区调配解决等。
(6)GroupCoordinator 负责consummer group治理,包含consummer退出,心跳,来到等。
(7)kafkaRequestHandlerPool是个线程池,KafkaApis是用策略模式封装的工具类,能够解决各类型的申请。
2 kafka broker收发申请的过程
(1)收发申请,外围是socketServer组件,随着kafkaserver启动一起启动起来,外围是创立了一个Acceptor对象和Processor
数组,看名字也能看进去,这个必须是reactor网络模型。
(2)一个acceptor就是个线程,蕴含了多个processor(默认三个)。
acceptor其实是在nioSelector上注册了accept事件,一直轮训新的accept事件,期待建设连贯,
连贯设置有:非阻塞连贯,禁用tcpdelay(缩小提早),keepalive为true放弃连贯,
而后把这个新accept进去的channel丢给本人某个processor(多个processor取模轮着给)。
(3)processor
看下面图,每个processor都有一个selector,还有一个连贯队列。从Acceptor外面创立的新连贯,都会放入某一个process的连贯队列,而后processor的selector对本人的连贯队列一直注册事件并解决。收到的申请和解决后的返回,都会放到requestChannel列表里
。外围就是线程的run办法如下:
其中:
a configureNewConnections ( ) ,这里是拿到新连贯,注册read事件。 b processNewResponses, 从名字就能看进去,是对申请解决完之后的response进行解决,这里有点跳跃,因为解决申请的过程在上面,这里是解决返回,从requestChannel列表里拿到response,通过sendResponse( ) 把response发回到申请方。发送的过程就是先拿到具体的channel,而后通过selector的send,把responseshe 晓得kafkaChannel的发送对象里,同时注册op_write事件,前面就是networkClient的网络发送性能了,和producer客户端用的是一套。c poll办法,kafkaselector的poll 办法,和producer的发送原理统一,通过pollSelectionKeys,解决有事件达到的连贯,包含connect事件、read事件、write事件,通过Selector本人封装的一些队列存储两头后果,如compeledReceives代表承受残缺的申请,completedSends代表发送实现的申请,stageReceives是暂存队列等。在producer应用的时候都剖析过。d processCompleteReceives 从selector里循环遍历下面的compeledReceives,这里寄存了read事件读取好的申请,而后封装成request放入SocketServer的requestChannel, 这个requestchannel就是个队列,为前面别的线程拿request做筹备e processCompletedSends, 解决发送实现的响应,channel从新关注read事件。
(4)在上图左边,kafkaServer启动时会启动requestHandlerPool, 这个蕴含了KafkaApis工具类,能够操作各种不同类型申请。这个pool是个线程池,从下面requestChannel
拿申请,应用KafkaApis解决,这就是个策略模式的handle处理器
,指向了不同办法。
以prodduce类型为例,调用replicaManager.appendMessages解决完生产者收回的音讯后,应用回调办法把response放回到requestChannel外面,外围办法:
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
前面网络组件就能够进行response的发送了。整个过程就残缺了。