In the previous three sections we mainly analyzed how KafkaProducer puts messages into its in-memory buffer.
That logic is just one core piece of Accumulator.append(). Do you still remember the overall flow of KafkaProducerHelloWorld that we analyzed earlier?
The code flow analyzed so far is shown in the figure below:
It covers everything from the initial configuration parsing and the creation of the Record message object, to metadata fetching, the initial serialization of key and value, the Producer's partition-routing logic, and how messages are placed into the memory buffer.
We have already reached the start of the part marked with the red line in the figure: waking up the Sender thread to send messages.
In this section we continue from there: after a message is placed into the memory buffer and the Sender thread is woken up, how does the Sender thread send the packaged batches out?
What conditions wake up the Sender thread?
As the flow chart above shows, when producer.send() executes doSend(), after accumulator.append() has put the message into the memory buffer, it wakes up the Sender thread.
So after a RecordBatch has been placed into the buffer, under what conditions is the Sender thread actually woken up?
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 1. waitOnMetadata: wait for the metadata fetch
        // 2. keySerializer.serialize and valueSerializer.serialize: obviously serialize the Record into byte arrays
        // 3. partition(): route the record to one of the topic's partitions according to the partitioning strategy
        // code omitted...
        // 4. accumulator.append: put the message into the buffer
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // 5. Wake up the Sender thread from its blocking selector.select() so it starts processing the data in the memory buffer.
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (Exception e) {
        throw e;
    }
    // other exception handlers omitted
}
From the code above it is clear that the logic for waking up the Sender thread is simple: either the current batch is already full, or a new batch has just been created.
result.batchIsFull || result.newBatchCreated
So when are these two flags set?
As we saw in the previous section, RecordBatch.tryAppend creates a new batch, while RecordAccumulator.tryAppend() mainly appends to an existing batch. They set the batchIsFull and newBatchCreated flags to indicate whether the batch was newly created or has become full.
The key code is as follows:
new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true)
public final static class RecordAppendResult {
public final FutureRecordMetadata future;
public final boolean batchIsFull;
public final boolean newBatchCreated;
public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
this.future = future;
this.batchIsFull = batchIsFull;
this.newBatchCreated = newBatchCreated;
}
}
public boolean isFull() {
    return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
}
Once the condition is met, sender.wakeup() is eventually triggered, waking up the Selector that was blocked in the Sender's while loop and getting ready to send messages. The whole process is shown below:
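To make this concrete, here is a minimal, self-contained java.nio sketch (not Kafka source; the class name WakeupSketch and the 500 ms sleep are just for illustration) showing the primitive the Sender relies on: select() blocks the calling thread until an event arrives or another thread calls wakeup().

import java.nio.channels.Selector;

// Minimal sketch (not Kafka source): the NIO primitive behind sender.wakeup().
public class WakeupSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        Thread ioThread = new Thread(() -> {
            try {
                int readyKeys = selector.select();   // blocks, just like the Sender thread inside poll()
                System.out.println("woken up, ready keys = " + readyKeys);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        ioThread.start();
        Thread.sleep(500);
        selector.wakeup();   // what sender.wakeup() ultimately delegates to
        ioThread.join();
        selector.close();
    }
}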
How does the woken Sender thread send the batches?
Since sender.wakeup() is ultimately called, you should know that what it triggers underneath is the NIO Selector's wakeup. But which flow does it wake up? Let's review: the main skeleton of the Sender thread's run() method, covered in the earlier metadata-fetching article (Kafka 成长记 4), is shown in the figure below:
That is, when we analyzed metadata fetching, the core was the three main internal steps of NetworkClient.poll(): maybeUpdate() -> Selector.poll() -> handle().
After the metadata fetch finally succeeds, the thread blocks again on Selector.select(), and at that moment sender.wakeup() wakes it up so that the run() method continues executing.
However, there is still a pile of confusing code around NetworkClient.poll() — do you remember it?
void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }

    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}
The first time we analyzed run(), this pile of logic looked confusing, and by debugging we found that in that scenario the only line that really mattered at the end was the client.poll() call.
That leaves the previously confusing logic:
accumulator.ready()、client.ready(node, now)
accumulator.drain()
accumulator.abortExpiredBatches()
client.send(request, now)
As shown below:
Now that the Sender has been woken up and we have a batch, which of these pieces of logic get triggered? Let's take a look together.
What does RecordAccumulator.ready() prepare?
Before a message is sent, a few checks have to be made, for example: finding the Broker that owns each partition, checking the Broker connection, checking whether a batch has timed out, and so on.
// Sender.java
run() {
    // 1st ready check
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // omitted...
    client.poll();
}
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
You can see that the main flow of Accumulator.ready() before sending messages is:
It iterates over the in-memory batches collection in the buffer and applies a few checks to decide whether a batch can be sent.
1) The first check is that the Broker (leader) for the partition whose batches are to be sent must exist.
2) If the leader exists, a few conditions are computed, for example:
backingOff: whether the time of the last send attempt for this batch plus the retry interval is still greater than the current time. The default interval is 100 ms; normally attempts is 0 and there is no retry. But when attempts > 0, the batch may only be retried once the interval has elapsed.
waitedTimeMs: the current time minus the time this batch was created, i.e., how long the batch has been waiting since it was created.
timeToWaitMs: the maximum time this batch may wait, counted from its creation, before it must be sent. During a retry this is the retry interval, while in the initial, non-retry phase it is the linger.ms time (100 ms in our example) — once 100 ms have passed, the batch must be sent out.
full: whether the batch is full. If the deque holds more than one batch, the batch returned by peekFirst is necessarily already full; and even if the deque holds only one batch, it is also considered full once it reaches the 16 KB size limit.
expired: whether the time this batch has already waited (say 120 ms) >= the maximum time it may wait (100 ms), i.e., whether it has gone beyond the linger.ms window; otherwise (say 60 ms < 100 ms) it has not expired. If linger.ms is the default 0, then as soon as a batch is created, expired is always true here.
These conditions are finally combined into one:
boolean sendable = full || expired || exhausted || closed || flushInProgress()
If the condition holds, the prepared ReadyCheckResult is returned, indicating whether batches should be sent and to which Broker.
3) One last point: each pass only examines the first batch of each TopicPartition's deque. If none of them satisfies the condition, the smallest timeLeftMs across all partitions is used as the wait time before the next check. This is an optimization to get batches out as quickly as possible; you don't need to memorize the detail — knowing the overall send logic, and that Kafka makes such optimizations and trade-offs, is enough.
The process above is shown in the figure below:
At its core it checks things such as the Broker corresponding to each partition, the Broker connection, and batch timeouts.
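To make the timing checks concrete, here is a minimal sketch (not Kafka source; the values, and the assumption that linger.ms and retry.backoff.ms are both 100 ms, are hypothetical) that evaluates the same sendable predicate by hand:

// Minimal sketch (not Kafka source): evaluating the "sendable" predicate with hypothetical values.
public class ReadyCheckSketch {
    public static void main(String[] args) {
        long nowMs = 1_000L;
        long lastAttemptMs = 880L;     // the batch was created/last attempted 120 ms ago
        int attempts = 0;              // not a retry
        long lingerMs = 100L;          // assumed linger.ms
        long retryBackoffMs = 100L;    // assumed retry.backoff.ms
        boolean full = false;          // assume the batch has not reached batch.size (16 KB)

        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs; // false
        long waitedTimeMs = nowMs - lastAttemptMs;                                   // 120
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;                  // 100
        boolean expired = waitedTimeMs >= timeToWaitMs;                              // 120 >= 100 -> true
        boolean sendable = full || expired;                                          // true: the batch is ready
        System.out.println("sendable = " + sendable + ", backingOff = " + backingOff);
    }
}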
What does NetworkClient.ready() prepare?
In the run() method there are two ready calls. We have already analyzed one of them; what is the other one doing?
As its name suggests, it prepares network connections. The code is as follows:
// Sender.java
run() {
    // 1st ready check
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // 2nd ready check
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // omitted...
    client.poll();
}
@Override
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node" + node);

    if (isReady(node, now))
        return true;

    if (connectionStates.canConnect(node.idString(), now))
        // if we are interested in sending to a node and we don't have a connection to it, initiate one
        initiateConnect(node, now);

    return false;
}

public boolean isReady(Node node, long now) {
    // if we need to update our metadata now declare all requests unready to make metadata requests first
    // priority
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}

private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}
The structure here is quite clear; it mainly checks:
whether metadata needs to be refreshed + whether the network connection's Channel is ready + whether the connection state is Connected + whether the number of in-flight requests has exceeded 5 (we met inFlightRequests before in the metadata-fetching analysis: every request sent over NIO that has not yet received a response is recorded there).
If all of the above are ready, the batch can be sent; if not, metadata is fetched when it needs to be fetched, and a connection to the broker is established when one is needed. There is not much more to it.
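As a side note, the in-flight limit of 5 and the timing values discussed above correspond to standard producer configs. A hedged configuration sketch, assuming the usual kafka-clients producer API and a hypothetical broker address:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the per-connection in-flight limit checked by inFlightRequests.canSendMore(); default is 5
        props.put("max.in.flight.requests.per.connection", "5");
        // the wait/expiry settings used by RecordAccumulator.ready(); the values are just examples
        props.put("linger.ms", "100");
        props.put("retry.backoff.ms", "100");
        props.put("batch.size", "16384");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}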
After the two ready calls, the Sender's run() method has progressed as shown in the figure below:
Preparing the request: Accumulator.drain() and creating the ClientRequest
After the two ready calls comes the Accumulator.drain() method. Its logic is actually clear from the comments alone.
// Sender.java
run() {
    // 1st ready: accumulator.ready(cluster, now)
    // 2nd ready: networkclient.ready
    // collate the requests: Accumulator.drain
    Map<Integer, List<RecordBatch>> batches =
        this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // omitted...
    client.poll();
}
/**
 * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
 * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
 *
 * @param cluster The current cluster metadata
 * @param nodes The list of node to drain
 * @param maxSize The maximum number of bytes to drain
 * @param now The current unix time in milliseconds
 * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
 */
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
Combining the comments with the overall structure, this is simply a for loop over the list of Nodes (brokers) that passed the earlier ready checks. We can draw the following conclusions:
Since batches are by default recorded per topic-partition, to determine which partitions' batches each Broker should receive, the batches have to be grouped by Node rather than by topic.
That is exactly what Accumulator.drain() does: it gets all partitions on a broker, iterates over them, takes the first batch from each partition's deque, and adds it to the list destined for that broker. Each broker ends up with one batch list, recorded in a map whose key is the brokerId and whose value is the list of batches.
The whole process is shown in the figure below:
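To illustrate the regrouping that drain() performs, here is a minimal, self-contained sketch (not Kafka source; strings and integers stand in for RecordBatch, TopicPartition and Node) that turns per-partition queues into per-broker send lists:

import java.util.*;

// Minimal sketch (not Kafka source): regroup per-partition queues into per-broker lists.
public class DrainSketch {
    public static void main(String[] args) {
        // hypothetical data: partition -> queued batches (labels stand in for RecordBatch)
        Map<String, Deque<String>> byPartition = new HashMap<>();
        byPartition.put("topicA-0", new ArrayDeque<>(List.of("batch1")));
        byPartition.put("topicA-1", new ArrayDeque<>(List.of("batch2")));
        // hypothetical leader mapping: partition -> broker id
        Map<String, Integer> leaderOf = Map.of("topicA-0", 1, "topicA-1", 2);

        Map<Integer, List<String>> byBroker = new HashMap<>();
        for (Map.Entry<String, Deque<String>> e : byPartition.entrySet()) {
            Integer broker = leaderOf.get(e.getKey());
            String first = e.getValue().pollFirst();   // only the first batch of each deque is drained
            if (broker != null && first != null)
                byBroker.computeIfAbsent(broker, k -> new ArrayList<>()).add(first);
        }
        System.out.println(byBroker);   // e.g. {1=[batch1], 2=[batch2]}
    }
}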
After the two ready calls and the drain that collates the data, the next step is the final construction of the request for these batches: creating the ClientRequests via createProduceRequests.
// Sender.java
run() {
    // after the 2 ready checks and drain
    // createProduceRequests
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // omitted...
    client.poll();
}
/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}

/**
 * Create a produce request from the given record batches
 */
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    return new ClientRequest(now, acks != 0, send, callback);
}
Obviously we do not send the batches List out directly; it still gets a simple wrapping in the end. Do you remember how the metadata fetch request was wrapped?
There was a whole request-wrapping flow there, and sending messages works in a similar way. The RecordBatch messages we serialized earlier are essentially ByteBuffers; here a series of objects supplements them with additional information, mainly the api key, api version, acks, request timeout and so on. As shown in the figure below:
The final preparation: setSend()
The last preparation is a setSend operation. After the series of preparations above — the connection is ready and the request is ready — and since the Selector has been woken up, according to the NIO communication model we next need to update the SelectionKey's interest ops to OP_WRITE, indicating that we want to write data to the Broker, and then perform the write. So a setSend operation is executed here; the main code is as follows:
// Sender.java
run() {
    // after the 2 ready checks, drain and createProduceRequests
    // setSend
    for (ClientRequest request : requests)
        client.send(request, now);
    // omitted...
    client.poll();
}
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

// org.apache.kafka.common.network.Selector.java
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

// org.apache.kafka.common.network.KafkaChannel
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
This process is similar to sending the metadata fetch request earlier, so it should feel familiar; see the earlier article on sending the metadata fetch request (Kafka 成长记 4).
The overall flow is shown in the figure below:
The final message send
After the whole series of preparations — the two ready checks, drain, creating the ClientRequest, and setSend registering interest in the write event on the corresponding channel — we finally get to the actual message send.
With the earlier experience of sending the metadata request, we know the final step goes through pollSelectionKeys() and is then processed by the corresponding handle methods.
1) First, the write of the produce request: inFlightRequests temporarily stores the Request being sent, the data is written out via channel.write, and afterwards handleCompletedSends is executed.
2) Once the message has been sent successfully, interest in the OP_WRITE event is cancelled and the response that comes back is processed by handleCompletedReceives. Otherwise, if a Request's data has not been fully written yet, interest in OP_WRITE has to be kept. Interest in the NIO OP_READ event is registered via key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
(The concrete code lives in the corresponding handle methods and is fairly simple, so I won't walk through it here; we will revisit this logic in the next section on TCP packet sticking and splitting. A minimal NIO sketch of the idea is given below.)
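Here is that minimal NIO sketch (not Kafka source; the method writeAndAwaitResponse is hypothetical) of the idea in step 2): a partial write keeps OP_WRITE registered, and once the buffer is fully flushed the key stops watching OP_WRITE and keeps watching OP_READ for the response.

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

// Minimal sketch (not Kafka source): write a request and switch interest ops once it is fully sent.
public class WriteSketch {
    static void writeAndAwaitResponse(SelectionKey key, ByteBuffer request) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(request);                     // may write only part of the buffer
        if (!request.hasRemaining()) {              // the request has been fully sent
            key.interestOps((key.interestOps() & ~SelectionKey.OP_WRITE) | SelectionKey.OP_READ);
        }
        // otherwise keep OP_WRITE registered and write the rest on the next select() loop
    }
}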
The whole process is roughly as shown in the figure below:
Summary
At this point we have worked out how the messages in Kafka's memory buffer are finally sent out.
The previous nine sections have been fairly dense and detailed. I wouldn't call it an exhaustive line-by-line reading, but the core Kafka source code has certainly been covered thoroughly.
The Kafka Producer series should wrap up in just a few more sections.
I'm 枯萎 — see you in the next section~