kafka server在启动的时候会初始化SocketServer、KafkaApis 和 KafkaRequestHandlerPool这三个对象,它们是kafka网络解决模型的次要组成部分,kafka所有的申请都是通过TCP网络以socket的形式进行通信的。该网络模型是基于java NIO机制实现的,依据其特点能够简略演绎为1+N+M模型,它与Reactor模式相似,其残缺流程图如下:
别离来看下Acceptor,Processor,KafkaRequestHandle这三局部:
- 1个Acceptor线程,负责监听socket新的连贯申请,注册了了OP_ACCEPT事件,将新的连贯依照round robin形式交给对应的Processor线程解决。
- N个Processor线程,其中每个Processor都有本人的selector,它会向Acceptor调配的SocketChannel注册相应的OP_READ事件,N的大小可由
num.networker.threads
配置,默认为3 - M个KafkaRequestHandler线程解决申请,并将解决的后果返回给Processor线程对应的response queue,由Processor将解决的后果返回给相应的申请发送者,M的大小可由
num.io.threads
配置,默认为8
再对着上图说下流程:
- Acceptor监听到新连贯,而后按round robin的形式交给对应的Processor进行解决。
- Processor在SocketChannel上注册OP_READ事件并期待事件产生
- 当申请到来时,Processor会将申请放入到一个Request Queue中,这是所有Processor共有的一个队列
- KafkaRequestHandler申请从Reqeust Queue中取出,并调用相应KafkaApis进行解决(如果是producer生产申请,则将音讯写入到底层的磁盘日志中;如果是fetch申请,则从磁盘或页缓存中读取音讯)
- 解决的后果会放入到Processor对应的Response Queue中(每个申请都有标识它们来自哪个Processor),Request Queue的数量与Processor数量统一
- Processor从对应Response Queue中取出reponse并返回给对应的请求者。
须要阐明下:
Request Queue是所有Processor专用的一个队列,而Response Queue则是与Processor一一对的,因为每个Processor监听的SocketChannel并不是同一批,如果私有一个Response Queue,那么这N个Processor的selector就须要监听所有的SocketChannel,而Processor与Response Queue一一对应则Processor相应的selector只须要关注调配给本人的SocketChannel即可。
再补充下Purgatory组件,如下图:
Purgatory组件是用来缓存延时申请(Delayed Request)的,所谓延时申请,就是那些一时未满足条件不能立即解决的申请,比方设置了acks=all的producer申请,一旦设置了acks=all,那么该申请就必须期待ISR中所有正本都接管了音讯后能力返回,此时算是该申请的IO线程就必须期待其余broker的写入后果。当申请不能立即解决时,它就会暂存在Purgatory中,稍后一旦满足了条件,IO线程会持续解决该申请,并将Response放入对应网络线程的响应队列中。
最初再来一张kafka的结构图:
参考文章:深入浅出kafka原理-4-kafka网络机制原理