关于kafka:Kafka-Producer网络层源码分析

22次阅读

共计 9909 个字符,预计需要花费 25 分钟才能阅读完成。

上一篇讲了 Kafka Producer 发送音讯的主体流程,这一篇咱们关注下 Kafka 的网络层是如何实现的。
对于发送音讯而言,Producer 是客户端,Broker 是服务器端。
Kafka 应用了 JavaNIO 向服务器发送音讯,所以在这之前须要理解 java nio 的基本知识。这次网络层源码剖析从 metadata request 切入。

开局一张图

下面是 Kafka producer 网络层的主体流程,先看下有一个大体印象。

Kafka 的底层应用的是 Java NIO,Kafka 中针对 NIO 的 Selector 的封装类也叫 Selector,对 Channel 的封装类叫做 KafkaChannel。前面如果没有非凡阐明,Selector 都是指 Kafka 中的 Selector。

metadata request

先来回顾下 Kafka 发送音讯的代码

KafkaProducer<String,String> producer = createProducer();
for (int i = 0; i < 10;i++) {producer.send(new ProducerRecord<>("codeTest","key" + (i+1),"value" + (i+1)));
}

下面的代码中首先会初始化 KafkaProducer,在初始化 KafkaProducer 的时候,同时咱们也会初始化 Kafka 发送音讯的客户端

KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder, logContext),
        ...);

创立 NetworkClient(implements KafkaClient)的同时会创立一个 Selector,这个是对 java.nio.Selector 的封装,发送申请,接管响应,解决连贯实现以及现有的断开连接都是通过它的 poll()办法调用实现的。

期待 metadata 更新

咱们都晓得 Kafka 读写音讯都是通过 leader 的,只有晓得了 leader 能力发送音讯到 kafka,在咱们的你好,Kafka
)一文中,咱们讲了首先会发动 metadata request, 从中就能够获取到集群元信息(leader,partiton,ISR 列表等), 那么是在哪里发动 metadata request 的呢?

调用 KafkaProducer 的 doSend()(send()–>doSend())办法时,第一步就是通过 waitOnMetadata 期待集群元数据 (topic,partition,node 等) 可用。

Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// 如果此时曾经有 partition 的信息了
if (partitionsCount != null && (partition == null || partition < partitionsCount))
    return new ClusterAndWaitTime(cluster, 0);

do {int version = metadata.requestUpdate();
  // 唤醒 Sender()线程,Sender 会被 poll 阻塞(参见 java.nio.Channels.Selector.poll())
  sender.wakeup();
  // 期待 metadata 的更新, 会始终阻塞直到以后版本大于最近一次版本
  metadata.awaitUpdate(version, remainingWaitMs);
  cluster = metadata.fetch();
  elapsed = time.milliseconds() - begin;
  remainingWaitMs = maxWaitMs - elapsed;
  partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);

如果 metadata 中不蕴含对应 topic 的 metadata 信息, 那么就申请更新 metadata, 如果没有更新则始终会在这个 while 循环中,这个循环次要做了以下几件事

  1. 调用 metadata.requestUpdate(); 将 needUpdate 属性设置为 true(示意强制更新), 返回以后 version(用于判断是否更新过了)
  2. 唤醒 Sender 线程,实际上是唤醒 NetworkClient 中 Selector, 防止 Selector 始终在 poll 中期待
  3. 执行 metadata.awaitUpdate 期待 metadata 的更新,未更新则始终阻塞。
// 期待元数据更新,直到以后版本大于咱们晓得的最新版本
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {long begin = System.currentTimeMillis();
  long remainingWaitMs = maxWaitMs;
  while ((this.version <= lastVersion) && !isClosed()) {if (remainingWaitMs != 0)
          wait(remainingWaitMs);
      long elapsed = System.currentTimeMillis() - begin;
      remainingWaitMs = maxWaitMs - elapsed;
  }
}

发动申请

当初咱们晓得了会期待元数据更新,那么到底是在哪里更新的呢? 下面有讲到唤醒了 Sender 线程, 在 run()办法中会去调用 KafkaClient.poll()办法,这里会对 metadata request 进行解决

@Override
public List<ClientResponse> poll(long timeout, long now) {
    
  // 解决 metadata
  long metadataTimeout = metadataUpdater.maybeUpdate(now);

  // 这里会调用 java.nio.Selector.select()办法
  this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
  ...

  // 这里会对 metadata response 进行解决,就能够获取到 kafka metadata 的信息
  handleCompletedReceives(responses, updatedNow);
  ...

  return responses;
}

当初看看 metadata request 如何发送的

private long maybeUpdate(long now, Node node) {String nodeConnectionId = node.idString();
  // 曾经连贯,且 Channel 曾经 ready, 且没有申请被发送到给定节点
  if (canSendRequest(nodeConnectionId, now)) {// 这里会最终调用 NetworkClient.doSend()办法, 实际上是将 Send 对象设置到 KafkaChannel 中,并没有进行网络 IO
      sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
      return defaultRequestTimeoutMs;
  }
  if (connectionStates.canConnect(nodeConnectionId, now)) {
      // 初始化连贯
      initiateConnect(node, now);
      return reconnectBackoffMs;
  }
  return Long.MAX_VALUE;
}

下面的代码先查看是否能够发送申请, 如果能够发送申请就间接将数据设置到 KafkaChannel 中。如果不能发送就查看以后是否能够连贯,如果能够则初始化连贯,初始化连贯的代码在 Selector.connect()办法中

@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {SocketChannel socketChannel = SocketChannel.open();
  // 设置 keepAlive 以及 socket 的一些属性(比方发送数据缓存区大小以及接收数据缓冲区大小)
  configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);

  // 实际上调用 socketChannel.connect(address);
  boolean connected = doConnect(socketChannel, address);

  // 将 socketChannel 注册到 nioSelector 中, 同时将生成 KafkaChannel(对 java.nio.Channel 的封装)
  // 并将 KafkaChannel 绑定到 java.nio.SelectionKey 中
  SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
// connectct 为 true 代表该连贯不会再触发 CONNECT 事件,所以这里要独自解决
  if (connected) {
      // 退出到一个独自的汇合中
      immediatelyConnectedKeys.add(key);
      // 勾销对该连贯的 CONNECT 事件的监听
      key.interestOps(0);
  }
}

private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {SelectionKey key = socketChannel.register(nioSelector, interestedOps);
  KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
  this.channels.put(id, channel);
  return key;
}

如果相熟 NIO 的话,下面的代码看上去就很相熟,次要就是设置设置 channel 以及 selectionKey 的关系。
须要留神的是 doConnect()办法返回为 true 的状况, 在非阻塞模式下,对于 local connection,连贯可能马上就建设好了,那该办法会返回 true,对于这种状况,不会再触发之后的 connect 事件。因而 kafka 用一个独自的汇合 immediatelyConnectedKeys 将这些非凡的连贯记录下来。在接下来的步骤会进行非凡解决。
这里要留意到 KafkaChannel 就是在这里被创立的。到这里咱们就要来看看 KafkaChannel 和 Selector 有哪些属性是须要咱们留神的

public class KafkaChannel {
  // 继承 java.nio.channels.Channel,可读可写, 对 socketChannel 的封装
  private final TransportLayer transportLayer;

  // 通过它来创立 Buffer 和回收 Buffer
  private final MemoryPool memoryPool;

  // 收到的数据
  private NetworkReceive receive;

  // 发送的数据
  private Send send;
}

public class Selector implements Selectable, AutoCloseable {

  //java nio 中的 Selector
  private final java.nio.channels.Selector nioSelector;

  //kafka 服务器节点和 Channel 之间对应关系
  private final Map<String, KafkaChannel> channels;
  // 发送实现的申请
  private final List<Send> completedSends;
  // 残缺的音讯响应
  private final List<NetworkReceive> completedReceives;

  // 暂存的音讯响应
  private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

  // 立刻连贯上的 SelectionKey
  private final Set<SelectionKey> immediatelyConnectedKeys;

  // 用于调配 ByteBuffer
  private final MemoryPool memoryPool;

}

轮询数据

初始化连贯实现之后,这个时候就是开始轮询了,在 Selector.poll()办法中对于数据读写的逻辑如下

public void poll(long timeout) throws IOException {
    
  /* check ready keys */
  int numReadyKeys = select(timeout);

  if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

      pollSelectionKeys(readyKeys, false, endSelect);
      // 革除所有 SelectionKey,防止下一次在进行解决
      readyKeys.clear();
      
      // 解决发动连贯时,马上就建设连贯的申请,这种个别只在 broker 和 client 在同一台机器上才存在
      pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
      immediatelyConnectedKeys.clear();}
  // 将暂存起来的网络响应增加到已实现网络响应汇合外面
  addToCompletedReceives();}

void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {for (SelectionKey key : determineHandlingOrder(selectionKeys)) {KafkaChannel channel = channel(key);
 
    boolean sendFailed = false;
    //READ 事件
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
    && !explicitlyMutedChannels.contains(channel)) {

      NetworkReceive networkReceive;

      //read 办法会从 channel 中将数据读取到 Buffer 中(还是通过 KafkaChannel 中的 transportLayer),while ((networkReceive = channel.read()) != null) {if (!stagedReceives.containsKey(channel))
          stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

        // 将读到的申请存起来
        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
      }
  }

  // 写事件
  if (channel.ready() && key.isWritable()) {// 从 buffer 中写入数据到 Channel(KafkaChannel 中的 transportLayer)
    Send send = channel.write();}
}

下面的代码次要做了两件事儿

  1. 读取网络返回的申请,从 Channel 读进 Buffer,Buffer 是有容量限度的,所以可能一次只能读取一个 req 的局部数据。只有读到一个残缺的 req 的状况下,channel.read()办法才返回非 null
  2. 发送数据,从 Buffer 写入 Channel, 这里发动了真正的网络 IO

读出数据后,会先放到 stagedReceives 汇合中,而后在 addToCompletedReceives()办法中对于每个 channel 都会从 stagedReceives 取出一个 NetworkReceive(如果有的话),放入到 completedReceives 中。

private void addToCompletedReceives() {if (!this.stagedReceives.isEmpty()) {Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
    while (iter.hasNext()) {Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
        KafkaChannel channel = entry.getKey();
        // 被 mute 的 channel 会被放到 explicitlyMutedChannels 中,chanel 被 mute 是在服务端 (scala) 执行的
        if (!explicitlyMutedChannels.contains(channel)) {Deque<NetworkReceive> deque = entry.getValue();
            addToCompletedReceives(channel, deque);
            if (deque.isEmpty())
                iter.remove();}
    }
  }
}

依据官网代码正文这样做的起因有 2 点, 还能够参考(https://github.com/apache/kaf…。

  1. 对于 SSL 连贯,数据内容是加密的,不能精准的确定本次须要读取的数据大小,只能尽可能的多读,这样会导致可能会比申请的数据读的要多。那如果该 channel 之后没有数据能够读,会导致多读的数据将不会被解决。
  2. kafka 须要确保一个 channel 上 request 被解决的程序是其发送的程序。因而对于每个 channel 而言,每次 poll 下层最多只能看见一个申请,当该申请解决实现之后,再解决其余的申请。对 Server 端和 Client 端来说解决形式不一样。Selector 这个类在 Client 和 Server 端都会调用,所以这里存在两种状况

    1. 利用在 Server 端时,Server 为了保障音讯的时序性,在 Selector 中提供了两个办法:mute(String id) 和 unmute(String id),对该 KafkaChannel 做标记来保障同时只能解决这个 Channel 的一个 request(能够了解为排它锁)。当 Server 端接管到 request 后,先将其放入 stagedReceives 汇合中,此时该 Channel 还未 mute,这个 Receive 会被放入 completedReceives 汇合中。Server 在对 completedReceives 汇合中的 request 进行解决时,会先对该 Channel mute,解决后的 response 发送实现后再对该 Channel unmute,而后能力解决该 Channel 其余的申请
    2. 利用在 Client 端时,Client 并不会调用 Selector 的 mute() 和 unmute() 办法,client 发送音讯的时序性而是通过 InFlightRequests(保留了 max.in.flight.requests.per.connection 参数的值) 和 RecordAccumulator 的 mutePartition 来保障的,因而对于 Client 端而言,这里接管到的所有 Receive 都会被放入到 completedReceives 的汇合中期待后续解决。

解决响应

当初响应数据曾经收到了,在 KafkaClient.poll 办法中会调用 handleCompletedReceives()办法解决曾经实现的响应

private void handleCompletedReceives(List<ClientResponse> responses, long now) {for (NetworkReceive receive : this.selector.completedReceives()) {String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
      
       // 依据返回的数据结构解析对应 body
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        // 解决 MetadataResponse 数据, 从中解析 topic,partition,broker 的对应关系
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}

至此, 发送 metadata request 的流程曾经剖析结束,发送音讯的流程和 metadata request 的流程大体是统一的, 这里就不做过多剖析了。

总结

总结下发送流程

  1. sender 线程第一次调用 poll() 办法时,初始化与 node 的连贯;
  2. sender 线程第二次调用 poll() 办法时,发送 Metadata 申请;
  3. sender 线程第三次调用 poll() 办法时,获取 metadataResponse,并更新 metadata。

通过上述 sender 线程三次调用 poll()办法,所申请的 metadata 信息才会失去更新,此时 Producer 线程也不会再阻塞,开始发送音讯。

剖析 Kafka 网络层的形成的时候, 肯定要搞清楚 NIO 的解决流程,进一步了解 Kafka 中的 Selector 和 KafkaChannel。

本次源代码剖析基于 kafka-client-2.0.0 版本。

正文完
 0