初始化网络连接

假如咱们音讯要发送到192.168.1.10的broker中,假如ID为1,在建设连贯的时候,会在缓存中,保留这个Node的连贯状态,包含DISCONNECTED, CONNECTING, CONNECTED,因为咱们是建设连贯,所以刚开始内存中的状态就是CONNECTING

与broker创立连贯,其实就是规范的NIO操作了,创立一个SocketChannel与192.168.1.10连贯,并在这个SocketChannel上注册了一个OP_CONNECT。

注册完后,就须要把SelectionKey封装成一个KafkaChannel,通过node的id作为映射,保留在内存里。并且SelectionKey和KafkaChannel须要做绑定,这样就能够把监听到的事件交给KafkaChannel进行解决。

到这里,咱们能够看到,ID为1的node对应着一个连贯状态以及一个KafkaChannel。如果创立连贯失败了,连贯状态就会被改为DISCONNECTED

如果连贯胜利了,状态就会改为CONNECTED

绑定OP_WRITE

在发送数据的时候,须要绑定OP_WRITE事件。在绑定之前,须要判断是否能够发送这个申请。

1、连贯是否曾经建设

当咱们内存中,有node对应的连贯状态,且这个连贯状态为CONNECTED,那就是曾经建设好了连贯。

2、Channel曾经筹备好了

内存中,有node对应的KafkaChannel,且这个KafkaChannel曾经筹备好了。

3、申请数量不超过5

这个是由max.in.flight.requests.per.connection设置的,默认为5,也就是说,在生产者客户端,最多只有5个申请,如果此时,曾经有5个申请了,那以后的申请就不能进行发送。

如果以上三个条件都满足,就会把申请缓存起来,用来判断不能超过5个申请数。

每个broker都对应这个一个队列,申请数量不超过5,指的是每个broker对应的申请数量。

而后从缓存中拿到KafkaChannel,通过KafkaChannel注册已OP_WRITE事件。

发送申请

发送之前,这里要先看看连贯状态,如果还没有连贯,那就要实现网络连接才能够发送。实现连贯后,开始监听OP_READ事件。

如果音讯曾经发送进来了,此时就须要把OP_WRITE移除掉。

因为可能如下图一样,可能有多个发送的OP_WRITE在SelectionKey中,那就须要进行迭代,把每个申请都发送进来。