At the end of the previous section we concluded: initializing KafkaProducer does not fetch any metadata. It only creates the Selector component and starts the Sender thread, which blocks in select() waiting for request responses; since no request has been sent yet, nothing is actually fetched during initialization.
The real metadata fetch happens on the first send() call. It wakes up the select() the Selector was blocked in, the Sender enters the second iteration of its while loop, and that iteration sends the metadata request; meanwhile the calling thread waits up to 60s via the Object.wait mechanism. Only after the metadata has been fetched from the Broker successfully does the actual produce request proceed; otherwise a metadata-fetch timeout exception is thrown.
As shown in the figure below:
After select() is woken up, the loop should enter its second iteration. So how does that second iteration send the metadata request, and how does it call notifyAll() to wake the waiting thread once the fetch succeeds?
That is what we will look at today.
The second while iteration: triggering the metadata fetch
The blocked select() has been woken up; do you remember the logic that follows it?
After waking, the code checks the int readyKeys returned by nioSelector.select(): only if it is greater than 0 does pollSelectionKeys run. Because we were woken by wakeup() directly, readyKeys is actually 0, so poll() returns right away without executing pollSelectionKeys.
And since pollSelectionKeys did not run, none of the methods that follow Selector.poll() — handleCompletedSends, handleCompletedReceives, handleDisconnections, handleConnections, handleTimedOutRequests — do anything either. (Set a breakpoint and you will see it for yourself.)
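By the way, this wakeup() behavior is easy to reproduce outside Kafka. A tiny standalone demo (plain Java NIO, nothing Kafka-specific): calling wakeup() before select() makes select() return immediately with 0 ready keys.
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        selector.wakeup();                 // sets the wakeup flag
        int readyKeys = selector.select(); // returns immediately instead of blocking
        System.out.println(readyKeys);     // prints 0: no ready keys, so no pollSelectionKeys
        selector.close();
    }
}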
Once that finishes, the first iteration ends and the second one begins. The overall flow is shown in the figure below (the steps annotated in gray are the ones that actually executed):
Why maybeUpdate really executes in the second iteration
Having entered the second iteration, the loop again runs maybeUpdate(), poll(), and the handle* methods.
Do you remember the core idea of maybeUpdate? Three time values determine whether metadataTimeout is 0, which in turn decides whether it does any work. The code:
@Override
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);
    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}

public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}
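To make the three timeouts concrete, here is a worked example with illustrative numbers (the 5-minute figure is the default metadata.max.age.ms; everything else is an assumption about the first iteration's state):
// First iteration: needUpdate == false and metadata was just "refreshed" at startup
long timeToNextMetadataUpdate = 300_000; // lastSuccessfulRefreshMs + 5 min - now
long timeToNextReconnectAttempt = 0;     // no failed connect attempt yet
long waitForMetadataFetch = 0;           // metadataFetchInProgress == false
long metadataTimeout = Math.max(Math.max(300_000, 0), 0); // non-zero, so the if is skipped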
In the first iteration metadataTimeout came out non-zero; by the second iteration it has already become 0.
That is because in the previous section, send() executed metadata.requestUpdate() just before sender.wakeup().
That line sets the needUpdate flag to true, which drives timeToNextMetadataUpdate — one of the three values behind metadataTimeout — to 0. With timeToNextMetadataUpdate, timeToNextReconnectAttempt, and waitForMetadataFetch all at 0, metadataTimeout is naturally 0 as well.
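For reference, requestUpdate itself is roughly this simple (paraphrased from Metadata.java; treat the exact body as an approximation):
// Metadata.java (roughly): flip the flag and let the Sender loop pick it up
public synchronized int requestUpdate() {
    this.needUpdate = true;
    return this.version;
}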
As shown below:
So in the second iteration maybeUpdate really executes its logic, instead of doing nothing like the first time.
When metadataTimeout == 0, two methods run:
1) leastLoadedNode — as the comment hints, this picks a Broker node to fetch metadata from, preferring a node that is already connected and has the least data waiting to be sent. We won't study those details closely here.
2) maybeUpdate(now, node) — this one is crucial: it is the logic that establishes the connection or actually issues the metadata request.
So let's focus on the main logic of this maybeUpdate:
/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
The structure above is simple — essentially a single if-else:
if a metadata request can be sent now, call doSend();
else the connection has not been established yet and must be initiated, via initiateConnect().
The whole flow is shown in the figure below:
How is the connection established over NIO before the metadata fetch?
maybeUpdate relies on canSendRequest and canConnect, backed by the ClusterConnectionStates component, to decide whether a connection to the Broker already exists. We met this component in section two: it is how NetworkClient tracks its connection state with each Broker. The main code:
// NetworkClient.java
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

// ClusterConnectionStates.java
public boolean canConnect(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null)
        return true;
    else
        return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}
Besides the connection state there are a few extra checks, but they are fine-grained details — let's keep the big picture and not dig into them here.
The key point: no connection to any Broker exists yet, so execution is certain to reach initiateConnect() to establish one. Let's look at it together.
/**
 * Initiate a connection to the given node
 */
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(node.host(), node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}

public void connecting(String id, long now) {
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
}
The core here is just two steps:
1) connectionStates.connecting() records the state as CONNECTING — nothing special.
2) selector.connect() runs connect through Kafka's Selector wrapper, and that is the key to establishing the connection.
The Selector's connect method matters a great deal, so let's see what its code does:
org.apache.kafka.common.network.Selector.java
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}
PlaintextChannelBuilder.java
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
    KafkaChannel channel = null;
    try {
        PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
        Authenticator authenticator = new DefaultAuthenticator();
        authenticator.configure(transportLayer, this.principalBuilder, this.configs);
        channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
    } catch (Exception e) {
        log.warn("Failed to create channel due to", e);
        throw new KafkaException(e);
    }
    return channel;
}
The core steps of connect() above:
1) SocketChannel.open() creates an NIO SocketChannel.
2) A few socket options are set, and the connection is initiated through socketChannel.connect(). (This is standard NIO; if it looks unfamiliar, try a plain Java NIO hello-world — there is a minimal sketch right after the figure below.)
3) The socketChannel registers with the Selector, declaring interest in connection events (SelectionKey.OP_CONNECT), so the SelectionKey is associated with its SocketChannel.
4) buildChannel wraps the SocketChannel/Selector/SelectionKey relationship into a KafkaChannel. Somewhat awkwardly, it wraps a second object called TransportLayer along the way, and binds the KafkaChannel to the SelectionKey via key.attach(channel).
5) The KafkaChannel is cached in Map<String, KafkaChannel> channels.
The whole logic is shown in the figure below:
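If you want to see the same non-blocking connect pattern outside Kafka, here is the minimal sketch promised above (plain Java NIO; the host and port are made up for illustration):
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NioConnectDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(true);
        // in non-blocking mode connect() returns immediately;
        // completion is reported later as an OP_CONNECT event
        boolean connected = channel.connect(new InetSocketAddress("localhost", 9092));
        SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
        if (!connected) {
            selector.select(); // blocks until OP_CONNECT fires
            if (key.isConnectable() && channel.finishConnect()) {
                key.interestOps(SelectionKey.OP_READ); // stop watching OP_CONNECT
                System.out.println("connected");
            }
        }
        channel.close();
        selector.close();
    }
}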
At this point initiateConnect() is done, maybeUpdate returns, and the second while iteration moves on to its next step, Selector.poll().
As the pink lines below show:
We already know that Selector.poll() ultimately calls nioSelector.select(), blocking until an event it cares about arrives.
If you are familiar with NIO, you know that once the connect initiated earlier completes, the registered SelectionKey has its event of interest, SelectionKey.OP_CONNECT, ready — and select() breaks out of the block.
The process is shown below:
From the figure, pollSelectionKeys() is certain to run next:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);
        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);
        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }
            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
The logic of this method may not look obvious at first, so let's just debug it:
You will find it mainly iterates over the SelectionKeys that have pending events. Since only one SelectionKey was registered, interested in connect events, we iterate over exactly one.
Keep stepping through and you will find the core of this while loop comes down to the following:
private final List<String> connected;

if (channel.finishConnect()) {
    this.connected.add(channel.id());
    this.sensors.connectionCreated.record();
} else
    continue;

KafkaChannel.java
public boolean finishConnect() throws IOException {
    return transportLayer.finishConnect();
}

PlaintextTransportLayer.java
public boolean finishConnect() throws IOException {
    boolean connected = socketChannel.finishConnect();
    if (connected)
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
The essence of this if-else:
channel.finishConnect() checks whether the connection has completed — underneath it is simply NIO's socketChannel.finishConnect(). If the connection is established, the SelectionKey's interest set is switched to SelectionKey.OP_READ, no longer OP_CONNECT, and the connected channel's id is cached in the List<String> connected collection.
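The interestOps expression is plain bit arithmetic on the SelectionKey constants; spelled out:
int ops = SelectionKey.OP_CONNECT;   // 8: currently watching connect events
ops = ops & ~SelectionKey.OP_CONNECT // clear the OP_CONNECT bit -> 0
        | SelectionKey.OP_READ;      // set the OP_READ bit      -> 1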
The overall picture is shown below:
With poll() done, the second step of the second while iteration is complete; finally the loop runs the batch of handle* methods:
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
You can guess which of these does work after a connection is established: handleConnections(). The others never get anywhere — they all return immediately.
So what does handleConnections() do?
NetworkClient.java
private void handleConnections() {
    for (String node : this.selector.connected()) {
        log.debug("Completed connection to node {}", node);
        this.connectionStates.connected(node);
    }
}

Selector.java
public List<String> connected() {
    return this.connected;
}

ClusterConnectionStates.java
public void connected(String id) {
    NodeConnectionState nodeState = nodeState(id);
    nodeState.state = ConnectionState.CONNECTED;
}
It simply iterates over the Nodes (Brokers) whose Channels have been established and records each Node's connection state as CONNECTED. (Remember how initiateConnect(), called from maybeUpdate, set the state to CONNECTING?)
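So the connection state we have traced moves through a small state machine; sketched below (Kafka's real enum has this shape, give or take):
// DISCONNECTED --initiateConnect()--> CONNECTING --handleConnections()--> CONNECTED
enum ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }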
And that is the end of the second while iteration. It runs the same three big steps — maybeUpdate(), poll(), then the handle* methods — and its main achievement is establishing the connection to the Broker via NIO.
By contrast, in the first iteration's maybeUpdate()->poll()->handle* sequence, poll() just blocked for a while and nothing else happened.
The whole second iteration is summarized in the big picture below:
Having walked through this second iteration, doesn't the Producer feel much more familiar?
After this come the third while iteration and more, each running the same maybeUpdate()->poll()->handle* sequence.
Sending the metadata fetch request
In the third iteration, the Sender's first step is of course maybeUpdate() again. This time the connection is already established, so a different branch runs — doSend(), which actually fetches the metadata. Let's take a look!
/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
This time, when maybeUpdate runs, the canSendRequest branch is taken:
//NetworkClient.java
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

//ClusterConnectionStates.java
public boolean isConnected(String id) {
    NodeConnectionState state = nodeState.get(id);
    return state != null && state.state == ConnectionState.CONNECTED;
}

//Selector.java
public boolean isChannelReady(String id) {
    KafkaChannel channel = this.channels.get(id);
    return channel != null && channel.ready();
}

//PlaintextTransportLayer.java
public boolean ready() {
    return true;
}

//InFlightRequests.java
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

public boolean canSendMore(String node) {
    Deque<ClientRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
            (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
The checks above span a handful of components, and doSend only runs when all three conditions are true:
connectionStates.isConnected(node): certainly true now, since the connection state has been recorded as CONNECTED.
selector.isChannelReady(node): the KafkaChannel built earlier is cached in the map, and channel.ready() always returns true for plaintext.
inFlightRequests.canSendMore(node): true when the node's queue is null or empty, or when the head request has completed sending and the queue size is below maxInFlightRequestsPerConnection (default 5, configurable).
In this iteration the queue is empty to begin with, so this condition is true as well.
A key in-memory structure appears here: Map<String, Deque<ClientRequest>> requests inside InFlightRequests — a Map of double-ended queues. We ran into this component when analyzing NetworkClient earlier; back then all we knew, from the comment, was that InFlightRequests is the set of requests that have been sent or are being sent but have not yet received a response. Now we can see concretely that before a request goes out, it is staged in this structure — which matches the comment closely. It is frequently used to check whether there are outstanding requests.
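Here is a minimal sketch of that idea — a per-node deque gating how many requests may be in flight (illustrative only, not the Kafka class; the real canSendMore also requires the head request's send to have completed):
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class InFlightSketch {
    private final Map<String, Deque<String>> requests = new HashMap<>();
    private final int maxInFlightRequestsPerConnection = 5; // Kafka's default

    // newest request goes to the front, mirroring InFlightRequests.add
    void add(String node, String request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || queue.size() < maxInFlightRequestsPerConnection;
    }
}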
In other words, once the connection is established, the third iteration reaches the doSend logic.
As shown below:
If the if passes, the following executes:
if (canSendRequest(nodeConnectionId)) {
    this.metadataFetchInProgress = true;
    MetadataRequest metadataRequest;
    if (metadata.needMetadataForAllTopics())
        metadataRequest = MetadataRequest.allTopics();
    else
        metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
    ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
    log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
    doSend(clientRequest, now);
}

public RequestSend(String destination, RequestHeader header, Struct body) {
    super(destination, serialize(header, body));
    this.header = header;
    this.body = body;
}

public static ByteBuffer serialize(RequestHeader header, Struct body) {
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
    header.writeTo(buffer);
    body.writeTo(buffer);
    buffer.rewind();
    return buffer;
}
First, notice that before doSend, the request goes through several layers of wrapping and is finally serialized into a ByteBuffer. (We will leave the exact serialization format aside for now; it will come up again when we study how Kafka handles TCP sticky and partial packets.)
I won't drag you through every detail; in brief: MetadataRequest -> RequestHeader + Struct -> RequestSend (turned into a ByteBuffer by serialize()) -> ClientRequest.
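The allocate/write/rewind pattern in serialize() is ordinary ByteBuffer usage. The same pattern with raw bytes standing in for the header and body (values made up):
import java.nio.ByteBuffer;

ByteBuffer header = ByteBuffer.wrap(new byte[]{0, 3}); // pretend serialized header
ByteBuffer body   = ByteBuffer.wrap(new byte[]{42});   // pretend serialized body
ByteBuffer buffer = ByteBuffer.allocate(header.remaining() + body.remaining());
buffer.put(header).put(body);
buffer.rewind(); // position back to 0, so a later channel.write() sends from the start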
With the request wrapped up, doSend is called:
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

//ClientRequest.java
public RequestSend request() {
    return request;
}

// Selector.java
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

private KafkaChannel channelOrFail(String id) {
    KafkaChannel channel = this.channels.get(id);
    if (channel == null)
        throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
    return channel;
}

// KafkaChannel.java
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// PlaintextTransportLayer.java
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}
This part is rather interesting. The main steps of doSend are:
1) Stage the request in the inFlightRequests structure.
2) The Selector fetches the previously cached KafkaChannel from its map.
3) The KafkaChannel records the RequestSend to be written and adds interest in write events. (Remember: after the connection was established, OP_CONNECT was dropped and OP_READ added.)
These are routine NIO operations — get the Channel, set the interest ops. But...
where is channel.write()? No data has actually been written out here! That is why this KafkaChannel method is named setSend(Send send): it only stages the object to send and registers interest in OP_WRITE.
The whole flow is shown in the figure below:
After doSend executes, metadataUpdater.maybeUpdate returns as well, and the loop enters the second big step of the third iteration, selector.poll(), followed at the end by the handle* methods. This should all feel familiar by now.
selector.poll() boils down to two steps:
1) selector.select() blocks waiting for the events of interest to arrive from the server.
2) pollSelectionKeys() iterates over all SelectionKeys and handles each according to the events it cares about. (During connection establishment, the only action was recording the connected channel id based on OP_CONNECT.)
Because the client now cares about the OP_READ and OP_WRITE events, in this third iteration select() breaks out of its block and the pollSelectionKeys() logic that follows runs.
I have extracted the key part directly — the core of what pollSelectionKeys executes while traversing the SelectionKeys during the third while iteration:
//Selector.java: the core of pollSelectionKeys while traversing SelectionKeys during the third while iteration
if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
        this.completedSends.add(send);
        this.sensors.recordBytesSent(channel.id(), send.size());
    }
}
The essence here:
1) channel.write() sends the metadata fetch request out at last!
2) Once the send completes, the finished request is recorded in List<Send> completedSends.
Here we finally see channel.write(), and at the bottom it is NIO's socketChannel.write() that writes out the ByteBuffer serialized earlier. Once the send completes, interest in SelectionKey.OP_WRITE is removed, so no further data is written.
//KafkaChannel.java
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

//PlaintextTransportLayer.java
public long write(ByteBuffer[] srcs) throws IOException {
    return socketChannel.write(srcs);
}
The data is finally on the wire. The whole process is summarized in the figure below:
The third pass of the while loop has now executed maybeUpdate() and poll(); all that remains are the handle* methods:
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
Of these, the one that is certain to execute is handleCompletedSends:
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}
This takes the previously staged request out of inFlightRequests, but request.expectResponse() defaults to true, so the if never fires and handleCompletedSends effectively does nothing. Judging by the comment, the method exists to handle "if no response is expected then when the send is completed, return it" — not critical logic for us, so we keep the big picture and skip it.
As your source-reading experience grows, you will constantly run into non-core logic like this. Learn to make trade-offs; keep the big-picture mindset.
With that, the handle* methods are finished. Time for the fourth while iteration...
Receiving the fetched metadata and waking up KafkaProducer.send()
You can guess what the fourth while iteration will do: receive the metadata returned by the server and wake up the KafkaProducer.send() that has been wait()-ing. With the experience of the previous three iterations, let's go straight to the core logic and run through it quickly!
1) First, maybeUpdate executes:
In the fourth iteration, waitForMetadataFetch computes to a non-zero value (metadataFetchInProgress is now true, making it Integer.MAX_VALUE), so maybeUpdate does nothing at all, just like in the first iteration:
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
2) Next selector.poll() runs and blocks in select(). When the server returns data, since the SelectionKey's only remaining interest is OP_READ, select() breaks out of the block and the corresponding pollSelectionKeys logic executes:
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive;
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}

Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

/**
 * adds a receive to staged receives
 */
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
This logic receives the ByteBuffer into a NetworkReceive object; underneath it is just SocketChannel's read() — a routine NIO operation, mirroring the send path. I won't walk the lowest level here; I'm sure you can follow it yourself.
Besides reading the data into NetworkReceive objects, the receives are also staged in a double-ended queue: Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives.
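To make the read path concrete, here is a minimal sketch of reading one size-prefixed frame via SocketChannel.read() — the idea behind NetworkReceive (illustrative; it assumes a blocking channel for brevity, while Kafka accumulates partial reads across poll() calls):
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

static ByteBuffer readFrame(SocketChannel ch) throws java.io.IOException {
    ByteBuffer sizeBuf = ByteBuffer.allocate(4); // 4-byte length prefix
    while (sizeBuf.hasRemaining())
        ch.read(sizeBuf);                        // may take several reads
    sizeBuf.flip();
    ByteBuffer payload = ByteBuffer.allocate(sizeBuf.getInt());
    while (payload.hasRemaining())
        ch.read(payload);
    payload.flip();                              // ready for parsing
    return payload;
}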
3) With poll() done, it is time for the handle* methods; the one that executes this time is handleCompletedReceives():
/**
 * Handle any completed receives and update the response list with the responses received.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}

public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
The structure of this method is simple:
1) Using the previously staged ClientRequest, find the matching response in the NetworkReceive, then parse the ByteBuffer into a Struct object.
2) Call DefaultMetadataUpdater's maybeHandleCompletedReceive method.
And what does DefaultMetadataUpdater.maybeHandleCompletedReceive do after that? It calls handleResponse:
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    MetadataResponse response = new MetadataResponse(body);
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}

public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;
    for (Listener listener : listeners)
        listener.onMetadataUpdate(cluster);
    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
The core of the code above:
1) maybeHandleCompletedReceive converts the Struct into a MetadataResponse and then into a Cluster object.
2) Finally, if the Cluster's Nodes count is greater than 0, metadata.update() runs: it fires some Listener callbacks and — the crucial part — calls notifyAll(), waking the KafkaProducer.send() that has been blocked waiting.
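The handshake between the blocked send() and this notifyAll() is a classic version-numbered wait/notifyAll. A simplified sketch of the mechanism (modeled on Metadata.awaitUpdate; the real code throws Kafka's TimeoutException):
class MetadataSketch {
    private int version = 0;

    // user thread, called from KafkaProducer.send()
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs; // e.g. the 60s max wait
        while (this.version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new RuntimeException("Failed to update metadata after " + maxWaitMs + " ms.");
            wait(remaining); // releases the lock and parks until notifyAll()
        }
    }

    // Sender thread, called from handleResponse() -> update()
    synchronized void update(/* Cluster cluster */) {
        this.version += 1;
        notifyAll(); // wakes every thread parked in awaitUpdate
    }
}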
The whole process is summarized in the figure below:
Summary
That completes our study of the metadata-fetch source code. Once you have been through it, you will find that after four passes over the core while loop, the repetition makes the underlying logic feel far less difficult.
That is how it usually works: do a simple thing repeatedly, keep thinking and questioning, and you will spot the patterns and gradually understand the essence. That mindset matters far more than understanding Kafka's metadata-fetch internals themselves.
Besides, metadata fetching is honestly not complicated — it is just connection establishment, request sending, and response handling. Kafka simply applies a few interesting mechanisms along the way: wait+notifyAll, and NIO.
So far I have been drawing the detailed logic diagrams for you; try drawing a simplified one yourself to summarize the flow. If you can draw it and explain it clearly to someone else, you have truly understood it.
Kafka did put real thought into this process, though. Think about what its highlights and strengths are, just as we did after studying ZooKeeper's leader election. The ideas you work out yourself are worth far more than the knowledge itself. Leave a comment and let's discuss!
Although these Kafka Growth Notes lean toward building technical depth, if you have used NIO extensively you will find the NIO side of the metadata fetch easy to follow.
If NIO is less familiar, look up the basics yourself, or watch for my upcoming《NIO 小白起步营》.
See you in the next section!