共计 1035 个字符,预计需要花费 3 分钟才能阅读完成。
初始化网络连接
假如咱们音讯要发送到 192.168.1.10 的 broker 中,假如 ID 为 1,在建设连贯的时候,会在缓存中,保留这个 Node 的连贯状态,包含DISCONNECTED
, CONNECTING
, CONNECTED
,因为咱们是建设连贯,所以刚开始内存中的状态就是CONNECTING
。
与 broker 创立连贯,其实就是规范的 NIO 操作了,创立一个 SocketChannel 与 192.168.1.10 连贯,并在这个 SocketChannel 上注册了一个 OP_CONNECT。
注册完后,就须要把 SelectionKey 封装成一个 KafkaChannel,通过 node 的 id 作为映射,保留在内存里。并且 SelectionKey 和 KafkaChannel 须要做绑定,这样就能够把监听到的事件交给 KafkaChannel 进行解决。
到这里,咱们能够看到,ID 为 1 的 node 对应着一个连贯状态以及一个 KafkaChannel。如果创立连贯失败了,连贯状态就会被改为DISCONNECTED
。
如果连贯胜利了,状态就会改为CONNECTED
。
绑定 OP_WRITE
在发送数据的时候,须要绑定 OP_WRITE
事件。在绑定之前,须要判断是否能够发送这个申请。
1、连贯是否曾经建设
当咱们内存中,有 node 对应的连贯状态,且这个连贯状态为CONNECTED
,那就是曾经建设好了连贯。
2、Channel 曾经筹备好了
内存中,有 node 对应的 KafkaChannel,且这个 KafkaChannel 曾经筹备好了。
3、申请数量不超过 5
这个是由 max.in.flight.requests.per.connection
设置的,默认为 5,也就是说,在生产者客户端,最多只有 5 个申请,如果此时,曾经有 5 个申请了,那以后的申请就不能进行发送。
如果以上三个条件都满足,就会把申请缓存起来,用来判断不能超过 5 个申请数。
每个 broker 都对应这个一个队列,申请数量不超过 5,指的是每个 broker 对应的申请数量。
而后从缓存中拿到 KafkaChannel,通过 KafkaChannel 注册已 OP_WRITE
事件。
发送申请
发送之前,这里要先看看连贯状态,如果还没有连贯,那就要实现网络连接才能够发送。实现连贯后,开始监听 OP_READ
事件。
如果音讯曾经发送进来了,此时就须要把 OP_WRITE
移除掉。
因为可能如下图一样,可能有多个发送的 OP_WRITE
在 SelectionKey 中,那就须要进行迭代,把每个申请都发送进来。