关于kafka:Kafka-网络请求

34次阅读

共计 1035 个字符,预计需要花费 3 分钟才能阅读完成。

初始化网络连接

假如咱们音讯要发送到 192.168.1.10 的 broker 中,假如 ID 为 1,在建设连贯的时候,会在缓存中,保留这个 Node 的连贯状态,包含DISCONNECTED, CONNECTING, CONNECTED,因为咱们是建设连贯,所以刚开始内存中的状态就是CONNECTING

与 broker 创立连贯,其实就是规范的 NIO 操作了,创立一个 SocketChannel 与 192.168.1.10 连贯,并在这个 SocketChannel 上注册了一个 OP_CONNECT。

注册完后,就须要把 SelectionKey 封装成一个 KafkaChannel,通过 node 的 id 作为映射,保留在内存里。并且 SelectionKey 和 KafkaChannel 须要做绑定,这样就能够把监听到的事件交给 KafkaChannel 进行解决。

到这里,咱们能够看到,ID 为 1 的 node 对应着一个连贯状态以及一个 KafkaChannel。如果创立连贯失败了,连贯状态就会被改为DISCONNECTED

如果连贯胜利了,状态就会改为CONNECTED

绑定 OP_WRITE

在发送数据的时候,须要绑定 OP_WRITE 事件。在绑定之前,须要判断是否能够发送这个申请。

1、连贯是否曾经建设

当咱们内存中,有 node 对应的连贯状态,且这个连贯状态为CONNECTED,那就是曾经建设好了连贯。

2、Channel 曾经筹备好了

内存中,有 node 对应的 KafkaChannel,且这个 KafkaChannel 曾经筹备好了。

3、申请数量不超过 5

这个是由 max.in.flight.requests.per.connection 设置的,默认为 5,也就是说,在生产者客户端,最多只有 5 个申请,如果此时,曾经有 5 个申请了,那以后的申请就不能进行发送。

如果以上三个条件都满足,就会把申请缓存起来,用来判断不能超过 5 个申请数。

每个 broker 都对应这个一个队列,申请数量不超过 5,指的是每个 broker 对应的申请数量。

而后从缓存中拿到 KafkaChannel,通过 KafkaChannel 注册已 OP_WRITE 事件。

发送申请

发送之前,这里要先看看连贯状态,如果还没有连贯,那就要实现网络连接才能够发送。实现连贯后,开始监听 OP_READ 事件。

如果音讯曾经发送进来了,此时就须要把 OP_WRITE 移除掉。

因为可能如下图一样,可能有多个发送的 OP_WRITE 在 SelectionKey 中,那就须要进行迭代,把每个申请都发送进来。

正文完
 0