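In the previous three sections we mainly analyzed how KafkaProducer puts messages into the in-memory buffer.
That logic is only one core part of Accumulator.append(). Do you still remember the overall flow of KafkaProducerHelloWorld that we analyzed earlier?
The code logic analyzed so far is shown in the figure below:
We started from configuration parsing and the creation of the message object Record, then covered metadata fetching, the serialization of key and value, the principle behind Producer partition routing, and how messages are placed into the in-memory buffer.
We have now reached the end of the part marked by the red line in the figure: waking up the Sender thread to send messages.
In this section we continue from there: once messages are in the in-memory buffer and the Sender thread has been woken up, how does the Sender thread send out the packaged batches?
What conditions wake up the Sender thread
From the flow chart above we can see that when producer.send() executes doSend(), the Sender thread is woken up after accumulator.append() has put the message into the in-memory buffer.
So after a RecordBatch has been placed in the buffer, under what conditions is the Sender thread woken up?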
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 1. waitOnMetadata: wait for the metadata fetch
            // 2. keySerializer.serialize and valueSerializer.serialize: serialize the Record into byte arrays
            // 3. partition(): route the record to one partition of the topic according to the routing strategy
            // code omitted...

            // 4. accumulator.append: put the message into the buffer
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
                        record.topic(), partition);
                // 5. wake the Sender thread out of its blocking selector.select() so it starts processing the buffered data
                this.sender.wakeup();
            }
            return result.future;
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (Exception e) {
            throw e;
        }
        // the other exception handlers are omitted
    }
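From the code above it is clear that the logic for waking up the Sender thread is simple: either the current batch is full, or a new batch has been created.
    result.batchIsFull || result.newBatchCreated
So when are these two variables set?
In the previous section we saw two paths inside RecordAccumulator.append(): RecordAccumulator.tryAppend() appends to an existing batch, while the other path creates a new batch and writes the first record into it with RecordBatch.tryAppend(). Each path sets the batchIsFull and newBatchCreated flags, which tell the caller whether the batch is full and whether it was newly created.
The main code is as follows (the first line is the append path, the second the new-batch path):
    new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);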
    public final static class RecordAppendResult {
        public final FutureRecordMetadata future;
        public final boolean batchIsFull;
        public final boolean newBatchCreated;

        public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
            this.future = future;
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    public boolean isFull() {
        return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
    }
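To make the meaning of the two flags concrete, here is a minimal sketch of the append decision. It is a simplified model under stated assumptions, not the real RecordAccumulator: the classes WakeupFlagsSketch, Batch and AppendResult are hypothetical, a batch only counts bytes, and locking, buffer allocation and compression are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of the append path; not Kafka's actual classes.
class WakeupFlagsSketch {
    // Stand-in for RecordBatch: only tracks how many bytes have been used.
    static final class Batch {
        final int capacity;
        int used;

        Batch(int capacity) { this.capacity = capacity; }

        boolean tryAppend(int size) {
            if (used + size > capacity) return false; // no room left in this batch
            used += size;
            return true;
        }

        boolean isFull() { return used >= capacity; }
    }

    // Stand-in for RecordAppendResult, without the future.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }

        // Same condition doSend() uses before calling sender.wakeup().
        boolean shouldWakeSender() { return batchIsFull || newBatchCreated; }
    }

    static AppendResult append(Deque<Batch> deque, int recordSize, int batchSize) {
        Batch last = deque.peekLast();
        if (last != null && last.tryAppend(recordSize)) {
            // Appended to an existing batch: newBatchCreated is false.
            return new AppendResult(deque.size() > 1 || last.isFull(), false);
        }
        // No batch yet, or the last one has no room: create a new batch.
        Batch batch = new Batch(Math.max(batchSize, recordSize));
        batch.tryAppend(recordSize);
        deque.addLast(batch);
        return new AppendResult(deque.size() > 1 || batch.isFull(), true);
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        System.out.println(append(deque, 40, 100).shouldWakeSender()); // new batch created
        System.out.println(append(deque, 40, 100).shouldWakeSender()); // fits into the existing batch
        System.out.println(append(deque, 40, 100).shouldWakeSender()); // does not fit, second batch created
    }
}
```

In this model the second append does not wake the Sender, because the only batch is neither full nor new; the third append does, because a second batch appears in the deque.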
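Once the condition is met, sender.wakeup() is eventually triggered.
It wakes up the Selector that was blocked inside the while loop, ready to send messages. The whole process is shown in the figure below:
How does the woken Sender thread send the batch messages?
Since sender.wakeup() is eventually triggered, you should know that underneath it calls wakeup() on the NIO Selector. Which flow does it wake up? Let's first review the main flow of the Sender thread's run method from 《Kafka成长记4 元数据拉取 下》 (Kafka Growth Notes 4: Metadata Fetching, Part 2), shown in the figure below:
That is, when we analyzed metadata fetching, the core was the three main steps inside NetworkClient.poll(): maybeUpdate() --> Selector --> handle().
After the metadata has been fetched successfully, the thread blocks again in Selector.select(); sender.wakeup() then wakes it up so that the run method continues to execute.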
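The wake-up mechanism itself is plain java.nio and can be tried in isolation. The following is a small sketch, not Kafka code: the class SelectorWakeupSketch and its method are hypothetical names, and no channels are registered, so select() can only return because of wakeup().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Standalone demo of Selector.wakeup(); the class and method names are illustrative only.
class SelectorWakeupSketch {
    // A worker thread blocks in select(); the caller wakes it and gets back the ready-key count.
    static int selectUntilWoken() {
        try (Selector selector = Selector.open()) {
            int[] readyKeys = {-1};
            Thread worker = new Thread(() -> {
                try {
                    // No channel is registered, so this blocks until wakeup() is called.
                    readyKeys[0] = selector.select();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            worker.start();
            // If wakeup() runs before the worker reaches select(), the next select() returns immediately.
            selector.wakeup();
            worker.join();
            return readyKeys[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select() returned " + selectUntilWoken() + " ready keys after wakeup()");
    }
}
```

The select() call returns with zero ready keys, which is the situation the Sender is in after sender.wakeup(): nothing arrived on the network, but the loop gets another pass over run().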
However, NetworkClient.poll() is wrapped in a pile of confusing code. Do you remember it?
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }
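When we first analyzed the run method, we found a pile of confusing logic here. By debugging we saw that the only line that actually did anything was the last one, client.poll().
The confusing logic in front of it was:
accumulator.ready(), client.ready(node, now)
accumulator.drain()
accumulator.abortExpiredBatches()
client.send(request, now)
As shown in the figure below:
Now that the Sender has been woken up and we have a batch, which parts of this logic are triggered? Let's take a look together.
What does RecordAccumulator.ready() prepare?
Before messages can be sent, some checks are needed, such as finding the Broker for a partition, checking the connection to the Broker, and checking whether a batch has waited long enough.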
    // Sender.java
    run() {
        // first ready
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        // omitted...
        client.poll();
    }

    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
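The main flow of Accumulator.ready() before sending is:
It iterates over the batches collection in the in-memory buffer and, through a few checks, decides whether each batch can be sent.
1) The first check is that the Broker (the partition leader) for the batches to be sent must be known.
2) If the leader exists, several values are computed, for example:
backingOff: whether the time of the last send attempt plus the retry interval is still later than the current time. The retry interval defaults to 100 ms. Normally attempts is 0, meaning no retry has happened. If attempts > 0, the batch is being retried and may only be sent after the retry interval has passed.
waitedTimeMs: the current time minus batch.lastAttemptMs. For a batch that has never been retried this is the time since the batch was created, that is, how long the batch has waited so far.
timeToWaitMs: the longest this batch may wait before it must be sent. During a retry this is the retry interval; in the initial, non-retry phase it is linger.ms. With linger.ms set to 100 ms, for example, the batch must be sent once 100 ms have passed.
full: whether the batch is full. If the Deque holds more than one batch, the batch returned by peekFirst must already be full. If the Deque holds only one batch but that batch has reached 16 KB, it is full as well.
expired: whether the time the batch has already waited (say 120 ms) >= the longest it may wait (say 100 ms), meaning it has exceeded linger.ms. With 60 ms < 100 ms it has not expired yet. With the default linger.ms of 0, expired is always true as soon as the batch exists.
These conditions are finally combined into one:
    boolean sendable = full || expired || exhausted || closed || flushInProgress()
If the condition holds and the batch is not backing off, the leader is added to readyNodes. The returned ReadyCheckResult therefore says whether there are batches to send and which Brokers they should be sent to.
3) Finally, each loop iteration only looks at the first batch of each TopicPartition. For batches that are not sendable yet, the smallest timeLeftMs over all those partitions is kept as nextReadyCheckDelayMs, which is how long to wait before checking again. This is an optimization for sending batches as early as possible. You do not need to memorize the detail; it is enough to know the overall sending logic and that Kafka makes such trade-offs.
The whole process above is shown in the figure below:
In short, the core checks are: finding the Broker for each partition, whether the batch is full, and whether it has waited long enough.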
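The timing rules above can be checked with a few numbers. The sketch below copies the arithmetic from ready() into a standalone method; ReadyCheckSketch, Decision and the `forced` parameter (standing in for exhausted || closed || flushInProgress()) are hypothetical names introduced only for illustration.

```java
// Standalone sketch of the per-batch readiness arithmetic; names are illustrative, not Kafka's.
class ReadyCheckSketch {
    static final class Decision {
        final boolean ready;      // sendable && !backingOff
        final boolean backingOff; // still inside the retry back-off window
        final long timeLeftMs;    // how long until the batch should be checked again

        Decision(boolean ready, boolean backingOff, long timeLeftMs) {
            this.ready = ready;
            this.backingOff = backingOff;
            this.timeLeftMs = timeLeftMs;
        }
    }

    // 'forced' is an assumption that bundles exhausted || closed || flushInProgress().
    static Decision check(int attempts, long lastAttemptMs, long nowMs,
                          long retryBackoffMs, long lingerMs, boolean full, boolean forced) {
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        long waitedTimeMs = nowMs - lastAttemptMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || forced;
        return new Decision(sendable && !backingOff, backingOff, timeLeftMs);
    }

    public static void main(String[] args) {
        // linger.ms = 100, batch has waited 60 ms: not ready, 40 ms left.
        Decision lingering = check(0, 1_000, 1_060, 100, 100, false, false);
        // linger.ms = 100, batch has waited 120 ms: expired, so ready.
        Decision expired = check(0, 1_000, 1_120, 100, 100, false, false);
        // Retried 50 ms ago with a 100 ms back-off: not ready even though the batch is full.
        Decision retrying = check(1, 1_000, 1_050, 100, 0, true, false);
        System.out.println(lingering.ready + " " + lingering.timeLeftMs);
        System.out.println(expired.ready + " " + expired.timeLeftMs);
        System.out.println(retrying.ready + " " + retrying.timeLeftMs);
    }
}
```

The third case shows why the final test is `sendable && !backingOff`: a full batch is sendable in principle, but the back-off window still holds it back.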
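What does NetworkClient.ready() prepare?
The run method contains two ready calls. We have analyzed one of them; what does the other one do?
Judging from the name, it prepares the network connection. The code is as follows: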
    // Sender.java
    run() {
        // first ready
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        // second ready
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }
        // omitted...
        client.poll();
    }

    @Override
    public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);

        if (isReady(node, now))
            return true;

        if (connectionStates.canConnect(node.idString(), now))
            // if we are interested in sending to a node and we don't have a connection to it, initiate one
            initiateConnect(node, now);

        return false;
    }

    public boolean isReady(Node node, long now) {
        // if we need to update our metadata now declare all requests unready to make metadata requests first
        // priority
        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
    }

    private boolean canSendRequest(String node) {
        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
    }
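The flow of this code is clear. It mainly checks:
whether the metadata needs to be fetched + whether the connection state is already Connected + whether the network Channel is ready + whether the number of inFlightRequests has reached the limit of 5. (We met inFlightRequests before when looking at how metadata is fetched: every request sent through NIO that has not yet received a response is recorded there as in flight.)
If all of these are satisfied, the batch can be sent. If not, the client fetches metadata where that is needed and establishes the broker connection where that is needed. There is not much more to say about it.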
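The in-flight limit is the least obvious of these checks, so here is a minimal sketch of it. This is a simplified model under assumptions: InFlightSketch and its methods are hypothetical, requests are plain strings, and the real canSendMore() also requires the most recently queued request to have been fully written, which is left out here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the in-flight bookkeeping; not the real InFlightRequests class.
class InFlightSketch {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Record a request that has been handed to the network but has no response yet.
    void add(String nodeId, String requestId) {
        inFlight.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(requestId);
    }

    // A response arrived: the oldest request for that node is completed.
    String completeNext(String nodeId) {
        Deque<String> queue = inFlight.get(nodeId);
        return queue == null ? null : queue.pollLast();
    }

    boolean canSendMore(String nodeId) {
        Deque<String> queue = inFlight.get(nodeId);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    // Mirrors the shape of isReady() + canSendRequest(), with the other checks passed in as flags.
    boolean isReady(String nodeId, boolean metadataUpdateDue, boolean connected, boolean channelReady) {
        return !metadataUpdateDue && connected && channelReady && canSendMore(nodeId);
    }

    public static void main(String[] args) {
        InFlightSketch client = new InFlightSketch(2);
        client.add("0", "r1");
        client.add("0", "r2");
        System.out.println(client.isReady("0", false, true, true)); // limit reached
        client.completeNext("0");
        System.out.println(client.isReady("0", false, true, true)); // one slot is free again
    }
}
```

With a limit of 2, the node stops being ready after two unanswered requests and becomes ready again as soon as one response comes back.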
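After the two ready calls, the execution of the Sender's run() method is as shown in the figure below:
Preparing the request: Accumulator.drain() and creating the ClientRequest
After the two ready calls comes the Accumulator.drain() method. Its logic is fairly obvious from the comments.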
    // Sender.java
    run() {
        // first ready: accumulator.ready(cluster, now)
        // second ready: networkclient.ready
        // collect the batches for the requests: Accumulator.drain
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        // omitted...
        client.poll();
    }

    /**
     * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
     * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
     *
     * @param cluster The current cluster metadata
     * @param nodes The list of node to drain
     * @param maxSize The maximum number of bytes to drain
     * @param now The current unix time in milliseconds
     * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
     */
    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<RecordBatch> ready = new ArrayList<>();
            /* to make starvation less likely this loop doesn't start at 0 */
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                // Only proceed if the partition has no in-flight batches.
                if (!muted.contains(tp)) {
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                // Only drain the batch if it is not during backoff period.
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // there is a rare case that a single batch size is larger than the request size due
                                        // to compression; in this case we will still eventually send this batch in a single
                                        // request
                                        break;
                                    } else {
                                        RecordBatch batch = deque.pollFirst();
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready);
        }
        return batches;
    }
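Combining the comments with the overall flow, this is a for loop over the list of Nodes (brokers) that the earlier ready calls marked as ready. We can draw the following conclusions:
By default batches are recorded per topic-partition. To work out which partitions' batches have to go to each Broker, the batches must be grouped by Node rather than by topic.
That is exactly what Accumulator.drain() does. It gets all partitions on a broker, iterates over them, takes the first batch from each partition's Deque and puts it into the list of batches to send to that broker. Every broker ends up with its own list of batches, recorded in a map whose key is the brokerId and whose value is the list of batches.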
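The regrouping can be sketched in a few lines. This is a deliberately reduced model under stated assumptions: DrainSketch is a hypothetical class, partitions and batches are plain strings, and maxSize, muted partitions, back-off and the drainIndex rotation from the real method are all left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Reduced model of drain(): regroup per-partition queues into per-broker lists. Names are illustrative.
class DrainSketch {
    static Map<Integer, List<String>> drain(Map<String, Deque<String>> batchesByPartition,
                                            Map<String, Integer> leaderByPartition,
                                            Set<Integer> readyNodes) {
        Map<Integer, List<String>> byNode = new TreeMap<>();
        for (Integer node : readyNodes)
            byNode.put(node, new ArrayList<>());
        for (Map.Entry<String, Deque<String>> entry : batchesByPartition.entrySet()) {
            Integer leader = leaderByPartition.get(entry.getKey());
            if (leader == null || !readyNodes.contains(leader))
                continue; // leader unknown or broker not ready: leave the batches queued
            String first = entry.getValue().pollFirst(); // only the first batch of each partition
            if (first != null)
                byNode.get(leader).add(first);
        }
        return byNode;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> batches = new LinkedHashMap<>();
        batches.put("t-0", new ArrayDeque<>(Arrays.asList("b0a", "b0b")));
        batches.put("t-1", new ArrayDeque<>(Arrays.asList("b1")));
        batches.put("t-2", new ArrayDeque<>(Arrays.asList("b2")));

        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("t-0", 1);
        leaders.put("t-1", 2);
        leaders.put("t-2", 1);

        // Only broker 1 is ready, so only t-0 and t-2 contribute a batch.
        System.out.println(drain(batches, leaders, Collections.singleton(1)));
    }
}
```

Broker 1 receives the first batch of t-0 and of t-2; the second batch of t-0 and everything led by broker 2 stay in the accumulator for a later pass.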
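The whole picture is shown in the figure below:
After the two ready calls and after drain has collected the data, the next step is the final construction of the requests for the batches: createProduceRequests.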
    // Sender.java
    run() {
        // after the two ready calls and drain
        // createProduceRequests
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // omitted...
        client.poll();
    }

    /**
     * Transfer the record batches into a list of produce requests on a per-node basis
     */
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
    }

    /**
     * Create a produce request from the given record batches
     */
    private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
        for (RecordBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
        RequestSend send = new RequestSend(Integer.toString(destination),
                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                           request.toStruct());
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        return new ClientRequest(now, acks != 0, send, callback);
    }
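To send messages we certainly do not ship the List of batches as it is; it still has to be wrapped in a simple envelope. Do you remember how the metadata fetch request was wrapped?
As you can see, there is a fixed set of logic for wrapping requests, and sending messages works the same way. The RecordBatch messages we serialized earlier are in essence still ByteBuffers. Here a series of objects adds further information around them, mainly the api key, api version, acks and request timeout. As shown in the figure below: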
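The idea of wrapping a payload with header fields can be illustrated with a plain ByteBuffer. Note that this is only a sketch with an invented, simplified layout: RequestFramingSketch and frame() are hypothetical, the field order is an assumption made for the example, and it is not Kafka's actual wire format (which Kafka builds through ProduceRequest, RequestSend and the request header as shown above).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative framing only; the layout below is an assumption, not the Kafka protocol.
class RequestFramingSketch {
    static ByteBuffer frame(short apiKey, short apiVersion, int correlationId, String clientId,
                            short acks, int timeoutMs, ByteBuffer records) {
        byte[] client = clientId.getBytes(StandardCharsets.UTF_8);
        // header fields + length-prefixed client id + acks + timeout + payload
        int size = 2 + 2 + 4 + 2 + client.length + 2 + 4 + records.remaining();
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(apiKey).putShort(apiVersion).putInt(correlationId);
        buf.putShort((short) client.length).put(client);
        buf.putShort(acks).putInt(timeoutMs);
        buf.put(records.duplicate()); // duplicate() leaves the caller's buffer position untouched
        buf.flip();                   // switch the buffer from writing to reading
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer records = ByteBuffer.wrap(new byte[] {1, 2, 3}); // stands in for a serialized batch
        ByteBuffer request = frame((short) 0, (short) 1, 7, "p1", (short) 1, 30_000, records);
        System.out.println("framed request size: " + request.remaining() + " bytes");
        System.out.println("payload still readable: " + records.remaining() + " bytes");
    }
}
```

The point of the sketch is that the batch bytes themselves are untouched; the request only adds routing and delivery metadata in front of them.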
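The final preparation: setSend()
The last preparation step is a setSend operation. After everything above, the connection is ready and the request is ready. Since the Selector has now been woken up, the NIO communication model requires us to update the interest ops of the SelectionKey to include Write, which signals that we want to write data to the Broker; after that the write itself can take place. So a setSend operation is executed here. The main code is as follows: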
    // Sender.java
    run() {
        // after the two ready calls, drain and createProduceRequests
        // setSend
        for (ClientRequest request : requests)
            client.send(request, now);
        // omitted...
        client.poll();
    }

    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }

    // org.apache.kafka.common.network.Selector.java
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

    // org.apache.kafka.common.network.KafkaChannel
    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }
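This process is similar to sending the metadata fetch request earlier, so it should feel familiar; see 《Kafka成长记4发送元数据拉取申请》 (Kafka Growth Notes 4: Sending the Metadata Fetch Request).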
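The interest ops that setSend() modifies are just a bitmask, so the transitions can be followed with plain integers. The sketch below uses the real SelectionKey constants, while InterestOpsSketch and its helper methods are hypothetical names; no channel or selector is involved.

```java
import java.nio.channels.SelectionKey;

// Bitmask arithmetic behind addInterestOps / removeInterestOps; helper names are illustrative.
class InterestOpsSketch {
    static int add(int current, int ops) { return current | ops; }

    static int remove(int current, int ops) { return current & ~ops; }

    static boolean has(int current, int op) { return (current & op) != 0; }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT;                                      // while connecting
        ops = add(remove(ops, SelectionKey.OP_CONNECT), SelectionKey.OP_READ);  // connection finished
        ops = add(ops, SelectionKey.OP_WRITE);                                  // setSend(): there is data to write
        System.out.println(has(ops, SelectionKey.OP_READ) + " " + has(ops, SelectionKey.OP_WRITE));
        ops = remove(ops, SelectionKey.OP_WRITE);                               // the send has been fully written
        System.out.println(has(ops, SelectionKey.OP_READ) + " " + has(ops, SelectionKey.OP_WRITE));
    }
}
```

Adding OP_WRITE only while a send is pending matters because a socket is writable almost all the time; leaving the bit set would make every select() return immediately.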
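The whole picture is shown in the figure below:
Finally sending the message
After all the preparation above (the two ready calls, drain, creating the ClientRequest, and setSend making the corresponding channel watch for write events), we finally reach the point where the message is actually sent.
With the earlier experience of sending the metadata request, we know that in the end this goes through pollSelectionKeys and is then processed by the handle methods.
1) First the write request is sent: the Request being sent is held in inFlightRequests, the data is written out via channel.write, and then the handleCompletedSends method is executed.
2) Once the message has been sent completely, interest in OP_WRITE is cancelled, and the response that comes back is processed by the handleCompletedReceives method. If the data of a Request has not been fully sent yet, interest in OP_WRITE has to be kept. The OP_READ event of NIO is watched as well, through key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
(The detailed code is in the corresponding handle methods and is fairly simple, so I will not walk through it here. We will study this logic again in the next section, when we cover how Kafka handles message framing on the TCP stream, the so-called sticky-packet and split-packet problem.)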
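The rule "keep OP_WRITE until the whole request has been written" can be simulated without a real socket. The sketch below is an assumption-laden model: PartialWriteSketch, ThrottledChannel and onWritable() are hypothetical, and the channel is an in-memory stand-in that accepts at most a fixed number of bytes per write, which is how a non-blocking socket with a nearly full send buffer behaves.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

// Simulates partial writes on a non-blocking channel; all names here are illustrative.
class PartialWriteSketch {
    // In-memory channel that accepts at most maxPerWrite bytes per call.
    static final class ThrottledChannel implements WritableByteChannel {
        private final int maxPerWrite;
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        ThrottledChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(maxPerWrite, src.remaining());
            for (int i = 0; i < n; i++)
                sink.write(src.get());
            return n;
        }

        @Override
        public boolean isOpen() { return true; }

        @Override
        public void close() { }
    }

    // Handles one "channel is writable" event and returns the updated interest ops.
    static int onWritable(ThrottledChannel channel, ByteBuffer send, int interestOps) {
        channel.write(send);
        if (send.hasRemaining())
            return interestOps;                       // not finished: keep OP_WRITE
        return interestOps & ~SelectionKey.OP_WRITE;  // finished: stop watching for writes
    }

    // Counts how many writable events are needed until OP_WRITE is dropped.
    static int eventsUntilSent(ThrottledChannel channel, ByteBuffer send) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        int events = 0;
        while ((ops & SelectionKey.OP_WRITE) != 0) {
            ops = onWritable(channel, send, ops);
            events++;
        }
        return events;
    }

    public static void main(String[] args) {
        ThrottledChannel channel = new ThrottledChannel(4);
        int events = eventsUntilSent(channel, ByteBuffer.wrap(new byte[10]));
        System.out.println(events + " writable events, " + channel.sink.size() + " bytes written");
    }
}
```

Ten bytes through a channel that takes four bytes at a time need three writable events; only after the last one is OP_WRITE cleared, while OP_READ stays set so the response can be read.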
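The whole process is roughly as shown in the figure below:
Summary
With that, we have worked out together how the messages in Kafka's in-memory buffer are finally sent out.
The previous 9 sections were fairly dense and explained things in detail. I would not call it a close reading of the source code, but we have certainly worked through the core of the Kafka Producer source.
The 《Kafka Producer篇》 (Kafka Producer series) should wrap up in a few more sections.
I'm 枯萎, see you in the next section~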