At the end of the previous section we concluded: when KafkaProducer is initialized, it does not fetch any metadata. It only creates the Selector component, starts the Sender thread, and blocks in select() waiting for responses. Since no request has been sent yet, no metadata is actually pulled during initialization.

The real metadata fetch is triggered by the first call to send(). That call wakes up the select() the Selector was blocked in, the Sender enters the second iteration of its while loop and sends the metadata fetch request, while the calling thread waits up to 60 seconds via the Object.wait mechanism. Only after the metadata has been successfully pulled from the Broker does the actual produce request go ahead; otherwise a metadata fetch timeout exception is thrown.

As shown in the figure below:
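This wait-and-wake handshake is plain Object.wait/notifyAll on the Metadata object's monitor. Below is a minimal sketch of the pattern, loosely modeled on the old client's Metadata.awaitUpdate()/update() (class and field names simplified for illustration):

import org.apache.kafka.common.errors.TimeoutException;

class MetadataMonitor {
    private int version = 0;

    // Wait side: what KafkaProducer.send() effectively does while metadata is being fetched.
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (version <= lastVersion) {            // metadata not refreshed yet
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);              // releases the monitor until notifyAll() or timeout
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)               // e.g. the 60s budget mentioned above
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

    // Notify side: what the Sender thread does once the metadata response has been parsed.
    public synchronized void update() {
        version += 1;
        notifyAll();
    }
}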

After the Selector's select() is woken up, the Sender enters the second iteration of the while loop. So how does that second iteration send the metadata fetch request, and how does it call notifyAll() to wake things up once the fetch succeeds?

That is what we'll walk through today.

The second while-loop iteration: triggering the metadata fetch

The blocked select() has been woken up. Do you remember the logic that follows the block?

After waking up, the code checks the ready-key count returned by nioSelector.select(); only if it is greater than 0 does it run pollSelectionKeys and related processing. Because the thread was woken directly by wakeup(), the count is actually 0, so poll() returns right away without executing pollSelectionKeys.

And since pollSelectionKeys did not run, after Selector.poll() returns none of the follow-up methods (handleCompletedSends, handleCompletedReceives, handleDisconnections, handleConnections, handleTimedOutRequests) does anything either. (Set a breakpoint and you'll see for yourself.)

With that, the first iteration ends and the second begins. The overall flow is shown in the figure below (the steps annotated in gray are the ones that actually ran):

Why maybeUpdate actually executes in the second iteration

Having entered the second iteration, the loop again runs maybeUpdate(), poll(), and the handle* methods.

Do you remember the core idea of maybeUpdate? It derives metadataTimeout from three time values and only does real work when that timeout is 0. The code is as follows:

@Override
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);
    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}

public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}

In the first iteration metadataTimeout came out non-zero, but in the second iteration it has already become 0.

That is because in the previous section, inside send() and just before sender.wakeup(), we had already executed metadata.requestUpdate();

That single line flips the needUpdate flag to true, which drives timeToNextMetadataUpdate, one of the three values that determine metadataTimeout, down to 0. In other words timeToNextMetadataUpdate, timeToNextReconnectAttempt, and waitForMetadataFetch are all 0, so metadataTimeout is naturally 0 as well.

As shown below:
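To make that concrete, here is a hedged walk-through of the computation with illustrative numbers (not values from a real run):

public class MetadataTimeoutDemo {
    public static void main(String[] args) {
        long now = 1_000L;                         // any "current time"
        boolean needUpdate = true;                 // flipped by metadata.requestUpdate() inside send()
        long lastRefreshMs = 0L, refreshBackoffMs = 100L;

        long timeToExpire = needUpdate ? 0 : Long.MAX_VALUE;       // 0: an update was explicitly requested
        long timeToNextMetadataUpdate = Math.max(timeToExpire,
                lastRefreshMs + refreshBackoffMs - now);           // max(0, -900) == 0
        long timeToNextReconnectAttempt = 0;       // lastNoNodeAvailableMs is still 0
        long waitForMetadataFetch = 0;             // metadataFetchInProgress is still false

        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                waitForMetadataFetch);
        System.out.println(metadataTimeout);       // 0 -> maybeUpdate(now, node) executes
    }
}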

So in the second iteration the body of maybeUpdate really executes, unlike the first iteration, where it did nothing.

When metadataTimeout is 0, two methods are invoked:

1) leastLoadedNode: as its comment suggests, this picks a Broker node to pull metadata from, preferring a node we have already connected to and that has the least data waiting to be sent. We won't dig into those details here.

2) maybeUpdate(now, node): this overload is the crucial one; it holds the logic that either establishes the connection or issues the metadata fetch request.

So let's focus on the main logic of this maybeUpdate:

/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}

The structure above is simple, essentially one if-else:

if we can already send a metadata fetch request, call doSend();

else we cannot send yet, meaning the connection has not been established, so initialize it by calling initiateConnect().

The whole flow is shown in the figure below:

How is the connection established over NIO before fetching metadata?

Via canSendRequest and canConnect, maybeUpdate consults the ClusterConnectionStates component to decide whether a connection to the Broker already exists. We met this component in section 2: it is how NetworkClient tracks its connection state per Broker. The main code is:

// NetworkClient.java
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

// ClusterConnectionStates.java
public boolean canConnect(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null)
        return true;
    else
        return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}

Besides the connection state it performs a few extra, rather fine-grained checks; we'll keep the big picture in focus and not dwell on them here.

The key point is that no connection to any Broker exists yet, so execution is guaranteed to reach initiateConnect() to establish one. Let's take a look.

/**
 * Initiate a connection to the given node
 */
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                         new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer,
                         this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}

public void connecting(String id, long now) {
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
}

The core is extremely simple, just two calls:

1) connectionStates.connecting() records the state as CONNECTING; nothing more to it.

2) selector.connect() runs connect on Kafka's wrapped Selector, and this is where the connection actually gets established.

Selector's connect() method is the crucial part; let's see what its code does:

// org.apache.kafka.common.network.Selector.java
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}

// PlaintextChannelBuilder.java
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
    KafkaChannel channel = null;
    try {
        PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
        Authenticator authenticator = new DefaultAuthenticator();
        authenticator.configure(transportLayer, this.principalBuilder, this.configs);
        channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
    } catch (Exception e) {
        log.warn("Failed to create channel due to ", e);
        throw new KafkaException(e);
    }
    return channel;
}

The core flow of connect() above is:

1) SocketChannel.open() creates an NIO SocketChannel.

2) It sets some socket options and initiates the connection through SocketChannel.connect(). (These are routine NIO operations; if they look unfamiliar, try a plain Java NIO hello-world, see the sketch after this list, or keep an eye out for my upcoming NIO series.)

3) The socketChannel registers with the Selector, declaring interest in connection events (SelectionKey.OP_CONNECT), and the SelectionKey is associated with its SocketChannel.

4) buildChannel wraps the SocketChannel/Selector/SelectionKey relationship into a KafkaChannel. Somewhat confusingly, it adds a second wrapper object called TransportLayer, and binds the KafkaChannel to the SelectionKey via key.attach(channel).

5) The KafkaChannel is cached in the Map<String, KafkaChannel> channels.

The whole logic is shown in the figure below:
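If raw NIO is new to you, the following self-contained sketch (hypothetical host and port, no Kafka classes involved) does essentially what steps 1) through 3) above do:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NioConnectDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();

        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);                 // non-blocking, like Kafka's Selector.connect
        channel.socket().setKeepAlive(true);
        channel.socket().setTcpNoDelay(true);

        // In non-blocking mode connect() usually returns false immediately;
        // completion is reported later as an OP_CONNECT event.
        boolean connected = channel.connect(new InetSocketAddress("localhost", 9092));

        SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
        key.attach("node-1");                             // Kafka attaches the KafkaChannel here

        if (!connected) {
            selector.select();                            // blocks until OP_CONNECT fires
            if (key.isConnectable() && channel.finishConnect()) {
                // switch interest from OP_CONNECT to OP_READ, as pollSelectionKeys does
                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            }
        }
        System.out.println("connected: " + channel.isConnected());
        channel.close();
        selector.close();
    }
}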

At this point initiateConnect() has finished and maybeUpdate returns; the second while-loop iteration moves on to its next step, Selector.poll().

As the pink lines below show:

We already know that Selector.poll() ultimately calls nioSelector.select(), blocking until an event of interest arrives.

If you are familiar with NIO, you know that once the connect initiated earlier completes, the registered SelectionKey has its event of interest, SelectionKey.OP_CONNECT, ready, and select() breaks out of the block.

This process is illustrated below:

From the figure, pollSelectionKeys() is then certain to run:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);

        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}

The logic of this method is not obvious at a glance; no problem, let's debug it:

You'll find it mainly iterates over the SelectionKeys that have events ready. Since we registered only one SelectionKey, interested in connect events, the loop visits just that one.

Keep stepping through and you'll see the core of this while loop comes down to the following:

// Selector.java
private final List<String> connected;

if (channel.finishConnect()) {
    this.connected.add(channel.id());
    this.sensors.connectionCreated.record();
} else
    continue;

// KafkaChannel.java
public boolean finishConnect() throws IOException {
    return transportLayer.finishConnect();
}

// PlaintextTransportLayer.java
public boolean finishConnect() throws IOException {
    boolean connected = socketChannel.finishConnect();
    if (connected)
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}

The gist of the if-else above is:

First, channel.finishConnect() checks whether the connection has been established; underneath it is simply NIO's socketChannel.finishConnect(). If it has, the SelectionKey's interest set is switched to SelectionKey.OP_READ and no longer includes OP_CONNECT, and the id of the connected channel is cached in a List<String> connected.

Overall, as shown in the figure below:
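That interestOps expression is plain bit manipulation; a quick illustration of what it does:

import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT;                       // registered for connect events
        ops = ops & ~SelectionKey.OP_CONNECT                     // clear the CONNECT bit
                | SelectionKey.OP_READ;                          // set the READ bit
        System.out.println(ops == SelectionKey.OP_READ);         // true: now only readable events wake us
    }
}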

poll() is now done, completing step two of the second iteration; finally the loop runs the batch of handle* methods:

handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);

You can probably guess which of these does anything after a connection is established. Right, handleConnections() runs; the others have nothing to do and return immediately.

So what does handleConnections() do?

// NetworkClient.java
private void handleConnections() {
    for (String node : this.selector.connected()) {
        log.debug("Completed connection to node {}", node);
        this.connectionStates.connected(node);
    }
}

// Selector.java
public List<String> connected() {
    return this.connected;
}

// ClusterConnectionStates.java
public void connected(String id) {
    NodeConnectionState nodeState = nodeState(id);
    nodeState.state = ConnectionState.CONNECTED;
}

It simply iterates over the Nodes (Brokers) whose Channel has been established and records each Node's connection state as CONNECTED. (Remember the state was CONNECTING when maybeUpdate ran initiateConnect()?)

That completes the second while-loop iteration. It too ran the same three big steps, maybeUpdate() -> poll() -> handle* methods; its main accomplishment was establishing the connection to the Broker via NIO.

In the earlier first iteration of maybeUpdate() -> poll() -> handle*, by contrast, essentially all that happened was poll() blocking for a while; nothing else was done.

The whole second iteration is summarized in the big picture below:

Having walked through this second iteration, do you feel better acquainted with the Producer?

After this come a third iteration and more, all running the same maybeUpdate() -> poll() -> handle* logic again.

Sending the metadata fetch request

In the Sender's third iteration, the first step is of course maybeUpdate() again. This time the connection is already established, so a different branch runs, the doSend() method, which actually pulls the metadata. Let's dive in!

/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}

This time, when execution reaches maybeUpdate, it evaluates canSendRequest:

// NetworkClient.java
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

// ClusterConnectionStates.java
public boolean isConnected(String id) {
    NodeConnectionState state = nodeState.get(id);
    return state != null && state.state == ConnectionState.CONNECTED;
}

// Selector.java
public boolean isChannelReady(String id) {
    KafkaChannel channel = this.channels.get(id);
    return channel != null && channel.ready();
}

// PlaintextTransportLayer.java
public boolean ready() {
    return true;
}

// InFlightRequests.java
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

public boolean canSendMore(String node) {
    Deque<ClientRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
        (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

Across this handful of components, all three conditions must be true before doSend() runs:

connectionStates.isConnected(node): certainly true now, because the connection state has been recorded as CONNECTED.

selector.isChannelReady(node): the KafkaChannel created earlier is cached in the map, and channel.ready() always returns true for plaintext.

inFlightRequests.canSendMore(node): the per-node requests queue is empty, or its head request has finished sending and the queue holds fewer than maxInFlightRequestsPerConnection entries (a config that defaults to 5).

Right now the queue is completely empty, so this condition is true as well.

A key in-memory structure shows up here: InFlightRequests and its Map<String, Deque<ClientRequest>> requests, a Map of per-node double-ended queues. We encountered this component when analyzing NetworkClient; back then its comment only told us that InFlightRequests is the set of requests that have been sent, or are being sent, but have not yet received a response, without saying what it was really for. Here we can see it in action: before a request is sent, it is parked in this structure, which matches the comment closely. It is frequently used to check whether requests are outstanding.
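As a mental model, here is a stripped-down sketch of that structure (hypothetical types and names; as the code above shows, the real canSendMore additionally requires the head request to have finished sending):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of InFlightRequests: one deque of unanswered requests per broker node.
class InFlightSketch {
    private final Map<String, Deque<String>> requests = new HashMap<>(); // nodeId -> pending requests
    private final int maxInFlightRequestsPerConnection = 5;              // default of the real config

    void add(String node, String request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // true if one more request may be pushed to this node
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || queue.size() < maxInFlightRequestsPerConnection;
    }
}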

In other words, once the connection is established, the third iteration reaches the doSend() logic.

As shown below:

If the if-condition passes, the following executes:

if (canSendRequest(nodeConnectionId)) {
    this.metadataFetchInProgress = true;
    MetadataRequest metadataRequest;
    if (metadata.needMetadataForAllTopics())
        metadataRequest = MetadataRequest.allTopics();
    else
        metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
    ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
    log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
    doSend(clientRequest, now);
}

public RequestSend(String destination, RequestHeader header, Struct body) {
    super(destination, serialize(header, body));
    this.header = header;
    this.body = body;
}

public static ByteBuffer serialize(RequestHeader header, Struct body) {
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
    header.writeTo(buffer);
    body.writeTo(buffer);
    buffer.rewind();
    return buffer;
}

First, notice that before doSend the request is wrapped through several layers and finally serialized into a ByteBuffer. (We won't study the exact wire format now; it will come up again when we look at how Kafka handles TCP sticky and split packets.)

I'll spare you the details; in short: MetadataRequest -> RequestHeader + Struct -> RequestSend (whose serialize() turns it into a ByteBuffer) -> ClientRequest.
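On the wire this header+body buffer ultimately travels as a size-delimited frame with a 4-byte length prefix, which is also what later makes TCP sticky/split packets tractable. A hedged sketch of that framing (the helper name is mine, not Kafka's):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramingDemo {
    // Prefix a payload with its 4-byte size, roughly how Kafka frames a request on the wire.
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(payload.length);   // size header: tells the peer where this message ends
        buffer.put(payload);             // the header + body bytes produced by serialize()
        buffer.rewind();                 // reset position so the whole frame can be written out
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer frame = frame("metadata-request".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame size on wire: " + frame.remaining()); // 4 + 16 = 20
    }
}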

With the request wrapped up, doSend() is called:

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

// ClientRequest.java
public RequestSend request() {
    return request;
}

// Selector.java
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

private KafkaChannel channelOrFail(String id) {
    KafkaChannel channel = this.channels.get(id);
    if (channel == null)
        throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
    return channel;
}

// KafkaChannel.java
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// PlaintextTransportLayer.java
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}

This method is quite interesting. The main thread of doSend is:

1) Stash the request in the inFlightRequests structure.

2) The Selector fetches the previously cached KafkaChannel from its map.

3) The KafkaChannel records the RequestSend to be transmitted and adds interest in write events. (Recall that after the connection was established, OP_CONNECT was dropped and OP_READ added.)

These are basically routine NIO operations: grab the Channel, set the interest ops. But wait...

Where is the channel.write()? No data has actually been written out here! That is why the KafkaChannel method is named setSend(Send send): it only stores the object to be sent and registers interest in OP_WRITE.

The whole process is shown in the figure below:

After doSend() returns, metadataUpdater.maybeUpdate() returns as well, and we move to the second big step of the third iteration, selector.poll(), followed by the handle* methods. By now this should feel familiar.

selector.poll() boils down to two steps:

1) selector.select() blocks waiting for events of interest from the server;

2) pollSelectionKeys() iterates over the SelectionKeys and handles each according to the events it is ready for. (Back when the connection was being established, OP_CONNECT merely led to recording the connected channel id.)

Because the client is now interested in OP_READ and OP_WRITE events, during this third iteration selector.select() unblocks and the subsequent pollSelectionKeys() logic runs.

I've extracted the key part directly. In Selector.java, during the third while-loop iteration, the core of pollSelectionKeys' traversal of the SelectionKeys is:

// Selector.java: the key part of pollSelectionKeys during the third while-loop iteration
if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
        this.completedSends.add(send);
        this.sensors.recordBytesSent(channel.id(), send.size());
    }
}

The core here is:

1) channel.write() sends the metadata fetch request out!

2) Once the send completes, the successfully sent request is recorded in List<Send> completedSends.

Here we finally meet channel.write(): at the bottom it is NIO's socketChannel.write() flushing out the previously serialized ByteBuffer. And once the send completes, interest in SelectionKey.OP_WRITE is removed so no more data is written out.

// KafkaChannel.java
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

// PlaintextTransportLayer.java
public long write(ByteBuffer[] srcs) throws IOException {
    return socketChannel.write(srcs);
}

The data is finally on the wire. The whole process is summarized in the figure below:

This third pass of the while loop has now run maybeUpdate() and poll(); what remains is the handle* methods:

handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);

Of these, the one that definitely executes is handleCompletedSends:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}

It takes the previously stashed request out of inFlightRequests; request.expectResponse() defaults to true, so the if never fires and handleCompletedSends effectively does nothing. Judging by its comment, the method handles the case "if no response is expected then when the send is completed, return it". It is not on the critical path, so we focus on the big picture and skip it.

As your source-reading experience grows, you'll run into this kind of non-core logic all the time. Learn to make the cut: keep the big, let go of the small.

With that, the handle* methods are done. Time for the fourth while-loop iteration...

Receiving the fetched metadata and waking up KafkaProducer.send()

You can probably guess what the fourth iteration does: receive the metadata returned by the server and wake up the KafkaProducer.send() call that has been wait()-ing. With the experience of three iterations behind us, let's go straight to the core logic and take a quick look at how it's done!

1) First, maybeUpdate runs:

In the fourth iteration, waitForMetadataFetch inside maybeUpdate computes to a non-zero value, so, just as in the first iteration, maybeUpdate does nothing at all:

long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;

2) Next, selector.poll() blocks in select(). When the server returns data, and because our SelectionKey is now only interested in OP_READ, select() unblocks and the corresponding branch of pollSelectionKeys runs:

/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive;
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}

Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

/**
 * adds a receive to staged receives
 */
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

This logic receives the bytes into a NetworkReceive object; at the bottom it ultimately calls SocketChannel's read(), ordinary NIO again, symmetrical to the send path. I won't walk the lowest level here; you can certainly read it yourself.

Besides receiving the data into a NetworkReceive object, it also stages what was received in a double-ended queue: Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives.
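Reading is the mirror image of sending: first accumulate the 4-byte size, then keep reading until that many payload bytes have arrived. A rough, simplified sketch of what NetworkReceive does for a single message (names are mine; error handling omitted):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Roughly what NetworkReceive does: read a 4-byte size, then a payload of that size.
class ReceiveSketch {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    // Returns the complete payload once all bytes have arrived, else null
    // (a split packet: try again on the next OP_READ event).
    ByteBuffer readFrom(SocketChannel channel) throws IOException {
        if (sizeBuf.hasRemaining()) {
            channel.read(sizeBuf);               // the size itself may arrive in several chunks
            if (sizeBuf.hasRemaining())
                return null;
            sizeBuf.flip();
            payload = ByteBuffer.allocate(sizeBuf.getInt());
        }
        channel.read(payload);
        if (payload.hasRemaining())
            return null;                         // payload incomplete, wait for more data
        payload.flip();
        return payload;
    }
}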

3) After poll() finishes, it is handle* time again; this round, handleCompletedReceives() executes:

/**
 * Handle any completed receives and update the response list with the responses received.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}

public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}

The structure of this method is simple:

1) Using the previously stashed ClientRequest, find the matching response among the NetworkReceives, then parse the ByteBuffer into a Struct object.

2) Invoke DefaultMetadataUpdater's maybeHandleCompletedReceive method.

And what does DefaultMetadataUpdater's maybeHandleCompletedReceive do from there?

private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    MetadataResponse response = new MetadataResponse(body);
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}

public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;
    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);
    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

The core of the code above:

1) maybeHandleCompletedReceive converts the Struct into a MetadataResponse and then into a Cluster object.

2) Finally, if the Cluster contains at least one Node, metadata.update() runs: it fires some Listener callbacks and, crucially, calls notifyAll() on the metadata object, waking the KafkaProducer.send() that has been blocked waiting.

The whole process is summarized in the figure below:

Summary

With that, we've finished studying the source-level flow of metadata fetching. Once you've been through it, you'll notice we ran the core while loop four times, and with each repetition the source seemed less difficult.

That's how it usually goes: do simple things repeatedly, keep thinking and questioning, and you'll spot the patterns and gradually grasp the essence. That mindset matters far more than understanding Kafka's metadata fetching itself.

Also, metadata fetching really isn't complicated in the end: it's just connection setup, request sending, and response handling. Kafka merely applies a few interesting mechanisms along the way, wait+notifyAll plus NIO.

So far I've been drawing the detailed logic diagrams for you; try drawing a simplified one yourself to summarize the flow. If you can draw it and explain it clearly to someone else, you have truly understood it.

Kafka did put real thought into this flow, though. Consider its highlights and strengths, just as we did after studying ZooKeeper's election. The ideas you come up with are worth far more than the knowledge itself. Leave a comment and let's discuss!

Kafka Growth Notes leans toward technical depth; if you have used NIO heavily, the NIO parts of the metadata fetch will be easy to follow.

If NIO is new to you, look up the basics yourself, or watch for my upcoming NIO starter series.

See you in the next section!
