In the previous section we dissected the Producer's core components and ended up with a key component diagram. Do you remember it?

To briefly recap what that diagram shows:

A Metadata component was created, which maintains the cluster metadata internally through a Cluster object.

The RecordAccumulator, the in-memory buffer for outgoing messages, was initialized.

A NetworkClient was created; most importantly, it created the NIO-based Selector component internally.

A Sender thread was started. The Sender holds references to all of the components above and begins executing its run method.

As you can see at the bottom of the diagram, the previous section stopped right at the execution of the run method. In this section we first look at what the core flow of run does, and then analyze the Producer's first core process: the source-level mechanics of metadata fetching.

Let's get started!

What is the Sender's run method doing?

In this section we continue by looking at what the Sender's run method does once it starts executing.

public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

The core structure of this run method is quite simple: essentially two while loops plus closing the thread, and both while loops call the same run(long now) method.

From the comments you can see that the second while loop handles a special case: after the first loop exits there may still be unsent requests, and the second loop finishes processing them before the thread shuts down.

The overall structure is shown in the diagram below:

Next we should look at what run(long now) is actually doing.

/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}

If you are reading this code for the first time, you will probably feel that the flow is anything but clear and you can't tell where the key points are. Fortunately the comments let you roughly guess what it is doing:

accumulator.ready(), NetworkClient.ready(), NetworkClient.send(), NetworkClient.poll()

These seem to mean: preparing the memory region, preparing the network connections to the broker nodes, sending data, and polling for response results.

But what if you can't guess?

That is when you can bring out the heavy weapon: the debugger. Since this is the producer, we can set breakpoints in the HelloWorld client and step through it.

Once you step through the run method with breakpoints, you will find:

accumulator.ready(), NetworkClient.ready(), and NetworkClient.send() execute almost no logic at all; they only initialize empty objects or return immediately from inside the methods.

Execution runs straight through to the client.poll method, as shown below:

So you can draw a conclusion: on the first iteration of the while loop, the core logic of this run method boils down to a single line:

client.poll(pollTimeout, now)

The overall flow is as follows:

It looks like the NetworkClient's poll method is the next key piece to examine:

/**
 * Do actual reads and writes to sockets.
 *
 * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
 *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
 *                metadata timeout
 * @param now The current time in milliseconds
 * @return The list of responses received
 */
@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}

The structure of this method is much clearer. From the method names and comments we can pretty much infer its main responsibilities:

1) The comment says: do actual reads and writes to sockets.

2) metadataUpdater.maybeUpdate(): do you remember DefaultMetadataUpdater, one of NetworkClient's components? The method name says it may update the metadata. This looks important.

3) Next it calls the Selector's poll method. This Selector is another component of NetworkClient, remember? Underneath, it wraps the native NIO Selector. This method should also be fairly important.

4) Afterwards it runs a series of handlers over the responses. Judging from the names, handleCompletedSends handles requests whose sends have completed, handleCompletedReceives handles completed receives, handleDisconnections handles disconnected requests, handleConnections handles newly established connections, and handleTimedOutRequests handles timed-out requests. Each situation is handled differently.

5) Finally there is the callback handling for the responses: if a callback was registered, it is invoked. This is probably not the critical logic.

Simply put, when NetworkClient executes poll, it mainly uses the Selector to handle the reading and writing of requests and then processes the response results in their various ways.

As shown below:

At this point we have basically figured out the one main thing the run method does. Because this is the first loop iteration, accumulator.ready(), NetworkClient.ready(), and NetworkClient.send() do essentially nothing, so the core of the first while iteration of run is NetworkClient.poll(). And the main logic of poll is exactly what the diagram above shows.

Is maybeUpdate perhaps fetching the metadata?

We just saw that the first thing poll executes is DefaultMetadataUpdater's maybeUpdate method, whose name suggests it might perform an update. Let's walk through its logic together.

public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);
    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}

/**
 * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
 * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
 * is now
 */
public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}

It turns out there is a time-based check here, and the inner maybeUpdate runs only when the check is satisfied.

The time calculation looks a bit involved, but broadly you can see that metadataTimeout is derived from three time values combined, and only when it is 0 does the real maybeUpdate() execute.

In situations like this we can simply set a breakpoint on metadataTimeout and watch how its value is computed, for example:

You will find that on the first while iteration, by the time execution reaches poll and then this maybeUpdate, two of the three values that determine metadataTimeout are 0 and one is non-zero, with a value of 299720. As a result metadataTimeout is also non-zero: 299720.

In other words, the first while iteration does not execute any of maybeUpdate's inner logic.

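To see where a value like 299720 can come from, here is a hedged, back-of-the-envelope recalculation using the formulas above. It assumes the default metadata.max.age.ms of 300000 ms and retry.backoff.ms of 100 ms, and purely illustrative timestamps; the exact numbers depend on your configuration and timing:

// Hypothetical first-iteration numbers (assumption: roughly 280 ms have elapsed since
// new KafkaProducer() refreshed the bootstrap metadata and set lastSuccessfulRefreshMs).
long lastSuccessfulRefreshMs = 1_000_000L;
long lastRefreshMs           = 1_000_000L;
long nowMs                   = 1_000_280L;   // first Sender loop iteration, ~280 ms later

long timeToExpire      = Math.max(lastSuccessfulRefreshMs + 300_000 - nowMs, 0); // 299720
long timeToAllowUpdate = lastRefreshMs + 100 - nowMs;                            // -180, backoff already elapsed
long timeToNextMetadataUpdate = Math.max(timeToExpire, timeToAllowUpdate);       // 299720

long timeToNextReconnectAttempt = Math.max(0 + 100 - nowMs, 0);                  // 0 (assuming lastNoNodeAvailableMs is still 0)
long waitForMetadataFetch = 0;                                                   // no metadata fetch in progress yet

long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
        waitForMetadataFetch);                                                   // 299720, so the inner maybeUpdate() is skipped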

Execution then moves on to the Selector's poll() method.

/**
 * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
 * disconnections, initiating new sends, or making progress on in-progress sends or receives.
 */
@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    // this delegates to the underlying NIO Selector.select(), which blocks waiting for events
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    // if any SelectionKeys are ready to operate on (readyKeys > 0), process them
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    maybeCloseOldestConnection();
}

private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(ms);
}

The flow above is essentially two steps:

1) select(timeout): the underlying NIO selector.select(), which blocks waiting for events.

2) pollSelectionKeys(): when SelectionKeys with pending operations are detected, they are processed.

In other words, on its first while iteration the Sender thread's run method ultimately does nothing of substance in poll: it ends up blocked in selector.select().

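If the blocking behavior of select() is unfamiliar, here is a tiny standalone java.nio sketch (plain NIO, not Kafka code) showing that select(timeout) parks the calling thread until an event arrives, the timeout expires, or another thread calls wakeup():

import java.nio.channels.Selector;

public class SelectWakeupDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();

        Thread ioThread = new Thread(() -> {
            try {
                long start = System.currentTimeMillis();
                // Blocks for up to 60s: no channels are registered here, so only the
                // timeout or an external wakeup() can release it.
                int readyKeys = selector.select(60_000);
                System.out.println("select() returned " + readyKeys + " keys after "
                        + (System.currentTimeMillis() - start) + " ms");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        Thread.sleep(1000);
        // Another thread (think of KafkaProducer.send() calling sender.wakeup())
        // breaks the I/O thread out of its blocking select().
        selector.wakeup();
        ioThread.join();
        selector.close();
    }
}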

As shown in the diagram below:

After new KafkaProducer()

Having analyzed the execution of the run method, the first step of our KafkaProducerHelloWorld, new KafkaProducer(), is essentially covered.

It took a section and a half, but we have finally worked out how a KafkaProducer is created. I wonder whether you now have a deeper understanding of Kafka's Producer.

What comes after new KafkaProducer()?

Let's continue down KafkaProducerHelloWorld. Do you still remember its code?

public class KafkaProducerHelloWorld {

    public static void main(String[] args) throws Exception {
        // configure some Kafka parameters
        Properties props = new Properties();
        props.put("bootstrap.servers", "mengfanmao.org:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create a Producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // wrap a message
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "test-topic", "test-key", "test-value");

        // synchronous send: blocks here until the send completes
        // producer.send(record).get();

        // asynchronous send: does not block, just register a callback listener
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception == null) {
                    System.out.println("message sent successfully");
                } else {
                    exception.printStackTrace();
                }
            }
        });

        Thread.sleep(5 * 1000);

        // close the producer
        producer.close();
    }
}

KafkaProducerHelloWorld has three main steps:

1) new KafkaProducer: we have already analyzed this, covering how the configuration is parsed, what the components are and what they contain, plus what the Sender thread's first loop iteration actually executes, which we just walked through.

2) new ProducerRecord: create the message to send.

3) producer.send(): send the message.

First, creating the message to send:

ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");

public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value);
}

/**
 * Creates a record with a specified timestamp to be sent to a specified topic and partition
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param timestamp The timestamp of the record
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null");
    if (timestamp != null && timestamp < 0)
        throw new IllegalArgumentException("Invalid timestamp " + timestamp);
    this.topic = topic;
    this.partition = partition;
    this.key = key;
    this.value = value;
    this.timestamp = timestamp;
}

As we mentioned before, a Record is the abstraction that wraps a message, and a ProducerRecord simply represents one message.

From the constructor comments you can see that a ProducerRecord lets you specify which topic and which partition to send to, and the message can carry a timestamp. The partition and the timestamp are optional and left unspecified by default.

That is really all the information this bit of source code gives us; it is simple enough that no diagram is needed.

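As a small illustration of those optional fields, here are the two constructor shapes side by side; the partition number 0 and the explicit timestamp below are arbitrary example values, not something the test topic requires:

import org.apache.kafka.clients.producer.ProducerRecord;

// topic + key + value, exactly what KafkaProducerHelloWorld does: partition and timestamp stay null
ProducerRecord<String, String> simple =
        new ProducerRecord<>("test-topic", "test-key", "test-value");

// hypothetical variant: pin the record to partition 0 and stamp it with the current time
ProducerRecord<String, String> pinned =
        new ProducerRecord<>("test-topic", 0, System.currentTimeMillis(), "test-key", "test-value");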

How sending a message triggers the metadata fetch

Once the Producer and the Record have been created, the message can be sent either synchronously or asynchronously.

// synchronous send: blocks here until the send completes
// producer.send(record).get();

// asynchronous send: does not block, just register a callback listener
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if(exception == null) {
            System.out.println("message sent successfully");
        } else {
            exception.printStackTrace();
        }
    }
});

// synchronous send
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

// asynchronous send
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

The overall synchronous and asynchronous send logic is shown in the diagram below:

From the diagram you can see that whether the send is synchronous or asynchronous, the underlying call is the same method, doSend(). The only difference is whether a callback is supplied, and both register interceptors before the call. Keeping our focus on the big picture, we concentrate on the doSend method.

The doSend method looks like this:

/**
 * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

Although this method is fairly long, its structure is still quite clear. It mainly does the following:

1) waitOnMetadata: presumably waits for the metadata fetch.

2) keySerializer.serialize and valueSerializer.serialize: clearly serialize the record's key and value into byte arrays.

3) partition() performs partition routing, choosing one of the topic's partitions according to some routing strategy (a sketch follows this list).

4) accumulator.append puts the message into the in-memory buffer.

5) Wakes the Sender thread out of its blocking selector.select() so it can start processing the data in the buffer.

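To illustrate step 3, here is a simplified sketch of what the default routing strategy roughly boils down to. The real client hashes the key bytes with murmur2 and also honors an explicitly set partition, which this toy version omits; treat it as an assumption-laden illustration, not the actual DefaultPartitioner code:

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of "hash the key, mod the partition count".
class SimplePartitionRouter {
    private final AtomicInteger counter = new AtomicInteger(0);

    int choosePartition(byte[] serializedKey, int numPartitions) {
        if (serializedKey != null) {
            // records with the same key always map to the same partition
            return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
        }
        // keyless records are spread round-robin across partitions
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}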

Represented as a diagram:

In these two sections our focus is the source-level mechanics of the metadata fetch scenario.

So here we concentrate on step 1 first; the remaining four steps will be analyzed later.

How does waitOnMetadata wait for the metadata fetch?

Since the first step of send is the waitOnMetadata method, let's look at its code first:

/**
 * Wait for cluster metadata including partitions for the given topic to be available.
 * @param topic The topic we want metadata for
 * @param maxWaitMs The maximum time in ms for waiting on the metadata
 * @return The amount of time we waited in ms
 */
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already.
    if (!this.metadata.containsTopic(topic))
        this.metadata.add(topic);

    if (metadata.fetch().partitionsForTopic(topic) != null)
        return 0;

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    while (metadata.fetch().partitionsForTopic(topic) == null) {
        log.trace("Requesting metadata update for topic {}.", topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        metadata.awaitUpdate(version, remainingWaitMs);
        long elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (metadata.fetch().unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
    }
    return time.milliseconds() - begin;
}

/**
 * Get the current cluster info without blocking
 */
public synchronized Cluster fetch() {
    return this.cluster;
}

public synchronized int requestUpdate() {
    this.needUpdate = true;
    return this.version;
}

/**
 * Wait for metadata update until the current version is larger than the last version we know of
 */
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}

The core of this method is to check whether the Cluster metadata is already available. If it is not, the method does the following:

1) metadata.requestUpdate(): sets the needUpdate flag. This flag feeds into the metadataTimeout calculation inside maybeUpdate that we saw earlier, and can force metadataTimeout down to 0.

2) sender.wakeup(): wakes the Sender thread out of the nioSelector.select() it was blocked in so it can keep going.

3) metadata.awaitUpdate(version, remainingWaitMs): mainly compares metadata versions; if a newer version has not arrived yet, it calls wait() on the Metadata object (wait is a method every Object has, normally used together with notify or notifyAll).

Let me show the whole process directly with a diagram:

This diagram is the key result of today's analysis. Two blocking-and-waking mechanisms are at work here: the NIO Selector's select() and wakeup(), and the Metadata object's wait() and notifyAll(). You need to understand it together with the Sender thread's blocking logic we looked at earlier.

It is an interesting combination, isn't it? There is no Thread.join, Thread.sleep, or park/unpark anywhere; the coordination relies only on the Selector and on plain Object monitor methods.

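To make the second mechanism concrete, here is a minimal, hypothetical sketch of a version-based wait()/notifyAll() handshake in the spirit of Metadata.awaitUpdate() and the update that follows a successful metadata response. The class and its structure are simplified for illustration and are not the actual Kafka Metadata class:

// Minimal sketch of a version-based wait/notifyAll handshake (illustrative names only).
class VersionedMetadata {
    private int version = 0;

    // caller thread (e.g. the one inside send()) blocks until the version advances
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        while (version <= lastVersion) {
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new RuntimeException("Timed out waiting for metadata update");
            wait(maxWaitMs - elapsed);   // releases the monitor and parks this thread
        }
    }

    // updater thread (e.g. the Sender after a successful metadata response) bumps the
    // version and wakes every thread parked in awaitUpdate()
    synchronized void update() {
        version++;
        notifyAll();
    }
}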

Summary

Finally, a quick recap. In this section we mainly analyzed the following aspects of the Producer's source code:

Initializing a KafkaProducer does not fetch any metadata, but it does create the Selector component and start the Sender thread, which then blocks in select() waiting for request responses. Since no request has been sent yet, no metadata is actually pulled during initialization.

The real metadata fetch is triggered by the first call to send. It wakes the Selector out of the select() it was blocked in, which leads into the second loop iteration and sends the metadata fetch request, while the calling thread waits up to 60 seconds via the Object.wait mechanism. Only after the metadata has been successfully fetched from the broker does the actual produce request go ahead; otherwise a metadata fetch timeout exception is thrown.

In this section we have only seen how wait() is used to wait for the metadata fetch.

After the Selector's select() is woken up, execution should enter the second while-loop iteration.

So how does that second iteration send the metadata fetch request, and how does it call notifyAll() to wake everything up once it succeeds?

We will continue the analysis in the next section. Stay tuned, and see you there!
