接管实现的数据

broker的响应,是通过监听OP_READ事件开始的。当有read事件的时候,就会通过SelectionKey找到对应的KafkaChannel。

KafkaChannel接管到响应,就开始读取收到的数据,因为网络传输会有拆包的过程,所以接收数据的时候,就须要进粘包,kafka是这样解决的,首先定义一个音讯的大小,用4个字节示意,而后前面跟着音讯内容。

比方5aaaaa3bbb4cccc这种音讯。

读取的时候会先拿到5,再读取aaaaa,这样第一个音讯就读取到了。

而后往下读,拿到3,再读取bbb,这样第二个音讯就读取到了。

而后往下读,拿到4,再读取cccc,这样第三个音讯就读取到了。

读完后,把数据存在NetworkReceive,并放入KafkaChannel映射的队列中。

当解决完SelectionKey所有的事件后,就开始遍历每个KafkaChannel映射的队列,取出队列头的NetworkReceive寄存在completedReceives。

不须要回调的申请

咱们在发送申请的时候,会把申请进行了缓存。

有些申请是不须要返回值的,此时就间接封装一个空的ClientResponse,放入到responses列表。

并且从inFlightRequests中,移除对应的ClientRequest。

须要回调的申请

completedReceives是曾经拿到的申请,当咱们遍历completedReceives的时候,就会依据node的id把inFlightRequests缓存中对应的ClientRequest取出来,如果这个申请须要返回值,就会封装一个ClientResponse,放入ClientResponse列表表。

同样的,对应的ClientRequest也会从inFlightRequests中移除。

失去连贯的申请

如果失去了连贯,首先就会更改状态为DISCONNECTED。

而后把inFlightRequests对应的ClientRequest都新增一个disconnected为true的ClientResponse。

失落连贯后,就会要求从新拉取元数据。

连贯的申请

这个操作就跟下面相同,把曾经是CONNECTED的状态,批改为CONNECTED。

解决超时的申请

咱们的申请都缓存在inFlightRequests中,所以间接从inFlightRequests缓存中,拿出每个node对应的ClientRequest申请,依据发送申请工夫和以后的工夫比照,就晓得这个ClientRequest是否超时,如果超时了,就敞开对应的Channel。其余操作跟下面的失去连贯的申请一样解决。

回调解决

此时,咱们曾经拿到了ClientResponse的列表,包含不解决的ClientResponse、超时或者端口连贯disconnected为true的ClientResponse以及须要解决的ClientResponse。

此时就间接遍历ClientResponse,来解决这些回调。

如果响应里,有异样,然而能够容许重试,就会把RecordBatch从新放入batchs里。

如果有异样,然而不能重试,就把这个异样返回给咱们自定义的回调函数。

如果没有异样,就封装好RecordMetadata返回给咱们自定义的回调函数。