共计 4982 个字符,预计需要花费 13 分钟才能阅读完成。
一、Producer 的网络模型
咱们后面几篇有说 Producer 发送流程的源码剖析,但那个是大的轮廓,波及到发送很多相干的内容,比方:
- 获取 topic 的 metadata 信息
- key 和 value 的序列化
- 获取该 record 要发送到的 partition
- 向 RecordAccmulator 中追加 record 数据
- 唤醒 sender 线程发送 RecordBatch
那这篇老周次要来说下 Producer 的网络模型,这里间接给出 Producer 的网络模型图,如下:
从图中能够看出,KafkaProducer 相当于客户端,与 Sender 调用层交互,Sender 调用 NetworkClient,NetworkClient 调用 Selector,而 Selector 底层封装了 Java NIO 的相干接口。心中有了 Producer 的网络模型大抵轮廓后,咱们接下来就能够来剖析 Producer 的网络模型。
二、Producer 与 Broker 的交互流程
咱们在业务代码通过生产者 producer 调用 send 办法来发送音讯,不难发现都是通过走 Producer 的实现类 KafkaProducer 的 send 办法:
2.1 org.apache.kafka.clients.producer.KafkaProducer#doSend
下面的两个 send 办法最终会走到 doSend 办法里来:
这块的源码老周在前两篇的 Producer 源码解析那一篇剖析了的哈,这里次要说下与 Broker 通信的交互剖析。次要有两点:
- waitOnMetadata():申请 tp(topic-partition)元数据 metadata 更新,两头会调用 sender.wakeup()。
- ccumulator.append():将 record 对应的 tp 写入到 deque 中,如果该 tp 对应的 deque batch 是满了或者新建了一个 batch,则会调用 sender.wakeup()。
次要看下 sender.wakeup() 办法,次要作用就是将 Sender 线程从阻塞中唤醒。
2.2 org.apache.kafka.clients.producer.internals.Sender#wakeup
/**
* Wake up the selector associated with this send thread
*/
public void wakeup() {this.client.wakeup();
}
/**
* Interrupt the client if it is blocked waiting on I/O.
*/
@Override
public void wakeup() {this.selector.wakeup();
}
/**
* Interrupt the nioSelector if it is blocked waiting to do I/O.
*/
@Override
public void wakeup() {this.nioSelector.wakeup();
}
不难发现,调用链是:
Sender -> NetworkClient -> Selector(Kafka 封装的)-> Selector(java.nio.channels.Selector Java NIO)
wakeup() 的次要作用就是唤醒阻塞在 select()/select(long) 上的线程,为什么要唤醒?因为注册了新的 channel 或者事件。
再回到 Kafka 这里,KafkaProducer 中 dosend() 办法调用 sender.wakeup() 办法作用就很显著了。作用就是:当有新的 RecordBatch 创立后,旧的 RecordBatch 就能够发送了,如果线程阻塞在 select() 办法中,就将其唤醒,Sender 从新开始运行 run() 办法,在这个办法中,旧的 RecordBatch 将会被选中,进而能够及时将这些申请发送进来。
2.3 org.apache.kafka.clients.producer.internals.Sender#run
跟到 runOnce 办法里去:
持续跟,外围是 Sender 线程每次循环具体执行的中央,即 sendProducerData() 办法:
最初调用 client.poll() 办法,对于 socket 的一些理论的读写操作。
咱们来小结一下 Sender.run() 办法的大抵流程,次要分为以下五步:
- accumulator.ready():遍历整个 batches(key:TopicPartition,value: Deque<ProducerBatch>>),如果 ProducerBatch 不为空,就将其对应的 leader 选出来,最初会返回一个能够发送 ReadyCheckResult 实例,readyNodes 是次要的成员变量。
- 如果有 tp 的 leader 是未知的,就强制 metadata 更新。遍历未知 leader 的主题(蕴含 leader 选举未决的主题以及可能曾经过期的主题),再次将主题增加到元数据以确保它被蕴含并申请元数据更新,因为有音讯要发送到该主题,调用 requestUpdate() 办法来更新。
- accumulator.drain():遍历每个 leader(第一步中选出)节点,获取该节点上所有的 tp,如果该 tp 对应的 ProducerBatch 不在 backoff 期间(没有重试过或者重试了然而距离曾经达到了 retryBackoffMs),并且 ProducerBatch 的大小不大于 maxSize(一个 request 的最大限度默认 1 MB)或 ProducerBatch 的汇合是空的,那么就把这个 ProducerBatch 增加 list 中,最终返回的类型为 Map<Integer, List<ProducerBatch>>,key 为 leader.id,value 为要发送的 ProducerBatch 的列表。
- sendProduceRequests():发送 Producer 申请,这个办法会调用 NetworkClient.send() 来发送 clientRequest。
- NetworkClient.poll():对于 socket 的一些理论的读写操作,这个办法会持续调用 Kafka 封装的 Selector.poll(),跟进去底层是调用的 Java NIO 的 Selector.poll()。
2.4 org.apache.kafka.clients.NetworkClient#poll
次要分为以下三步:
- metadataUpdater.maybeUpdate():判断是否须要更新 metadata 元数据。抉择具备起码未实现申请且至多合乎连贯条件的节点。此办法将更喜爱具备现有连贯的节点,但如果所有现有连贯都在应用,则可能会抉择咱们还没有连贯的节点。如果不存在连贯,则此办法将首选最近连贯尝试起码的节点。这种办法永远不会抉择一个没有现有连贯的节点,并且咱们在从新连贯回退期间断开了连贯,或者抉择了一个被限度的流动连贯。
- selector.poll():进行 socket 相干的 IO 操作。
-
解决实现后的操作:在一个 select() 过程之后的相干解决
- handleAbortedSends(responses):如果因为不受反对的版本异样或断开连接而停止发送,请立刻解决,无需期待 Selector#poll。
- handleCompletedSends(responses, updatedNow):解决任何已实现的申请发送。特地是如果预期没有响应,则认为申请已实现。
- handleCompletedReceives(responses, updatedNow):解决任何已实现的接管并应用接管到的响应更新响应列表。(MetadataResponse、ApiVersionsResponse 都是在这解决的)
- handleDisconnections(responses, updatedNow):解决连贯失败的那些连贯,从新申请 metadata。
- handleConnections():解决新建设的那些连贯(还不能发送申请,比方:还未认证)
- handleInitiateApiVersionRequests(updatedNow):对那些新建设的连贯,发送 apiVersionRequest(默认状况:第一次建设连贯时,须要向 Broker 发送 ApiVersionRequest 申请)
- handleTimedOutRequests(responses, updatedNow):解决 timeout 的连贯,敞开该连贯,并刷新 metadata。
2.5 org.apache.kafka.common.network.Selector#poll
Kafka 中的 Selector 类次要是 Java NIO 相干接口的封装,Socket 相干 IO 操作都是在这个类中实现的。次要操作都是在上面这个办法中调用的:
2.5.1 org.apache.kafka.common.network.Selector#clear
2.5.2 org.apache.kafka.common.network.Selector#select
Selector.select() 办法底层还是调用的 Java NIO 的原生接口,这里的 nioSelector 其实就是 java.nio.channels.Selector 的实例对象,这个办法最坏状况下,会阻塞 ms 的工夫,如果在一次轮询,只有有一个 Channel 的事件就绪,它就会立马返回。
/**
* Check for data, waiting up to the given timeout.
*
* @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
* @return The number of keys ready
*/
private int select(long timeoutMs) throws IOException {if (timeoutMs < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (timeoutMs == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(timeoutMs);
}
2.5.3 org.apache.kafka.common.network.Selector#pollSelectionKeys
这部分代码是 socket IO 的次要局部,发送 Send 及接管 Receive 都是在这里实现的,在 poll() 办法中,这个办法会调用三次:
- 解决从具备缓冲数据的通道的轮询事件
- 解决曾经就绪的事件,进行相应的 IO 操作。
- 解决新建设的那些连贯,增加缓存及传输层的握手与认证。
2.5.4 org.apache.kafka.common.network.Selector#addToCompletedReceives
/**
* adds a receive to completed receives
*/
private void addToCompletedReceives(KafkaChannel channel, NetworkReceive networkReceive, long currentTimeMs) {if (hasCompletedReceive(channel))
throw new IllegalStateException("Attempting to add second completed receive to channel" + channel.id());
// 增加到 completedReceives 中
this.completedReceives.put(channel.id(), networkReceive);
sensors.recordCompletedReceive(channel.id(), networkReceive.size(), currentTimeMs);
}
接管到的所有 Receive 都会被放入到 completedReceives 的汇合中期待后续解决。
三、总结
总结的话我就不复述了,下面的源码流程说的很分明了。最初再来一张 Producer 网络模型的时序图:
心愿对你有所帮忙,咱们下期再见。