共计 1366 个字符,预计需要花费 4 分钟才能阅读完成。
接管实现的数据
broker 的响应,是通过监听 OP_READ
事件开始的。当有 read 事件的时候,就会通过 SelectionKey 找到对应的 KafkaChannel。
KafkaChannel 接管到响应,就开始读取收到的数据,因为网络传输会有拆包的过程,所以接收数据的时候,就须要进粘包,kafka 是这样解决的,首先定义一个音讯的大小,用 4 个字节示意,而后前面跟着音讯内容。
比方 5aaaaa3bbb4cccc 这种音讯。
读取的时候会先拿到 5,再读取 aaaaa,这样第一个音讯就读取到了。
而后往下读,拿到 3,再读取 bbb,这样第二个音讯就读取到了。
而后往下读,拿到 4,再读取 cccc,这样第三个音讯就读取到了。
读完后,把数据存在 NetworkReceive,并放入 KafkaChannel 映射的队列中。
当解决完 SelectionKey 所有的事件后,就开始遍历每个 KafkaChannel 映射的队列,取出队列头的 NetworkReceive 寄存在 completedReceives。
不须要回调的申请
咱们在发送申请的时候,会把申请进行了缓存。
有些申请是不须要返回值的,此时就间接封装一个空的 ClientResponse,放入到 responses 列表。
并且从 inFlightRequests 中,移除对应的 ClientRequest。
须要回调的申请
completedReceives 是曾经拿到的申请,当咱们遍历 completedReceives 的时候,就会依据 node 的 id 把 inFlightRequests 缓存中对应的 ClientRequest 取出来,如果这个申请须要返回值,就会封装一个 ClientResponse,放入 ClientResponse 列表表。
同样的,对应的 ClientRequest 也会从 inFlightRequests 中移除。
失去连贯的申请
如果失去了连贯,首先就会更改状态为 DISCONNECTED。
而后把 inFlightRequests 对应的 ClientRequest 都新增一个 disconnected 为 true 的 ClientResponse。
失落连贯后,就会要求从新拉取元数据。
连贯的申请
这个操作就跟下面相同,把曾经是 CONNECTED 的状态,批改为 CONNECTED。
解决超时的申请
咱们的申请都缓存在 inFlightRequests 中,所以间接从 inFlightRequests 缓存中,拿出每个 node 对应的 ClientRequest 申请,依据发送申请工夫和以后的工夫比照,就晓得这个 ClientRequest 是否超时,如果超时了,就敞开对应的 Channel。其余操作跟下面的失去连贯的申请一样解决。
回调解决
此时,咱们曾经拿到了 ClientResponse 的列表,包含不解决的 ClientResponse、超时或者端口连贯 disconnected 为 true 的 ClientResponse 以及须要解决的 ClientResponse。
此时就间接遍历 ClientResponse,来解决这些回调。
如果响应里,有异样,然而能够容许重试,就会把 RecordBatch 从新放入 batchs 里。
如果有异样,然而不能重试,就把这个异样返回给咱们自定义的回调函数。
如果没有异样,就封装好 RecordMetadata 返回给咱们自定义的回调函数。