

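In the previous section we started from a HelloWorld example and analyzed how a Kafka Producer is created, focusing on the source code that parses the producer configuration.

    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null, null);
    }

Besides parsing the configuration, creating a Kafka Producer involves one more key step: it calls an overloaded constructor. In this section we look at what that constructor does.

Which components does KafkaProducer initialize?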



  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            log.trace("Starting the Kafka producer");
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = new SystemTime();

            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            /* check for user defined settings.
             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
             * This should be removed with release 0.9 when the deprecated configs are removed.
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
                if (blockOnBufferFull) {
                    this.maxBlockTimeMs = Long.MAX_VALUE;
                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
                } else {
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
                }
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }

            /* check for user defined settings.
             * If the TIME_OUT config is set use that for request timeout.
             * This should be removed with release 0.9
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
            } else {
                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            }

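            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize, this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, time);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata, clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);
            this.sender = new Sender(client, this.metadata, this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? "|" + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();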

            this.errors = this.metrics.sensor("errors");

            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }


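1) It mainly sets a bunch of Producer parameters based on the ProducerConfig object parsed earlier.

2) new Metadata(): this should count as a component. Judging by the name, it is probably responsible for metadata.

3) new RecordAccumulator(): also a component. It isn't obvious yet what it does; the name translates literally as "record accumulator".

4) new NetworkClient(): clearly the network communication component.

5) new Sender() and new KafkaThread(): these create a Runnable and start it on a single thread. It looks like the thread that sends messages.

6) new ProducerInterceptors(): apparently related to interceptors.

As you can see, the core flow of this constructor boils down to the 6 points above. We can sum it up in a component diagram:

What exactly is RecordAccumulator?

Now we know what the main components are. But the purpose of RecordAccumulator isn't clear from its name alone. What can we do? Check whether the class has a Javadoc comment.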

/**
 * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
 * instances to be sent to the server.
 * (In other words: this class queues Records and prepares the data to be sent to the Server, i.e. the Broker.)
 * <p>
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 * (In other words: the accumulator has a bounded amount of memory, so appends block once the limit is reached, unless blocking is disabled.)
 */
public final class RecordAccumulator {}

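After reading the comment we have a rough idea of RecordAccumulator. It is a record accumulator, where a Record can be seen as the abstraction of a single message. In other words it accumulates messages, buffering them in an in-memory queue until they are sent to the Broker. So we can call it the in-memory buffer for outgoing messages.

What exactly is Metadata?

There is also the Metadata component. Some readers may not be clear on what metadata means. Metadata is data that describes other data. For a file on macOS or Windows, for example, the metadata is its size, location, creation time, modification time, and so on.

So what does metadata mean for a KafkaProducer? Here is a quick refresher:

Kafka refresher tips: what are Topic, Partition, Record, Leader Partition, Follower Partition, and Replica?

These are the basic concepts Kafka uses to manage messages.

Topic: the logical structure Kafka uses to organize messages. A Topic can have multiple Partitions, which spread storage across machines to support massive amounts of data.

Partition: a storage structure holding many messages, corresponding to log files on disk. Kafka calls the on-disk files that store messages a log; in practice it is just a sequence of messages.

Record: the abstraction of a single message.

Leader and follower: for high availability, each partition has several copies spread across Brokers. One copy acts as the leader, and the followers are copies of the leader.

Replica: a copy. Both the leader and the followers hold a copy of the messages and back each other up, so a replica can refer to either the leader or a follower.


To send a message to a Broker, you at least need to know where to send it. That requires descriptive information, and this descriptive information is the metadata needed to send messages.

A Producer generally pulls metadata from the broker cluster, including the Partitions of each Topic. Later, when it sends a message to a Topic, it knows which Partitions the Topic has and which Broker hosts each Leader Partition.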


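Producer core component: a look at Metadata

Now that we know which components the Producer initializes, understanding what each one does will make most of the Producer's inner workings clear.

Let's first look at what the Metadata component does.

Creating a Metadata object is simple: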

    /**
     * Create a new Metadata instance
     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
     *        polling
     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
     */
    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
        this.lastRefreshMs = 0L;
        this.lastSuccessfulRefreshMs = 0L;
        this.version = 0;
        this.cluster = Cluster.empty();
        this.needUpdate = false;
        this.topics = new HashSet<String>();
        this.listeners = new ArrayList<>();
        this.needMetadataForAllTopics = false;
    }

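The Javadoc on this constructor tells us that the Metadata object is refreshed periodically. That is, it should pull the core metadata from the Broker to the Producer at regular intervals.



The default values can be seen in the ConfigDef static initialization covered earlier.

refreshBackoffMs: the minimum time that must elapse between metadata refreshes, to avoid busy polling. Default 100 ms.

metadataExpireMs: the longest metadata may be kept without a refresh. By default metadata is pulled every 5 minutes.

lastRefreshMs: timestamp of the most recent metadata fetch.

lastSuccessfulRefreshMs: timestamp of the most recent successful metadata fetch.

version: the version of the fetched metadata.

cluster: the key one. The Cluster object wraps the metadata itself.

needUpdate: flag indicating whether a fetch is needed.

topics: the set of topic names being tracked.

listeners: callbacks notified when the metadata changes.

needMetadataForAllTopics: false by default; its purpose isn't clear yet.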
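To see how refreshBackoffMs and metadataExpireMs could work together, here is a minimal sketch. RefreshTimer and its methods are hypothetical names made up for illustration, not a Kafka class; the sketch only assumes the meaning of the fields as described above: a refresh becomes due when the metadata expires or an update is requested, but never sooner than the backoff allows.

```java
// Hypothetical helper, not a Kafka class: combines the expiry interval and the refresh backoff.
final class RefreshTimer {
    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    private long lastRefreshMs = 0L;
    private long lastSuccessfulRefreshMs = 0L;
    private boolean needUpdate = false;

    RefreshTimer(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    // Someone noticed the metadata is stale and asks for a refresh.
    void requestUpdate() {
        needUpdate = true;
    }

    // Records a refresh attempt; only a successful one resets the expiry clock.
    void recordRefresh(long nowMs, boolean success) {
        lastRefreshMs = nowMs;
        if (success) {
            lastSuccessfulRefreshMs = nowMs;
            needUpdate = false;
        }
    }

    // Milliseconds until the next refresh is due; 0 means "refresh now".
    long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```

With the defaults mentioned above (100 ms and 5 minutes), a freshly refreshed timer reports 300000 ms until the next refresh, while an explicit update request shortens that to whatever remains of the 100 ms backoff.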

2) Initializing the Cluster metadata object

Of the fields above, the metadata itself ends up stored in the Cluster object. Let's see what data it holds:

/**
 * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
 */
public final class Cluster {

    private final boolean isBootstrapConfigured;
    // Kafka Broker nodes
    private final List<Node> nodes;
    // Topics this client is not authorized to access
    private final Set<String> unauthorizedTopics;
    // TopicPartition: the basic Topic-to-Partition relationship
    // PartitionInfo: partition details, such as the in-sync replica (ISR) list and the Leader and Follower nodes
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // Partitions of each topic
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // Currently available partitions of each topic; a partition without a live leader is unavailable
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // Partitions hosted on each broker
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // broker.id -> Node
    private final Map<Integer, Node> nodesById;
    // initialization methods omitted
}

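Together these make up the information about the whole Kafka cluster, for example:

Node: the ip, port, and so on of a Kafka Broker.

TopicPartition: the basic Topic-to-Partition relationship.

PartitionInfo: partition details, such as the in-sync replica (ISR) list and the Leader and Follower nodes.

I have annotated the rest with comments above that give their rough meaning. A general impression is enough; all you really need to know is that this is all topic metadata.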
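To make the shape of these maps a little more concrete, here is a minimal sketch of a cluster view that indexes partitions by topic. MiniPartition and MiniCluster are hypothetical names invented for this illustration and are far simpler than Kafka's PartitionInfo and Cluster. The sketch also assumes a leader id of -1 to mean "no live leader".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for PartitionInfo: topic, partition id, and the leader's broker id.
final class MiniPartition {
    final String topic;
    final int partition;
    final int leaderId; // -1 means "no live leader" (an assumption of this sketch)

    MiniPartition(String topic, int partition, int leaderId) {
        this.topic = topic;
        this.partition = partition;
        this.leaderId = leaderId;
    }
}

// Hypothetical, much-simplified analogue of Cluster.
final class MiniCluster {
    private final Map<String, List<MiniPartition>> partitionsByTopic = new HashMap<>();
    private final Map<String, List<MiniPartition>> availablePartitionsByTopic = new HashMap<>();

    MiniCluster(List<MiniPartition> partitions) {
        for (MiniPartition p : partitions) {
            partitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
            // Only partitions with a live leader count as available.
            if (p.leaderId >= 0)
                availablePartitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
        }
    }

    List<MiniPartition> partitionsForTopic(String topic) {
        return partitionsByTopic.getOrDefault(topic, new ArrayList<>());
    }

    List<MiniPartition> availablePartitionsForTopic(String topic) {
        return availablePartitionsByTopic.getOrDefault(topic, new ArrayList<>());
    }
}
```

Building the indexes once in the constructor keeps lookups cheap, which matters if every send has to ask which partitions a topic has.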

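If you ask how I know all this, it's simple: I ran it in a debugger. Once metadata has been fetched later on, you can inspect the data and it becomes clear. Debugging is a good fit for reading source code in this scenario. We haven't downloaded or imported the Kafka source; we only need to write a HelloWorld, let Maven download the source jars automatically, and step through the client code in the debugger.



Creating the Metadata object in KafkaProducer is not complicated. What happens after it is created? The KafkaProducer constructor calls the metadata.update method.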

 private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     // parameter setup omitted...
     this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

     // initialization of RecordAccumulator, NetworkClient, Sender, etc. omitted...
     this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

     // omitted...
 }

Is this where the metadata gets fetched? Let's look at the update method:

/**
 * Update the cluster metadata
 */
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

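Because listeners was initialized empty and needMetadataForAllTopics is false, the method goes straight on to call notifyAll() on the Metadata object, which does nothing here because no thread is waiting. There is no metadata fetch or refresh going on.

So at this point the method does almost nothing. In other words, no metadata is fetched when the KafkaProducer is created. It only initializes a Metadata object whose Cluster holds just the bootstrap addresses, with no topic or partition information yet.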
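Why would update() bump a version and call notifyAll() at all? One plausible reading is that another thread can wait on the same object until the version changes. The sketch below illustrates that wait/notifyAll pattern in isolation. VersionedMetadata and awaitUpdate are hypothetical names for this illustration; the waiting side of the real Metadata class is not shown in this section, so treat this as an assumption about the pattern rather than Kafka's code.

```java
// Hypothetical sketch: a version counter guarded by the object's own monitor.
final class VersionedMetadata {
    private int version = 0;

    synchronized int version() {
        return version;
    }

    // Called when fresh metadata arrives; wakes every thread blocked in awaitUpdate.
    synchronized void update() {
        version += 1;
        notifyAll();
    }

    // Blocks until the version exceeds lastVersion or maxWaitMs elapses.
    // Returns true if an update was observed, false on timeout or interruption.
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;
            try {
                wait(remaining); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The loop around wait() guards against spurious wakeups, and comparing versions rather than using a boolean flag lets several waiters each check against the version they last saw.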

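The key points of the whole Metadata flow are shown in the figure below:

By now you will have noticed that reading source code is not always smooth sailing; the branches and details can leave you dizzy. The update() method above is a good example of code that can mislead you.

Don't be discouraged. Keep the core flow straight, draw diagrams, note down the key logic, set the confusing part aside for now, and carry on analyzing other scenarios. Once you have analyzed enough of them and gone over them a few times, the logic you didn't understand before gradually clicks and everything connects.

Producer core component: a look at RecordAccumulator

Having gone through the creation of the metadata component, let's move on to the next component, the RecordAccumulator message buffer.

As we saw earlier from its class comment, RecordAccumulator accumulates Records (each Record being the abstraction of a single message) in an in-memory queue until they are sent to the Broker. That is why we call it the in-memory buffer for outgoing messages.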


    /**
     * Create a new record accumulator
     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
     * @param totalSize The maximum memory the record accumulator can use.
     * @param compression The compression codec for the records
     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
     *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
     *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
     *        exhausting all retries in a short period of time.
     * @param metrics The metrics
     * @param time The time instance to use
     */
    public RecordAccumulator(int batchSize,
                             long totalSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             Metrics metrics,
                             Time time) {
        this.drainIndex = 0;
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<>();
        String metricGrpName = "producer-metrics";
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.incomplete = new IncompleteRecordBatches();
        this.muted = new HashSet<>();
        this.time = time;
        registerMetrics(metrics, metricGrpName);
    }


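1) It sets some parameters: batchSize, totalSize, retryBackoffMs, lingerMs, compression, and so on

2) It initializes some data structures; for example, batches is a new CopyOnWriteMap<>()

3) It initializes the BufferPool and IncompleteRecordBatches

1) It sets some parameters: batchSize, totalSize, retryBackoffMs, lingerMs, compression, and so on

First it sets some parameters. Their defaults and basic roles can be seen in the ConfigDef initialization from the previous section.

batchSize: 16 KB by default; controls the size of the batches in which messages are packed and sent to the Broker.

totalSize: 32 MB by default; the size of the in-memory message buffer.

retryBackoffMs: by default, wait 100 ms between retries.

lingerMs: how long to wait for a batch to fill up. With a value of 10 ms, for example, a batch that has not filled up within 10 ms is sent anyway. The default is 0, meaning no artificial delay.

compression: the compression type for requests, none by default.

2) It initializes some data structures; for example, batches is a new CopyOnWriteMap<>()

This looks like a collection that holds Record messages, organized by topic and partition, with a double-ended queue of batches for each partition.

ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches = new CopyOnWriteMap<>();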
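As a rough picture of how a "map of deques" can batch messages per partition, here is a minimal sketch. MiniAccumulator is a hypothetical class for illustration only. It uses a plain ConcurrentHashMap instead of Kafka's CopyOnWriteMap, string keys instead of TopicPartition, and it caps batches by record count, whereas batch.size in Kafka is measured in bytes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: one deque of batches per "topic-partition" key.
final class MiniAccumulator {
    private final int maxRecordsPerBatch; // simplification: count-based, not byte-based
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int maxRecordsPerBatch) {
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    // Appends to the newest batch of the partition, opening a new batch when the last one is full.
    void append(String topic, int partition, String record) {
        Deque<List<String>> dq = batches.computeIfAbsent(topic + "-" + partition, k -> new ArrayDeque<>());
        synchronized (dq) {
            List<String> last = dq.peekLast();
            if (last == null || last.size() >= maxRecordsPerBatch) {
                last = new ArrayList<>();
                dq.addLast(last);
            }
            last.add(record);
        }
    }

    // Removes and returns the oldest batch of the partition, or null if there is none.
    List<String> drainOldest(String topic, int partition) {
        Deque<List<String>> dq = batches.get(topic + "-" + partition);
        if (dq == null)
            return null;
        synchronized (dq) {
            return dq.pollFirst();
        }
    }
}
```

Appending at the tail and draining from the head is what makes a double-ended queue a natural fit: producers fill the newest batch while the sending side takes the oldest one.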

3) It initializes the BufferPool and IncompleteRecordBatches

Creating IncompleteRecordBatches is fairly simple:

    /*
     * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
     */
    private final static class IncompleteRecordBatches {
        private final Set<RecordBatch> incomplete;

        public IncompleteRecordBatches() {
            this.incomplete = new HashSet<RecordBatch>();
        }
        public void add(RecordBatch batch) {
            synchronized (incomplete) { this.incomplete.add(batch); }
        }
        public void remove(RecordBatch batch) {
            synchronized (incomplete) {
                boolean removed = this.incomplete.remove(batch);
                if (!removed)
                    throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
            }
        }
        public Iterable<RecordBatch> all() {
            synchronized (incomplete) { return new ArrayList<>(this.incomplete); }
        }
    }

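As the comment says, it is a thread-safe helper class. It guards a HashSet with synchronized and holds the RecordBatches that the Broker has not yet acknowledged (ack).

new BufferPool initializes the buffer pool. The code is as follows: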

public final class BufferPool {

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    /**
     * Create a new buffer pool
     * @param memory The maximum amount of memory that this buffer pool can allocate
     * @param poolableSize The buffer size to cache in the free list rather than deallocating
     * @param metrics instance of Metrics
     * @param time time instance
     * @param metricGrpName logical group name for metrics
     */
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }
}
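The fields above (a free list of ByteBuffers, a poolableSize, and an availableMemory counter) hint at how the pool probably works: buffers of the standard size are recycled rather than reallocated. Here is a minimal single-threaded sketch of that idea. MiniBufferPool is a hypothetical class for illustration; it leaves out the lock, the waiters queue, and the blocking behavior, and returns null where a real pool would make the caller wait.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded sketch of a pool that recycles fixed-size buffers.
final class MiniBufferPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory; // budget not currently held by any buffer

    MiniBufferPool(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.availableMemory = totalMemory;
    }

    // Returns a buffer of the requested size, or null when the budget is exhausted.
    ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst(); // reuse a cached buffer, no new allocation
        // Give cached buffers back to the budget if the unassigned part is too small.
        while (availableMemory < size && !free.isEmpty())
            availableMemory += free.pollLast().capacity();
        if (availableMemory < size)
            return null;
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    // Standard-size buffers go back on the free list; other sizes just return their bytes to the budget.
    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        } else {
            availableMemory += buffer.capacity();
        }
    }

    // Memory that is either unassigned or sitting in the free list.
    long unallocatedMemory() {
        return availableMemory + (long) free.size() * poolableSize;
    }
}
```

If poolableSize equals the batch size, which is how the RecordAccumulator constructor above wires it (batchSize is passed as the second argument), most allocations can be served from the free list.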



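Having looked inside these components, you may still not know exactly what they do. That's fine. The goal here is just to get a first impression of them, so that later, when we analyze what a component does and how it behaves, it is easier to follow.

Producer core component: a look at NetworkClient

To fetch metadata or send messages, the Producer first has to connect to a Broker. When we traced the KafkaProducer source earlier there was a network communication component, NetworkClient. Let's look at how it is created and what it does. Could the metadata fetch be happening here?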

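private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // parameter setup omitted...

    // initialization of RecordAccumulator, Metadata, Sender, etc. omitted...

    NetworkClient client = new NetworkClient(
        // Kafka lightly wraps the native Selector into its own Selector network component
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
        this.metadata, clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs, time);
    // omitted...
}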

private NetworkClient(MetadataUpdater metadataUpdater,
                          Metadata metadata,
                          Selectable selector,
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time) {

        /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
         * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
         * super constructor is invoked.
         */
        if (metadataUpdater == null) {
            if (metadata == null)
                throw new IllegalArgumentException("`metadata` must not be null");
            // a component that updates the metadata?
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            this.metadataUpdater = metadataUpdater;
        }
        this.selector = selector;
        this.clientId = clientId;
        // the set of requests that have been sent or are being sent but have not yet received a response
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        // the state of the Producer's connection to each node in the cluster
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.correlation = 0;
        this.randOffset = new Random();
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
    }

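Creating the NetworkClient above mainly involves:

1) Creating a Selector. If you are familiar with the Java NIO API the name will not be new: Selector, Buffer, and Channel are the three core NIO components. Kafka lightly wraps the native Selector into its own Selector network component.

I won't go into how NIO works here. You can simply think of the Selector as the component that watches network connections for connect, read, and write events.


3) Creating a DefaultMetadataUpdater and passing metadata to it. Judging by the name, it seems to be the component that updates metadata. Have we found the metadata fetch logic? We will pay close attention to how this class is used in a moment.

4) Creating InFlightRequests and ClusterConnectionStates. From the comments on these two classes, InFlightRequests is the set of requests that have been sent or are being sent but have not yet received a response, and ClusterConnectionStates is the state of the Producer's connection to each node in the cluster.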
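If the NIO Selector mentioned in item 1) is new to you, the following minimal sketch shows the basic cycle: register a non-blocking channel, then ask the selector which channels are ready. It uses a java.nio.channels.Pipe instead of a socket so that it runs without any network or Broker. SelectorDemo is a hypothetical class written for this illustration and has nothing to do with Kafka's own Selector wrapper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectorDemo {
    // Writes one byte into a pipe and returns how many registered channels became readable.
    static int readyAfterWrite() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);              // only non-blocking channels can be registered
            pipe.source().register(selector, SelectionKey.OP_READ);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {42})); // make the read end readable
            int ready = selector.select(1000);                   // wait up to 1 second for readiness
            pipe.sink().close();
            pipe.source().close();
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A single thread can register many channels with one selector and react only to the ones that are ready, which is why one network thread can serve connections to several Brokers.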

The whole NetworkClient initialization can be summarized in the figure below:


Detail 1: first, the code that creates the Selector:

public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
    this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}

 public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
        try {
            // at its core this still creates an NIO Selector
            this.nioSelector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.channels = new HashMap<>();
        this.completedSends = new ArrayList<>();
        this.completedReceives = new ArrayList<>();
        this.stagedReceives = new HashMap<>();
        this.immediatelyConnectedKeys = new HashSet<>();
        this.connected = new ArrayList<>();
        this.disconnected = new ArrayList<>();
        this.failedSends = new ArrayList<>();
        this.sensors = new SelectorMetrics(metrics);
        this.channelBuilder = channelBuilder;
        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
        this.lruConnections = new LinkedHashMap<>(16, .75F, true);
        currentTimeNanos = time.nanoseconds();
        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
        this.metricsPerConnection = metricsPerConnection;
 }

As you can see, creating Kafka's Selector boils down to creating an NIO Selector: java.nio.channels.Selector.open();
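One small detail in that constructor is worth a quick experiment: the comment says lruConnections is a LinkedHashMap created with accessOrder = true. The sketch below shows what that flag does. LruDemo and the node names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruDemo {
    // Returns the key that has gone longest without being read or written.
    static String leastRecentlyUsed() {
        // Third constructor argument: true = iterate in access order instead of insertion order.
        Map<String, Long> lru = new LinkedHashMap<>(16, .75F, true);
        lru.put("node-1", 100L);
        lru.put("node-2", 200L);
        lru.put("node-3", 300L);
        lru.get("node-1"); // touching node-1 moves it to the end of the iteration order
        return lru.keySet().iterator().next();
    }
}
```

Here leastRecentlyUsed() returns "node-2", because node-1 was touched after being inserted. With this ordering the first entry is always the one idle the longest, which is convenient when looking for an idle connection to close.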

Detail 2: the DefaultMetadataUpdater initialization does nothing beyond holding a reference to Metadata.

    class DefaultMetadataUpdater implements MetadataUpdater {

        // holds a reference to Metadata
        /* the current cluster metadata */
        private final Metadata metadata;

        /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
        private boolean metadataFetchInProgress;

        /* the last timestamp when no broker node is available to connect */
        private long lastNoNodeAvailableMs;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.metadataFetchInProgress = false;
            this.lastNoNodeAvailableMs = 0;
        }
    }

Detail 3: the InFlightRequests comment does indeed say it is the set of requests that have been sent or are being sent but have not yet received a response. If that isn't clear yet, don't worry; we will see where it is used later.

/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 */
final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

    public InFlightRequests(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }
}
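To get a feel for what a per-node deque of unanswered requests enables, here is a minimal sketch. MiniInFlight and its method names are hypothetical, requests are plain strings, and it assumes that responses from a node come back in the order the requests were sent. Any further checks the real class may perform before allowing another send are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of tracking unanswered requests per node.
final class MiniInFlight {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    MiniInFlight(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // True if the node has room for another unanswered request.
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    // The newest request goes to the head of the node's deque.
    void add(String node, String request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // Assuming in-order responses, the oldest request (at the tail) completes first.
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null ? null : queue.pollLast();
    }
}
```

The limit passed to the constructor plays the role of max.in.flight.requests.per.connection: once that many requests are unanswered, the caller has to wait for a response before sending more to that node.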

Detail 4: the ClusterConnectionStates comment likewise says it is the state of the Producer's connection to each node in the cluster. The states are connected, connecting, and disconnected.

/**
 * The state of our connection to each node in the cluster.
 */
final class ClusterConnectionStates {
    private final long reconnectBackoffMs;
    private final Map<String, NodeConnectionState> nodeState;

    public ClusterConnectionStates(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
        this.nodeState = new HashMap<String, NodeConnectionState>();
    }

    /**
     * The state of our connection to a node
     */
    private static class NodeConnectionState {

        ConnectionState state;
        long lastConnectAttemptMs;

        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttempt;
        }

        public String toString() {
            return "NodeState(" + state + "," + lastConnectAttemptMs + ")";
        }
    }
}

/**
 * The states of a node connection
 * (mainly: connected, connecting, and disconnected)
 */
public enum ConnectionState {
    DISCONNECTED, CONNECTING, CONNECTED
}
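The pairing of a state with lastConnectAttemptMs, together with reconnectBackoffMs, suggests how reconnect attempts might be throttled. The sketch below shows one way that could work. MiniConnectionStates and its methods are hypothetical and written only for this illustration; the rule it assumes is "a disconnected node may be retried only after the backoff has elapsed since the last attempt".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-node connection state with a reconnect backoff.
final class MiniConnectionStates {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final long reconnectBackoffMs;
    private final Map<String, NodeState> nodeState = new HashMap<>();

    MiniConnectionStates(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // A node we never tried is fair game; a disconnected one must wait out the backoff.
    boolean canConnect(String node, long nowMs) {
        NodeState s = nodeState.get(node);
        if (s == null)
            return true;
        return s.state == State.DISCONNECTED && nowMs - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    void connecting(String node, long nowMs) {
        nodeState.put(node, new NodeState(State.CONNECTING, nowMs));
    }

    void connected(String node) {
        nodeState.get(node).state = State.CONNECTED;
    }

    void disconnected(String node) {
        nodeState.get(node).state = State.DISCONNECTED;
    }
}
```

Keeping the timestamp of the last attempt, rather than of the disconnect, means a node that fails immediately is still not retried in a tight loop.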

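That completes the NetworkClient initialization. I won't explain the network-related parameters here; I'll explain them when they are actually used. Explaining them now probably wouldn't mean much yet.


Producer core component: a look at the Sender thread

We have now walked through the initialization of the network component NetworkClient, the Metadata, and the RecordAccumulator message buffer, mainly to learn what each of them sets up. We summarized them in component diagrams and noted the key information. Let's move on to the last core component, the Sender thread, and see what it does.

The Sender initialization logic is as follows: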

  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     // parameter setup omitted...
    // initialization of RecordAccumulator, NetworkClient, Metadata, etc. omitted...
    this.sender = new Sender(client, this.metadata, this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);
    String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? "|" + clientId : "");
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
    // omitted...
  }

public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              String clientId,
              int requestTimeout) {
    this.client = client;
    this.accumulator = accumulator;
    this.metadata = metadata;
    this.guaranteeMessageOrder = guaranteeMessageOrder;
    this.maxRequestSize = maxRequestSize;
    this.running = true;
    this.acks = acks;
    this.retries = retries;
    this.time = time;
    this.clientId = clientId;
    this.sensors = new SenderMetrics(metrics);
    this.requestTimeout = requestTimeout;
}

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    setDaemon(daemon);
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            log.error("Uncaught exception in " + name + ": ", e);
        }
    });
}

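The core flow of this initialization is simple: it mostly hands the other components to the Sender.

1) It sets some core Sender parameters

retries: the number of retries. The default is 0, meaning no retry.

acks: "all", "-1", "0", or "1", the acknowledgement policy. The default is 1: a send counts as successful once the leader broker has written the message (which can lead to message loss).

max.request.size: the maximum request size, 1 MB by default.

max.in.flight.requests.per.connection: the default is 5. On each Broker connection, at most 5 requests may have been sent without a response received yet (combined with retries, this can reorder messages).

2) It holds references to the other three key components: the network component NetworkClient, the Metadata, and the RecordAccumulator message buffer

3) It then wraps the Sender Runnable in a KafkaThread and starts the thread, which begins executing the Sender's run method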
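Step 3) is plain Java threading. As a minimal sketch of the same idea using only the JDK: DaemonThreads.newDaemon below is a hypothetical helper, not Kafka's KafkaThread, and it prints to System.err where real code would use a logger.

```java
final class DaemonThreads {
    // Wraps a Runnable in a named daemon thread that reports uncaught exceptions.
    static Thread newDaemon(String name, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("Uncaught exception in " + name + ": " + e));
        return t;
    }
}
```

Note that constructing the thread does not run anything; the Runnable's run method only begins executing once start() is called on the thread.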


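Executing the run method is not our main concern in this section. I will analyze it in detail in the next few sections.



Which components does KafkaProducer initialize?

Producer core component: a look at RecordAccumulator

Producer core component: a look at Metadata

Producer core component: a look at NetworkClient

Producer core component: a look at the Sender thread


With this figure in hand, analyzing the core Kafka Producer flows in the next few sections becomes much easier. For example:

The metadata fetch mechanism and its flexible use of wait + notifyAll, and the routing strategy for outgoing messages

The network communication mechanism: sending messages on native NIO, and the neat handling of the packet sticking and splitting problem

The Producer's high throughput: the double-ended queues of the in-memory buffer plus the batched sending mechanism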


