上一篇讲了Kafka Producer发送音讯的主体流程,这一篇咱们关注下Kafka的网络层是如何实现的。
对于发送音讯而言,Producer是客户端,Broker是服务器端。
Kafka应用了JavaNIO向服务器发送音讯,所以在这之前须要理解java nio的基本知识。这次网络层源码剖析从metadata request切入。
开局一张图
下面是Kafka producer网络层的主体流程,先看下有一个大体印象。
Kafka的底层应用的是Java NIO,Kafka中针对NIO的Selector的封装类也叫Selector,对Channel的封装类叫做KafkaChannel。前面如果没有非凡阐明,Selector都是指Kafka中的Selector。
metadata request
先来回顾下Kafka 发送音讯的代码
KafkaProducer<String,String> producer = createProducer();for (int i = 0; i < 10;i++) { producer.send(new ProducerRecord<>("codeTest","key" + (i+1),"value" + (i+1)));}
下面的代码中首先会初始化KafkaProducer,在初始化KafkaProducer的时候,同时咱们也会初始化Kafka发送音讯的客户端
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext), ...);
创立NetworkClient(implements KafkaClient)的同时会创立一个Selector,这个是对java.nio.Selector的封装,发送申请,接管响应,解决连贯实现以及现有的断开连接都是通过它的poll()办法调用实现的。
期待metadata更新
咱们都晓得Kafka读写音讯都是通过leader的,只有晓得了leader能力发送音讯到kafka,在咱们的你好,Kafka
)一文中,咱们讲了首先会发动metadata request,从中就能够获取到集群元信息(leader,partiton,ISR列表等),那么是在哪里发动metadata request的呢?
调用KafkaProducer的doSend()(send()-->doSend())办法时,第一步就是通过waitOnMetadata期待集群元数据(topic,partition,node等)可用。
Cluster cluster = metadata.fetch();Integer partitionsCount = cluster.partitionCountForTopic(topic);//如果此时曾经有partition的信息了if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0);do { int version = metadata.requestUpdate(); //唤醒Sender()线程,Sender会被poll阻塞(参见java.nio.Channels.Selector.poll()) sender.wakeup(); //期待metadata的更新,会始终阻塞直到以后版本大于最近一次版本 metadata.awaitUpdate(version, remainingWaitMs); cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic);} while (partitionsCount == null);
如果metadata中不蕴含对应topic的metadata信息,那么就申请更新metadata,如果没有更新则始终会在这个while循环中,这个循环次要做了以下几件事
- 调用metadata.requestUpdate();将needUpdate属性设置为true(示意强制更新),返回以后version(用于判断是否更新过了)
- 唤醒Sender线程,实际上是唤醒NetworkClient中Selector,防止Selector始终在poll中期待
- 执行metadata.awaitUpdate期待metadata的更新,未更新则始终阻塞。
//期待元数据更新,直到以后版本大于咱们晓得的最新版本public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while ((this.version <= lastVersion) && !isClosed()) { if (remainingWaitMs != 0) wait(remainingWaitMs); long elapsed = System.currentTimeMillis() - begin; remainingWaitMs = maxWaitMs - elapsed; }}
发动申请
当初咱们晓得了会期待元数据更新,那么到底是在哪里更新的呢?下面有讲到唤醒了Sender线程,在run()办法中会去调用KafkaClient.poll()办法,这里会对metadata request进行解决
@Overridepublic List<ClientResponse> poll(long timeout, long now) { //解决metadata long metadataTimeout = metadataUpdater.maybeUpdate(now); //这里会调用java.nio.Selector.select()办法 this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); ... //这里会对metadata response进行解决,就能够获取到kafka metadata的信息 handleCompletedReceives(responses, updatedNow); ... return responses;}
当初看看metadata request如何发送的
private long maybeUpdate(long now, Node node) { String nodeConnectionId = node.idString(); //曾经连贯,且Channel曾经ready,且没有申请被发送到给定节点 if (canSendRequest(nodeConnectionId, now)) { //这里会最终调用NetworkClient.doSend()办法,实际上是将Send对象设置到KafkaChannel中,并没有进行网络IO sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now); return defaultRequestTimeoutMs; } if (connectionStates.canConnect(nodeConnectionId, now)) { //初始化连贯 initiateConnect(node, now); return reconnectBackoffMs; } return Long.MAX_VALUE;}
下面的代码先查看是否能够发送申请,如果能够发送申请就间接将数据设置到KafkaChannel中。如果不能发送就查看以后是否能够连贯,如果能够则初始化连贯,初始化连贯的代码在Selector.connect()办法中
@Overridepublic void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { SocketChannel socketChannel = SocketChannel.open(); //设置keepAlive以及socket的一些属性(比方发送数据缓存区大小以及接收数据缓冲区大小) configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize); //实际上调用socketChannel.connect(address); boolean connected = doConnect(socketChannel, address); //将socketChannel注册到nioSelector中,同时将生成KafkaChannel(对java.nio.Channel的封装) //并将KafkaChannel绑定到java.nio.SelectionKey中 SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);// connectct为true代表该连贯不会再触发CONNECT事件,所以这里要独自解决 if (connected) { // 退出到一个独自的汇合中 immediatelyConnectedKeys.add(key); // 勾销对该连贯的CONNECT事件的监听 key.interestOps(0); }}private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException { SelectionKey key = socketChannel.register(nioSelector, interestedOps); KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key); this.channels.put(id, channel); return key;}
如果相熟NIO的话,下面的代码看上去就很相熟,次要就是设置设置channel以及selectionKey的关系。
须要留神的是doConnect()办法返回为true的状况,在非阻塞模式下,对于local connection,连贯可能马上就建设好了,那该办法会返回true,对于这种状况,不会再触发之后的connect事件。因而kafka用一个独自的汇合immediatelyConnectedKeys将这些非凡的连贯记录下来。在接下来的步骤会进行非凡解决。
这里要留意到KafkaChannel就是在这里被创立的。到这里咱们就要来看看KafkaChannel和Selector有哪些属性是须要咱们留神的
public class KafkaChannel { //继承java.nio.channels.Channel,可读可写,对socketChannel的封装 private final TransportLayer transportLayer; //通过它来创立Buffer和回收Buffer private final MemoryPool memoryPool; //收到的数据 private NetworkReceive receive; //发送的数据 private Send send;}public class Selector implements Selectable, AutoCloseable { //java nio中的Selector private final java.nio.channels.Selector nioSelector; //kafka服务器节点和Channel之间对应关系 private final Map<String, KafkaChannel> channels; //发送实现的申请 private final List<Send> completedSends; //残缺的音讯响应 private final List<NetworkReceive> completedReceives; //暂存的音讯响应 private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; //立刻连贯上的SelectionKey private final Set<SelectionKey> immediatelyConnectedKeys; //用于调配ByteBuffer private final MemoryPool memoryPool;}
轮询数据
初始化连贯实现之后,这个时候就是开始轮询了,在Selector.poll()办法中对于数据读写的逻辑如下
public void poll(long timeout) throws IOException { /* check ready keys */ int numReadyKeys = select(timeout); if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) { Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys(); pollSelectionKeys(readyKeys, false, endSelect); // 革除所有SelectionKey,防止下一次在进行解决 readyKeys.clear(); //解决发动连贯时,马上就建设连贯的申请,这种个别只在broker和client在同一台机器上才存在 pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); immediatelyConnectedKeys.clear(); } //将暂存起来的网络响应增加到已实现网络响应汇合外面 addToCompletedReceives();}void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { for (SelectionKey key : determineHandlingOrder(selectionKeys)) { KafkaChannel channel = channel(key); boolean sendFailed = false; //READ事件 if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel) && !explicitlyMutedChannels.contains(channel)) { NetworkReceive networkReceive; //read办法会从channel中将数据读取到Buffer中(还是通过KafkaChannel中的transportLayer), while ((networkReceive = channel.read()) != null) { if (!stagedReceives.containsKey(channel)) stagedReceives.put(channel, new ArrayDeque<NetworkReceive>()); //将读到的申请存起来 Deque<NetworkReceive> deque = stagedReceives.get(channel); deque.add(receive); } } //写事件 if (channel.ready() && key.isWritable()) { //从buffer中写入数据到Channel(KafkaChannel中的transportLayer) Send send = channel.write(); }}
下面的代码次要做了两件事儿
- 读取网络返回的申请,从Channel读进Buffer,Buffer是有容量限度的,所以可能一次只能读取一个req的局部数据。只有读到一个残缺的req的状况下,channel.read()办法才返回非null
- 发送数据,从Buffer写入Channel,这里发动了真正的网络IO
读出数据后,会先放到stagedReceives汇合中,而后在addToCompletedReceives()办法中对于每个channel都会从stagedReceives取出一个NetworkReceive(如果有的话),放入到completedReceives中。
private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); KafkaChannel channel = entry.getKey(); //被mute的channel会被放到explicitlyMutedChannels中,chanel被mute是在服务端(scala)执行的 if (!explicitlyMutedChannels.contains(channel)) { Deque<NetworkReceive> deque = entry.getValue(); addToCompletedReceives(channel, deque); if (deque.isEmpty()) iter.remove(); } } }}
依据官网代码正文这样做的起因有2点,还能够参考(https://github.com/apache/kaf...。
- 对于SSL连贯,数据内容是加密的,不能精准的确定本次须要读取的数据大小,只能尽可能的多读,这样会导致可能会比申请的数据读的要多。那如果该channel之后没有数据能够读,会导致多读的数据将不会被解决。
kafka须要确保一个channel上request被解决的程序是其发送的程序。因而对于每个channel而言,每次poll下层最多只能看见一个申请,当该申请解决实现之后,再解决其余的申请。对Server端和Client端来说解决形式不一样。Selector这个类在Client和Server端都会调用,所以这里存在两种状况
- 利用在 Server 端时,Server 为了保障音讯的时序性,在 Selector 中提供了两个办法:mute(String id) 和 unmute(String id),对该 KafkaChannel 做标记来保障同时只能解决这个 Channel 的一个 request(能够了解为排它锁)。当 Server 端接管到 request 后,先将其放入 stagedReceives 汇合中,此时该 Channel 还未 mute,这个 Receive 会被放入 completedReceives 汇合中。Server 在对 completedReceives 汇合中的 request 进行解决时,会先对该 Channel mute,解决后的 response 发送实现后再对该 Channel unmute,而后能力解决该 Channel 其余的申请
- 利用在 Client 端时,Client 并不会调用 Selector 的 mute() 和 unmute() 办法,client 发送音讯的时序性而是通过 InFlightRequests(保留了max.in.flight.requests.per.connection参数的值) 和 RecordAccumulator 的 mutePartition 来保障的,因而对于 Client 端而言,这里接管到的所有 Receive 都会被放入到 completedReceives 的汇合中期待后续解决。
解决响应
当初响应数据曾经收到了,在KafkaClient.poll办法中会调用handleCompletedReceives()办法解决曾经实现的响应
private void handleCompletedReceives(List<ClientResponse> responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now); //依据返回的数据结构解析对应body AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct); maybeThrottle(body, req.header.apiVersion(), req.destination, now); //解决MetadataResponse数据,从中解析topic,partition,broker的对应关系 if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); else if (req.isInternalRequest && body instanceof ApiVersionsResponse) handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); else responses.add(req.completed(body, now)); }}
至此,发送metadata request的流程曾经剖析结束,发送音讯的流程和metadata request的流程大体是统一的,这里就不做过多剖析了。
总结
总结下发送流程
- sender 线程第一次调用 poll() 办法时,初始化与 node 的连贯;
- sender 线程第二次调用 poll() 办法时,发送 Metadata 申请;
- sender 线程第三次调用 poll() 办法时,获取 metadataResponse,并更新 metadata。
通过上述 sender 线程三次调用 poll()办法,所申请的 metadata 信息才会失去更新,此时 Producer 线程也不会再阻塞,开始发送音讯。
剖析Kafka网络层的形成的时候,肯定要搞清楚NIO的解决流程,进一步了解Kafka中的Selector和KafkaChannel。
本次源代码剖析基于kafka-client-2.0.0版本。