关于kafka:kafka之五网络模型

40次阅读

共计 1528 个字符,预计需要花费 4 分钟才能阅读完成。

kafka server 在启动的时候会初始化 SocketServer、KafkaApis 和 KafkaRequestHandlerPool 这三个对象,它们是 kafka 网络解决模型的次要组成部分,kafka 所有的申请都是通过 TCP 网络以 socket 的形式进行通信的。该网络模型是基于 java NIO 机制实现的,依据其特点能够简略演绎为 1 +N+ M 模型,它与 Reactor 模式相似,其残缺流程图如下:

别离来看下 Acceptor,Processor,KafkaRequestHandle 这三局部:

  1. 1 个 Acceptor 线程,负责监听 socket 新的连贯申请,注册了了 OP_ACCEPT 事件,将新的连贯依照 round robin 形式交给对应的 Processor 线程解决。
  2. N 个 Processor 线程,其中每个 Processor 都有本人的 selector,它会向 Acceptor 调配的 SocketChannel 注册相应的 OP_READ 事件,N 的大小可由 num.networker.threads 配置,默认为 3
  3. M 个 KafkaRequestHandler 线程解决申请,并将解决的后果返回给 Processor 线程对应的 response queue, 由 Processor 将解决的后果返回给相应的申请发送者,M 的大小可由 num.io.threads 配置,默认为 8

再对着上图说下流程:

  1. Acceptor 监听到新连贯,而后按 round robin 的形式交给对应的 Processor 进行解决。
  2. Processor 在 SocketChannel 上注册 OP_READ 事件并期待事件产生
  3. 当申请到来时,Processor 会将申请放入到一个 Request Queue 中,这是所有 Processor 共有的一个队列
  4. KafkaRequestHandler 申请从 Reqeust Queue 中取出,并调用相应 KafkaApis 进行解决(如果是 producer 生产申请,则将音讯写入到底层的磁盘日志中;如果是 fetch 申请,则从磁盘或页缓存中读取音讯)
  5. 解决的后果会放入到 Processor 对应的 Response Queue 中(每个申请都有标识它们来自哪个 Processor),Request Queue 的数量与 Processor 数量统一
  6. Processor 从对应 Response Queue 中取出 reponse 并返回给对应的请求者。

须要阐明下:
Request Queue 是所有 Processor 专用的一个队列,而 Response Queue 则是与 Processor 一一对的,因为每个 Processor 监听的 SocketChannel 并不是同一批,如果私有一个 Response Queue, 那么这 N 个 Processor 的 selector 就须要监听所有的 SocketChannel,而 Processor 与 Response Queue 一一对应则 Processor 相应的 selector 只须要关注调配给本人的 SocketChannel 即可。

再补充下 Purgatory 组件,如下图:

Purgatory 组件是用来缓存延时申请(Delayed Request)的,所谓延时申请,就是那些一时未满足条件不能立即解决的申请,比方设置了 acks=all 的 producer 申请,一旦设置了 acks=all,那么该申请就必须期待 ISR 中所有正本都接管了音讯后能力返回,此时算是该申请的 IO 线程就必须期待其余 broker 的写入后果。当申请不能立即解决时,它就会暂存在 Purgatory 中,稍后一旦满足了条件,IO 线程会持续解决该申请,并将 Response 放入对应网络线程的响应队列中。

最初再来一张 kafka 的结构图:

参考文章:深入浅出 kafka 原理 -4-kafka 网络机制原理

正文完
 0