In the previous three sections we mainly analyzed how KafkaProducer puts messages into the in-memory buffer.

That logic is only one core piece of Accumulator.append(). Do you still remember the overall flow of the KafkaProducerHelloWorld example we analyzed earlier?

The code flow analyzed so far is shown in the figure below:

From the initial configuration parsing and the creation of the Record message object, to metadata fetching, the initial serialization of key and value, the Producer's partition-routing logic, and how messages are placed into the memory buffer.

We had already reached the end of the red-lined part of the figure: waking up the Sender thread to send messages.

In this section we continue the analysis: after a message has been placed into the memory buffer and the Sender thread has been woken up, how does the Sender thread send out the packed batches?

What conditions wake up the Sender thread?

From the flow chart above you can see that when producer.send() executes doSend(), after accumulator.append() has put the message into the memory buffer, the Sender thread may be woken up.

So after a RecordBatch has been placed into the buffer, what conditions will wake up the Sender thread?

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 1. waitOnMetadata: wait for the metadata to be fetched
        // 2. keySerializer.serialize and valueSerializer.serialize: serialize the Record into byte arrays
        // 3. partition(): route the record to one of the topic's partitions according to the partitioning strategy
        // ... code omitted ...

        // 4. accumulator.append: put the message into the memory buffer
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // 5. Wake up the Sender thread blocked in selector.select() so it starts processing the data in the memory buffer
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (Exception e) {
        throw e;
    }
    // ... the other exception catches are omitted ...
}

From the code above you can see clearly that the logic for waking up the Sender thread is simple: either the current batch is already full, or a new batch has just been created.

result.batchIsFull || result.newBatchCreated

So when are these two flags set?

As we saw in the previous section, RecordBatch.tryAppend() is called when a new batch is created, while RecordAccumulator.tryAppend() mainly appends to an existing batch. Both paths set the batchIsFull and newBatchCreated flags, indicating whether the batch was newly created or has been written full.

The key code is as follows:

// RecordAccumulator.tryAppend(): the record was appended to an existing batch (newBatchCreated = false)
new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
// RecordAccumulator.append(): a brand-new batch was created (newBatchCreated = true)
new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);

public final static class RecordAppendResult {
    public final FutureRecordMetadata future;
    public final boolean batchIsFull;
    public final boolean newBatchCreated;

    public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
        this.future = future;
        this.batchIsFull = batchIsFull;
        this.newBatchCreated = newBatchCreated;
    }
}

public boolean isFull() {
    return !this.writable || this.writeLimit <= this.compressor.estimatedBytesWritten();
}
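To make these two flags concrete, here is a minimal producer configuration sketch. It is only an illustration: the broker address and topic name are placeholder assumptions, and linger.ms is set to 100 ms here just to match the example values used later in this article (its actual default is 0). batch.size is the limit that makes isFull() above return true.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 16384); // 16 KB: once a batch reaches this size, isFull() returns true
        props.put("linger.ms", 100);    // a non-full batch may wait up to 100 ms before it becomes sendable

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Each send() appends to the current batch; when that batch fills up, or a new batch
        // has to be created, doSend() calls sender.wakeup() as shown above.
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        producer.close();
    }
}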

Once one of these conditions holds, sender.wakeup() is triggered, which wakes up the Selector that was blocking in the Sender thread's while loop, ready to send messages. The whole process is shown below:

How does the awakened Sender thread send the batched messages?

Since sender.wakeup() is what gets triggered, you should know that underneath it is the NIO Selector's wakeup(). But which flow does it wake up? Let's first review the main skeleton of the Sender thread's run() method from 《Kafka成长记4 元数据拉取 下》 (Kafka Growth Notes 4: Metadata Fetching, Part 2), shown in the figure below:

In other words, when we analyzed metadata fetching, the core was the three main internal steps of NetworkClient.poll(): maybeUpdate() --> selector.poll() --> handle*().

After the metadata has been fetched successfully, the thread blocks again waiting in Selector.select(), and at that point sender.wakeup() breaks the block so the run() method continues executing.
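If the wakeup mechanism itself is unfamiliar, here is a minimal, self-contained NIO sketch, independent of Kafka's classes, showing how one thread blocked in Selector.select() is released by another thread calling selector.wakeup(). Sender.wakeup() ultimately delegates to exactly this NIO call.

import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();

        Thread ioThread = new Thread(() -> {
            try {
                // Blocks until a channel is ready or until another thread calls selector.wakeup()
                int readyKeys = selector.select();
                System.out.println("select() returned, ready keys = " + readyKeys);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        Thread.sleep(500);
        // In Kafka this is the user thread inside doSend(); here it is simply the main thread
        selector.wakeup();
        ioThread.join();
        selector.close();
    }
}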

But the outer layer around NetworkClient.poll() still contains a pile of code that looked confusing at the time. Do you remember it?

void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }

    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}

When we first analyzed the run() method, we found this pile of confusing logic, and by debugging we saw that the only line that actually did real work at that point was the final client.poll() call.

And there was still this pile of previously confusing logic:

accumulator.ready()、client.ready(node, now)

accumulator.drain()

accumulator.abortExpiredBatches()

client.send(request, now)

As shown below:

Now that the Sender has been woken up and we have a batch, which of these pieces of logic actually get triggered? Let's take a look together.

What is RecordAccumulator.ready() preparing?

Before a message can be sent, a few checks have to be made, such as finding the Broker that corresponds to each partition, checking the connection to that Broker, checking for batch timeouts, and so on.

// Sender.java
void run(long now) {
    // first ready(): the RecordAccumulator
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // ... omitted ...
    client.poll(pollTimeout, now);
}

// RecordAccumulator.java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

You can see that the main skeleton of Accumulator.ready() before sending is:

It iterates over the batches collection in the memory buffer and, through a series of checks, decides whether each batch can be sent.

1) The first check is that the Broker (the partition leader) for the batches to be sent must be known.

2) If the leader exists, several conditions are computed, for example:

backingOff: whether the last send time of this batch plus the retry backoff interval is still greater than the current time. The default backoff interval is 100ms. Normally attempts is 0 and there is no retry; but if attempts > 0, i.e. during a retry, the batch may only be sent again after the backoff interval has passed.

waitedTimeMs: the current time minus the batch's last attempt time (for a first send, the time it was created), i.e. how long this batch has been waiting so far.

timeToWaitMs: how long, counting from its creation, the batch may wait at most before it must be sent. During a retry this is the retry backoff interval; in the initial, non-retry phase it is the linger.ms time (100ms in our example), meaning that once 100ms is reached the batch must be sent out.

full: whether the batch is full. If the Deque contains more than one batch, then the batch returned by peekFirst() must already be full; and even if the Deque holds only one batch, once that batch has reached the 16KB size limit it also counts as full.

expired: the time the batch has already waited (say 120ms) >= the maximum time it may wait (100ms), i.e. it has gone past the linger.ms window; otherwise, say 60ms < 100ms, it has not expired yet. If linger.ms keeps its default of 0, then as soon as a batch is created, expired is always true here.

Finally the conditions above are combined into a single condition:

boolean sendable = full || expired || exhausted || closed || flushInProgress()

If the condition holds, the leader is recorded and the prepared ReadyCheckResult is returned, indicating whether batches can be sent and which Broker they should be sent to (a small worked sketch of this computation appears right after this list).

3) One last point: on each pass, only the first batch of each TopicPartition is peeked at and checked. If none of them qualifies, the smallest timeLeftMs across all partitions is used as the wait time before the next ready check. This is an optimization to get batches sent as quickly as possible; you don't need to memorize the detail, knowing the general sending logic and that Kafka makes these kinds of optimizations and trade-offs is enough.
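To tie these variables together, here is a minimal, self-contained sketch that reproduces the sendable computation for a single batch. The numbers are illustrative assumptions, not values read from a real producer.

public class ReadyCheckSketch {
    public static void main(String[] args) {
        // Illustrative assumptions
        long nowMs = 1_000_000L;
        long lastAttemptMs = nowMs - 120;  // the batch has been sitting for 120 ms
        int attempts = 0;                  // never retried
        long retryBackoffMs = 100;         // retry.backoff.ms default
        long lingerMs = 100;               // the example value used in this article
        int dequeSize = 1;                 // only one batch in this partition's deque
        boolean firstBatchIsFull = false;  // it has not reached batch.size (16 KB) yet
        boolean exhausted = false, closed = false, flushInProgress = false;

        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;  // false
        long waitedTimeMs = nowMs - lastAttemptMs;                                    // 120
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;                   // 100
        boolean full = dequeSize > 1 || firstBatchIsFull;                             // false
        boolean expired = waitedTimeMs >= timeToWaitMs;                               // 120 >= 100 -> true
        boolean sendable = full || expired || exhausted || closed || flushInProgress;

        // sendable = true and backingOff = false, so this partition's leader goes into readyNodes
        System.out.println("sendable = " + sendable + ", backingOff = " + backingOff);
    }
}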

The whole process above is illustrated in the figure below:

At its core it checks things such as locating the Broker for each partition, the Broker connection (handled next by NetworkClient.ready()), and batch timeouts.

What is NetworkClient.ready() preparing?

In the run() method there are two ready() calls. We have already analyzed one of them; what is the other one doing?

As its name suggests, it is preparing the network connection. The code is as follows:

// Sender.java
void run(long now) {
    // first ready(): the RecordAccumulator
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // second ready(): the NetworkClient
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // ... omitted ...
    client.poll(pollTimeout, now);
}

// NetworkClient.java
@Override
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);

    if (isReady(node, now))
        return true;

    if (connectionStates.canConnect(node.idString(), now))
        // if we are interested in sending to a node and we don't have a connection to it, initiate one
        initiateConnect(node, now);

    return false;
}

public boolean isReady(Node node, long now) {
    // if we need to update our metadata now declare all requests unready to make metadata requests first
    // priority
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}

private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

The structure of this code is quite clear. It mainly checks:

whether metadata needs to be fetched + whether the network Channel is ready + whether the connection state is already Connected + whether the number of in-flight requests has exceeded 5 (we met inFlightRequests before, in the metadata-fetching analysis: every request sent out through NIO that has not yet received a response is tracked there as an in-flight request).

If all of the above are ready, the batch can be sent; if not, metadata is fetched when it needs fetching and the Broker connection is established when it needs establishing. Nothing more to it.
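As a mental model for that last check, here is a small sketch of limiting unanswered requests per node. This is not Kafka's actual InFlightRequests class, just a hypothetical illustration of the idea; the limit of 5 corresponds to the max.in.flight.requests.per.connection setting.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks the requests per node that were sent but not yet answered
public class InFlightSketch {
    private final int maxInFlightPerConnection;  // e.g. max.in.flight.requests.per.connection = 5
    private final Map<String, Deque<Object>> requests = new HashMap<>();

    public InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // The analogue of the inFlightRequests.canSendMore(node) call inside canSendRequest()
    public boolean canSendMore(String nodeId) {
        Deque<Object> queue = requests.get(nodeId);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    // Called when a request has been written out to the node
    public void add(String nodeId, Object request) {
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Called when the corresponding response arrives (oldest request completes first)
    public Object completeNext(String nodeId) {
        Deque<Object> queue = requests.get(nodeId);
        return queue == null ? null : queue.pollLast();
    }
}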

After these two ready() passes, the execution of Sender's run() looks like the figure below:

Preparing the request: Accumulator.drain() and ClientRequest creation

After the two ready() calls comes the Accumulator.drain() method. Its logic is fairly clear from the comments alone.

// Sender.java
void run(long now) {
    // first ready(): accumulator.ready(cluster, now)
    // second ready(): networkClient.ready(node, now)
    // gather the batches per node: Accumulator.drain()
    Map<Integer, List<RecordBatch>> batches =
        this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // ... omitted ...
    client.poll(pollTimeout, now);
}

/**
 * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
 * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
 *
 * @param cluster The current cluster metadata
 * @param nodes The list of node to drain
 * @param maxSize The maximum number of bytes to drain
 * @param now The current unix time in milliseconds
 * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
 */
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}

Combining the comments with the overall structure, this is simply a for loop over the list of Nodes (Brokers) that was prepared by ready(). We can draw the following conclusion:

Since by default batches are stored per topic-partition, in order to know which partitions' batches each Broker should receive, the batches have to be assembled per Node rather than per topic.

That is exactly what Accumulator.drain() does: it gets all the partitions hosted on a broker, iterates over them, takes the first batch from each partition's deque, and puts it into the list of batches to be sent to that broker. Each broker ends up with its own list of batches, recorded in a map whose key is the brokerId and whose value is that list of batches.
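A minimal standalone sketch of that regrouping idea, with hypothetical types standing in for RecordBatch and the cluster metadata purely to illustrate the per-node map being built, might look like this:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DrainSketch {
    // Hypothetical stand-ins for Kafka's TopicPartition -> Deque<RecordBatch> buffer
    record Partition(String topic, int partition, int leaderBrokerId) {}
    record Batch(String payload) {}

    public static void main(String[] args) {
        Map<Partition, Deque<Batch>> buffer = new HashMap<>();
        buffer.put(new Partition("demo-topic", 0, 1), new ArrayDeque<>(List.of(new Batch("batch-p0"))));  // leader: broker 1
        buffer.put(new Partition("demo-topic", 1, 2), new ArrayDeque<>(List.of(new Batch("batch-p1"))));  // leader: broker 2

        // Regroup: per topic-partition deques  ->  brokerId -> list of drained batches
        Map<Integer, List<Batch>> perNode = new HashMap<>();
        for (Map.Entry<Partition, Deque<Batch>> entry : buffer.entrySet()) {
            Batch first = entry.getValue().pollFirst();  // drain only the first batch of each partition
            if (first != null)
                perNode.computeIfAbsent(entry.getKey().leaderBrokerId(), k -> new ArrayList<>()).add(first);
        }
        // e.g. {1=[Batch[payload=batch-p0]], 2=[Batch[payload=batch-p1]]}
        System.out.println(perNode);
    }
}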

The overall picture is shown in the figure below:

After the two ready() calls and the drain() step that gathers the data, what follows is the final construction of the batch request: createProduceRequests(), which builds the ClientRequest objects.

// Sender.java
void run(long now) {
    // after the two ready() calls and drain()
    // the createProduceRequests() step
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // ... omitted ...
    client.poll(pollTimeout, now);
}

/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}

/**
 * Create a produce request from the given record batches
 */
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    return new ClientRequest(now, acks != 0, send, callback);
}

Obviously we do not send the raw List of batches directly; it still needs a simple wrapping. Do you remember how the metadata fetch request was wrapped?

You can see there is a consistent request-wrapping procedure, and sending messages works the same way: the RecordBatch data we serialized earlier is essentially a ByteBuffer, and here a series of objects wrap it again, mainly adding extra information such as the api key, api version, acks, and request timeout. As shown in the figure below:

The last preparation: setSend()

The last preparation is a setSend() operation. After the series of preparations above, the connection is ready and the request is built; and since the Selector has been woken up, according to the NIO communication model we next need to add OP_WRITE to the SelectionKey's interest ops, indicating that we want to write data to the Broker, and then perform the write. That is what this setSend() step does; the main code is as follows:

// Sender.java
void run(long now) {
    // after the two ready() calls, drain() and createProduceRequests()
    // the setSend() step
    for (ClientRequest request : requests)
        client.send(request, now);
    // ... omitted ...
    client.poll(pollTimeout, now);
}

// NetworkClient.java
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

// org.apache.kafka.common.network.Selector.java
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

// org.apache.kafka.common.network.KafkaChannel.java
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

This process is similar to sending the metadata fetch request earlier, so it should feel familiar; see 《Kafka成长记4发送元数据拉取申请》 (Kafka Growth Notes 4: Sending the Metadata Fetch Request).

The overall flow is shown in the figure below:

The final message send

After all the preceding preparation, the two ready() calls, drain(), the creation of the ClientRequests, and setSend() registering interest in the write event on the corresponding channel's SelectionKey, we finally get to the actual message send.

With the experience of sending the metadata request, we know the send ultimately goes through polling the SelectionKeys and then the corresponding handle* methods.

1) The write of the message request relies on inFlightRequests to temporarily hold the Request that is being sent; the data is written out via channel.write(), after which handleCompletedSends() runs.

2) Once the message has been fully sent, interest in the OP_WRITE event is removed, and when the response comes back it is processed by handleCompletedReceives(). Otherwise, if a Request's data has not been completely written yet, interest in OP_WRITE has to be kept. Interest in NIO's OP_READ event (for reading the response) is what was registered earlier via key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

(The concrete code lives in the corresponding handle* methods and is fairly simple, so I won't walk through it here; we will revisit this logic in the next section when we discuss TCP packet sticking and splitting.)
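As a reminder of the plain-NIO mechanics behind this (a generic sketch, not Kafka's KafkaChannel code): keep OP_WRITE while the outgoing buffer still has remaining bytes, and drop it once the buffer has been fully written so the key keeps watching only OP_READ for the response.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class WriteCompletionSketch {
    // Called when the selector reports the key as writable
    static void onWritable(SelectionKey key, ByteBuffer pending) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(pending);  // may write only part of the buffer
        if (!pending.hasRemaining()) {
            // The whole request has gone out: stop watching OP_WRITE, keep OP_READ for the response
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
        // If bytes remain, OP_WRITE stays set and this method will be called again to finish the send.
    }
}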

The whole process is roughly shown in the figure below:

Summary

That wraps up how the messages in Kafka's memory buffer are finally sent out.

The previous nine sections have all been fairly dense and detailed. It may not count as a word-by-word reading of the source, but we have certainly gotten a solid grasp of Kafka's core source code.

The "Kafka Producer" part of this series is expected to finish in just a few more sections.

I'm 枯萎; see you in the next section~
