Kafka Growth Notes (2): The Producer's Core Components

In the previous installment we started from a HelloWorld example and analyzed how a Kafka Producer is created, focusing on the source code that parses the producer configuration.

    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null, null);
    }

Besides config parsing, the other key step in creating a KafkaProducer is the call to an overloaded constructor. In this installment we'll look at what that constructor actually does.
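As a quick refresher before we dive in, this is roughly the calling side that funnels into that overloaded constructor (a minimal sketch: the broker address and the StringSerializer choices are placeholders, not taken from the original HelloWorld):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerBootstrapSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // this goes through new ProducerConfig(properties) into the constructor analyzed below
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.close();
        }
    }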

Which components does KafkaProducer initialize?

Since this is the creation of a key component and we are analyzing its constructor, the first thing to do is trace the code's outline, see which core components there are, and sketch a component diagram first.

Let's look at the constructor code:

  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            log.trace("Starting the Kafka producer");
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = new SystemTime();

            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            /* check for user defined settings.
             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
             * This should be removed with release 0.9 when the deprecated configs are removed.
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
                if (blockOnBufferFull) {
                    this.maxBlockTimeMs = Long.MAX_VALUE;
                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
                } else {
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
                }
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }

            /* check for user defined settings.
             * If the TIME_OUT config is set use that for request timeout.
             * This should be removed with release 0.9
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
            } else {
                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            }

            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient client = new NetworkClient(new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);
            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            this.errors = this.metrics.sensor("errors");

            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);    
        }
    }

There is quite a lot of code in this constructor, but no matter; let's scan its outline first:

1) Based on the ProducerConfig object parsed earlier, it sets a batch of Producer parameters.

2) new Metadata(): this should count as a component; judging by the name, it handles metadata.

3) new RecordAccumulator(): also looks like a component; we don't know yet what it does. The name translates to "record accumulator".

4) new NetworkClient(): obviously a network-communication component.

5) new Sender() plus new KafkaThread(): this creates a Runnable and starts it on a single thread; it looks like the thread that sends messages.

6) new ProducerInterceptors(): apparently something interceptor-related.

These six points are essentially the core outline of the constructor. We can draw a component diagram to summarize:

What exactly is RecordAccumulator?

We now know roughly what the main components are, but the name RecordAccumulator alone doesn't tell us what it does. What now? Check whether it has a class comment.

/**
 * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
 * instances to be sent to the server.
 * <p>
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 */
public final class RecordAccumulator { /* ... */ }

After reading the comment we roughly understand RecordAccumulator: it is a record accumulator, and a Record can be seen as the abstraction wrapping a single message. In other words, it is a message accumulator: it buffers messages in an in-memory queue in preparation for sending them to the Broker. So we can call it the in-memory buffer for outgoing messages.
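You can observe this buffering through the public API. Below is a minimal sketch (the topic name "demo-topic" and the helper method are invented for illustration): send() returns a Future almost immediately, because the record merely lands in the RecordAccumulator; the real network I/O happens later on the Sender thread.

    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendSketch {
        static void sendOne(KafkaProducer<String, String> producer) throws Exception {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key", "value");
            // returns almost immediately: the record is only appended to the in-memory buffer
            Future<RecordMetadata> future = producer.send(record);
            // blocking on the Future is what actually waits for the broker's ack
            RecordMetadata meta = future.get();
            System.out.println("written at offset " + meta.offset());
        }
    }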

What exactly is the Metadata?

There is also the Metadata component. Some readers may not be clear on what "metadata" means: metadata is data that describes data. For example, a file's metadata on macOS or Windows is its size, location, creation time, modification time, and so on.

So what is the KafkaProducer's metadata? Here it's worth reviewing some basics:

Kafka knowledge recap tips: what are Topic, Partition, Record, Leader Partition, Follower Partition, and Replica?

These are the basic concepts involved in how Kafka manages messages.

Topic: the logical structure Kafka uses to manage messages. A Topic can have multiple Partitions, used for distributed storage and to support massive data volumes.

Partition: a storage structure wrapping many messages, corresponding to individual log files on disk. Kafka conventionally calls the files it stores messages in "logs"; they are really just sequences of messages.

Record: the abstraction wrapping a single message.

For high availability, a partition's replicas come in two roles, leader and follower; a follower is a copy of the leader.

Replica: a copy. Both the leader and the followers count as replicas holding the messages, backing each other up; so "replica" can refer to the leader as well as a follower.

With these basics refreshed, the metadata becomes much easier to understand.

To send messages to a Broker, you at least need to know where to send them. That requires descriptive information, and this descriptive information is exactly the metadata needed for sending messages.

A Producer generally needs to pull metadata from the broker cluster, including each Topic's Partitions; only then, when it later sends messages to a Topic, does it know which Partitions that Topic has and which Brokers host the Leader Partitions.
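If you want to see that metadata with your own eyes, the producer exposes a public partitionsFor() method that forces a metadata fetch for one topic. A minimal sketch (the topic name is a placeholder):

    import java.util.List;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.PartitionInfo;

    public class PartitionsSketch {
        static void printPartitions(KafkaProducer<String, String> producer) {
            // blocks until the metadata for this topic has been fetched from a broker
            List<PartitionInfo> partitions = producer.partitionsFor("demo-topic");
            for (PartitionInfo p : partitions) {
                // leader() is the broker this partition's records must be sent to
                System.out.println("partition " + p.partition() + " -> leader " + p.leader());
            }
        }
    }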

The component diagram now looks like this:

Producer Core Component: Metadata

Now that we know which components the Producer initializes, once we understand what each of them does, most of the Producer's principles will fall into place.

Let's first look at what the Metadata component does.

Creating the Metadata is simple:

    /**
     * Create a new Metadata instance
     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
     *        polling
     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
     */
    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
        this.lastRefreshMs = 0L;
        this.lastSuccessfulRefreshMs = 0L;
        this.version = 0;
        this.cluster = Cluster.empty();
        this.needUpdate = false;
        this.topics = new HashSet<String>();
        this.listeners = new ArrayList<>();
        this.needMetadataForAllTopics = false;
    }

As the constructor's comments already state, this Metadata object gets refreshed periodically; in other words, it should periodically pull the core metadata from the Broker into the Producer.

Its outline is:

1) It initializes some settings; from the names and comments you can mostly guess what they mean (see the config sketch after this list).

The default values were visible earlier in the ConfigDef static initializer.

refreshBackoffMs: the minimum time that must pass between metadata refreshes, to avoid busy polling; default 100 ms

metadataExpireMs: how long metadata may be kept without a refresh; by default metadata is re-pulled every 5 minutes

lastRefreshMs: timestamp of the most recent metadata fetch

lastSuccessfulRefreshMs: timestamp of the most recent successful metadata fetch

version: the version number of the fetched metadata

cluster: the key field, the object that actually wraps the metadata

needUpdate: a flag indicating whether a fetch is needed

topics: the set recording topic names

listeners: callbacks notified on metadata changes

needMetadataForAllTopics: defaults to false; we don't know yet what it is for
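For reference, the two constructor arguments are fed by producer configs; a small sketch that spells the defaults out explicitly (the 100 ms and 5 minutes described above):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class MetadataConfigSketch {
        static Properties metadataTuning() {
            Properties props = new Properties();
            // feeds the refreshBackoffMs constructor argument (default 100 ms)
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
            // feeds the metadataExpireMs constructor argument (default 300000 ms = 5 minutes)
            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
            return props;
        }
    }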

2) It initializes the Cluster metadata object

Among the fields above, the metadata ultimately lives inside the Cluster object. Let's see what data it holds:

/**
 * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
 */
public final class Cluster {

    private final boolean isBootstrapConfigured;
    // the Kafka Broker nodes
    private final List<Node> nodes;
    // topics the client is not authorized to access
    private final Set<String> unauthorizedTopics;
    // TopicPartition: the basic topic/partition relationship
    // PartitionInfo: a partition's details, e.g. the ISR (in-sync replica) list and the leader/follower nodes
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // which partitions each topic has
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // each topic's currently available partitions; a partition with no live leader is unavailable
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // which partitions each broker hosts
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // broker.id -> Node
    private final Map<Integer, Node> nodesById;

    // initialization methods omitted
}

Together these form the picture of the whole Kafka cluster, for example:

Node: records a Kafka Broker's host, port, and so on

TopicPartition: the basic topic/partition relationship

PartitionInfo: a partition's details, such as the ISR (in-sync replica) list and the leader/follower nodes

The rest I've annotated with comments above, giving their rough meaning. A general impression is enough; just remember it is all topic metadata.

If you're wondering how I know all this: simple, I debugged it. Once the metadata has been pulled, inspect the data and it becomes clear. Debugging is well suited to reading source code in this scenario: we never downloaded and imported the Kafka source; we just wrote a HelloWorld, let Maven download the jar's sources, and debugged the client code from there.

Make sure you apply the source-reading methods and mindset I've mentioned before flexibly.

So the metadata object mainly looks like this:

Creating the Metadata isn't complicated at all. What happens after it is created? The KafkaProducer constructor calls a metadata.update() method.

 private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     // some parameter setup, omitted...
     this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

     // initialization of RecordAccumulator, NetworkClient, Sender, etc., omitted...
     this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

     // omitted...
}

Could this be where the metadata fetch happens? Let's look at the update method:

/**
 * Update the cluster metadata
 */
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

Since listeners was initialized empty and needMetadataForAllTopics is false, the method just calls Metadata.notifyAll() at the end; effectively it does nothing. There is no metadata fetch or real update here.

In the end we find this method does almost nothing. That is, when the KafkaProducer is created, no metadata is pulled. It only initializes a Metadata object whose Cluster contents are essentially empty by default.
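Cluster.bootstrap() is public, so you can confirm this "nearly empty" state directly. A sketch (this is the client's internal API, so treat it purely as illustration; the address is a placeholder):

    import java.net.InetSocketAddress;
    import java.util.Arrays;
    import org.apache.kafka.common.Cluster;

    public class BootstrapClusterSketch {
        public static void main(String[] args) {
            Cluster cluster = Cluster.bootstrap(
                    Arrays.asList(new InetSocketAddress("localhost", 9092)));
            System.out.println("nodes:  " + cluster.nodes());  // only the bootstrap node(s)
            System.out.println("topics: " + cluster.topics()); // empty: no real metadata yet
        }
    }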

The key points of the Metadata flow so far are shown below:

At this point you'll notice that reading source code isn't always smooth sailing; all kinds of branches and code can leave you dizzy. The update() method above is exactly that kind of distraction.

Don't be discouraged. Keep the core thread of the logic straight, draw plenty of diagrams, write down the key logic first, set the confusing part aside, and try to continue with other scenarios and code paths. Once you have analyzed enough of them, and gone over them a few more times, the logic you didn't understand at first will gradually click, and everything will connect.

Producer Core Component: RecordAccumulator

Having dissected the creation of the metadata component in detail, let's look at the next component: RecordAccumulator, the in-memory message buffer.

As the class comment told us earlier, it is a record accumulator, and a Record can be seen as the abstraction wrapping one message; so it is a message accumulator that buffers messages in an in-memory queue before sending them to the Broker. That is why we call it the in-memory buffer for outgoing messages.

The code that creates it is mainly the following:

  /**
     * Create a new record accumulator
     * 
     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
     * @param totalSize The maximum memory the record accumulator can use.
     * @param compression The compression codec for the records
     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
     *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
     *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
     *        exhausting all retries in a short period of time.
     * @param metrics The metrics
     * @param time The time instance to use
     */
    public RecordAccumulator(int batchSize,
                             long totalSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             Metrics metrics,
                             Time time) {
        this.drainIndex = 0;
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<>();
        String metricGrpName = "producer-metrics";
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.incomplete = new IncompleteRecordBatches();
        this.muted = new HashSet<>();
        this.time = time;
        registerMetrics(metrics, metricGrpName);
    }

The comment has already given away this method's outline; it mainly:

1) sets some parameters: batchSize, totalSize, retryBackoffMs, lingerMs, compression, and so on

2) initializes some data structures, e.g. batches, a new CopyOnWriteMap<>()

3) initializes the BufferPool and the IncompleteRecordBatches

1) Setting parameters such as batchSize, totalSize, retryBackoffMs, lingerMs, compression

First, the parameters. Their defaults and basic roles were visible in the ConfigDef initialization covered in the previous section (see the config sketch after this list):

batchSize: default 16 KB; controls the size of the batches packed up and sent to the Broker

totalSize: default 32 MB; the size of the in-memory message buffer

retryBackoffMs: retries happen at 100 ms intervals by default

lingerMs: if a batch hasn't filled within this time, it is sent out anyway; e.g. with lingerMs=10, a not-yet-full batch waits at most 10 ms (note the out-of-the-box default is 0)

compression: the compression type for requests, default none
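For reference, these parameters map one-to-one onto producer configs; a sketch with illustrative values, not tuning recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BatchingConfigSketch {
        static Properties batchingTuning() {
            Properties props = new Properties();
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");       // batchSize, default 16 KB
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // totalSize, default 32 MB
            props.put(ProducerConfig.LINGER_MS_CONFIG, "10");           // wait up to 10 ms for a batch to fill
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");  // compression, default none
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");   // retryBackoffMs, default 100 ms
            return props;
        }
    }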

2) Initializing data structures such as batches, a new CopyOnWriteMap<>()

This should be the collection that stores the Record messages; it looks like messages are kept per topic-partition, each partition using a double-ended queue:

    ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches = new CopyOnWriteMap<>();
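To make that structure concrete, here is a toy sketch of "one deque of batches per topic-partition" (not Kafka's actual code: ConcurrentHashMap stands in for Kafka's CopyOnWriteMap, and String stands in for RecordBatch):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import org.apache.kafka.common.TopicPartition;

    public class BatchesSketch {
        private final ConcurrentMap<TopicPartition, Deque<String>> batches = new ConcurrentHashMap<>();

        void append(TopicPartition tp, String record) {
            Deque<String> deque = batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
            synchronized (deque) {
                // new records go onto the newest batch at the tail;
                // the Sender later drains finished batches from the head
                deque.addLast(record);
            }
        }
    }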

3) Initializing the BufferPool and IncompleteRecordBatches

Creating IncompleteRecordBatches is fairly simple, as follows:

    /*
     * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
     */
    private final static class IncompleteRecordBatches {
        private final Set<RecordBatch> incomplete;

        public IncompleteRecordBatches() {
            this.incomplete = new HashSet<RecordBatch>();
        }

        public void add(RecordBatch batch) {
            synchronized (incomplete) {
                this.incomplete.add(batch);
            }
        }

        public void remove(RecordBatch batch) {
            synchronized (incomplete) {
                boolean removed = this.incomplete.remove(batch);
                if (!removed)
                    throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
            }
        }

        public Iterable<RecordBatch> all() {
            synchronized (incomplete) {
                return new ArrayList<>(this.incomplete);
            }
        }
    }

As the comment says, it is a thread-safe helper class that keeps the RecordBatches the Broker has not yet acknowledged (ack'd), guarding a HashSet with synchronized blocks.

And new BufferPool initializes the buffer pool; the code is:

public final class BufferPool {

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    /**
     * Create a new buffer pool
     * 
     * @param memory The maximum amount of memory that this buffer pool can allocate
     * @param poolableSize The buffer size to cache in the free list rather than deallocating
     * @param metrics instance of Metrics
     * @param time time instance
     * @param metricGrpName logical group name for metrics
     */
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }

It mainly holds a lock and two deques; this should be the actual memory region where messages are buffered.
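To get a feel for why "a lock plus a waiters queue" implies blocking, here is a heavily simplified toy version of the idea (not Kafka's actual allocate(), which additionally recycles batchSize-sized ByteBuffers through the free deque):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class ToyBufferPool {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition spaceFreed = lock.newCondition();
        private long availableMemory;

        public ToyBufferPool(long totalMemory) {
            this.availableMemory = totalMemory;
        }

        // blocks until 'size' bytes can be reserved; this mirrors why send()
        // can block once the 32 MB buffer is exhausted
        public void allocate(long size) throws InterruptedException {
            lock.lock();
            try {
                while (availableMemory < size) {
                    spaceFreed.await();
                }
                availableMemory -= size;
            } finally {
                lock.unlock();
            }
        }

        public void release(long size) {
            lock.lock();
            try {
                availableMemory += size;
                spaceFreed.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }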

The whole flow is illustrated below:

Having looked at these components' internal structures, you may still not know what they are really for. That's fine; the goal here is just to form a first impression of each component, so that later, when we analyze each component's behavior and role, it will be easier to follow.

Producer Core Component: NetworkClient

To fetch metadata or send messages, we obviously have to establish connections to the Brokers first. When tracing the KafkaProducer source earlier we saw a network component, NetworkClient. Let's analyze how it is created and what it does; could the metadata fetch live here?

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // some parameter setup, omitted...

    // initialization of RecordAccumulator, Metadata, Sender, etc., omitted...

    NetworkClient client = new NetworkClient(
        // Kafka lightly wraps the raw NIO Selector into its own Selector network component
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs, time);
    
    // omitted...
    }

private NetworkClient(MetadataUpdater metadataUpdater,
                          Metadata metadata,
                          Selectable selector,
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time) {

        /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
         * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
         * super constructor is invoked.
         */
        if (metadataUpdater == null) {
            if (metadata == null)
                throw new IllegalArgumentException("`metadata` must not be null");
            // a component that updates metadata?
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            this.metadataUpdater = metadataUpdater;
        }
        this.selector = selector;
        this.clientId = clientId;
        // requests that have been sent or are in flight but have not yet received a response
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        // the Producer's connection state to each node in the cluster
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.correlation = 0;
        this.randOffset = new Random();
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
    }

The creation of NetworkClient above mainly does the following:

1) It creates a Selector. If you know the Java NIO API, the name Selector will be familiar: it is one of NIO's three core components, alongside Buffer and Channel. Kafka lightly wraps the raw Selector into its own Selector network component.

I won't expand on NIO's principles here; you can simply think of the Selector component as the thing that watches network connections for connection establishment and for read/write events. (There is a minimal NIO sketch after the Selector code below.)

2) It sets a bunch of configuration parameters.

3) It creates a DefaultMetadataUpdater component and hands the metadata to it. Guessing from the name, this looks like a component that updates metadata. Have we found the metadata-fetch logic? Let's pay attention to how this class is used later.

4) It creates InFlightRequests and ClusterConnectionStates. From their class comments, InFlightRequests is the set of requests that have been sent or are in flight but haven't yet received a response, and ClusterConnectionStates is the Producer's connection state to each node in the cluster.

The whole NetworkClient initialization can be summarized in the following diagram:

Having seen the outline of its creation, let's check the details (outline first, details second). If the points above weren't immediately obvious to you, the details of each class will confirm them.

Detail 1: first, the Selector is created as follows:

public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
    this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}

 public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
        try {
            // essentially this still creates a raw NIO Selector
            this.nioSelector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.channels = new HashMap<>();
        this.completedSends = new ArrayList<>();
        this.completedReceives = new ArrayList<>();
        this.stagedReceives = new HashMap<>();
        this.immediatelyConnectedKeys = new HashSet<>();
        this.connected = new ArrayList<>();
        this.disconnected = new ArrayList<>();
        this.failedSends = new ArrayList<>();
        this.sensors = new SelectorMetrics(metrics);
        this.channelBuilder = channelBuilder;
        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
        this.lruConnections = new LinkedHashMap<>(16, .75F, true);
        currentTimeNanos = time.nanoseconds();
        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
        this.metricsPerConnection = metricsPerConnection;
    }

As you can see, creating Kafka's Selector essentially still creates a raw NIO selector: java.nio.channels.Selector.open().
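If NIO is unfamiliar, here is a minimal sketch of the raw JDK Selector on its own (plain java.nio, nothing Kafka-specific; the address is a placeholder): one thread multiplexes events from many channels, which is exactly the role it plays inside the Sender's I/O thread.

    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    public class NioSelectorSketch {
        public static void main(String[] args) throws Exception {
            Selector selector = Selector.open();  // the same call Kafka makes internally
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);     // a selector requires non-blocking channels
            channel.connect(new InetSocketAddress("localhost", 9092));
            channel.register(selector, SelectionKey.OP_CONNECT);

            // one thread can watch many channels; here we wait for just one event
            if (selector.select(1000) > 0) {
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {
                        ((SocketChannel) key.channel()).finishConnect();
                        System.out.println("connected");
                    }
                }
                selector.selectedKeys().clear();
            }
            channel.close();
            selector.close();
        }
    }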

Detail 2: initializing DefaultMetadataUpdater does nothing special; it just takes a reference to the Metadata:

    class DefaultMetadataUpdater implements MetadataUpdater {

        // just a reference to the Metadata object
        /* the current cluster metadata */
        private final Metadata metadata;

        /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
        private boolean metadataFetchInProgress;

        /* the last timestamp when no broker node is available to connect */
        private long lastNoNodeAvailableMs;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.metadataFetchInProgress = false;
            this.lastNoNodeAvailableMs = 0;
        }

Detail 3: InFlightRequests' comment indeed means "the set of requests that have been sent or are being sent but haven't yet received a response". If that is not clear yet, no problem; we will see where it is used later.

/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 */
final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

    public InFlightRequests(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }
}

Detail 4: ClusterConnectionStates' comment likewise means "the Producer's connection state to each node in the cluster". A connection is mainly in one of three states: connected, connecting, or disconnected.

/**
 * The state of our connection to each node in the cluster.
 */
final class ClusterConnectionStates {
    private final long reconnectBackoffMs;
    private final Map<String, NodeConnectionState> nodeState;

    public ClusterConnectionStates(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
        this.nodeState = new HashMap<String, NodeConnectionState>();
    }

    /**
     * The state of our connection to a node
     */
    private static class NodeConnectionState {

        ConnectionState state;
        long lastConnectAttemptMs;

        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttempt;
        }

        public String toString() {
            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
        }
    }
}

/**
 * The states of a node connection: disconnected, connecting, connected
 */
public enum ConnectionState {DISCONNECTED, CONNECTING, CONNECTED}

That completes the initialization of the whole NetworkClient. I won't explain the network-related parameters here; I'll explain them when they are actually used, since an explanation now probably wouldn't mean much.

I've roughly organized the details into the following diagram:

Producer Core Component: the Sender Thread

We have now dissected the initialization of the network component NetworkClient, the Metadata, and the RecordAccumulator message buffer, and we know what each of them sets up; we summarized the component diagrams and recorded the key information. We can move on to the last core component, the Sender thread, and see what it is up to.

Sender's initialization logic is as follows:

  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     // some parameter setup, omitted...
    // initialization of RecordAccumulator, NetworkClient, Metadata, etc., omitted...
    this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
    // omitted...
 }
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              String clientId,
              int requestTimeout) {
    this.client = client;
    this.accumulator = accumulator;
    this.metadata = metadata;
    this.guaranteeMessageOrder = guaranteeMessageOrder;
    this.maxRequestSize = maxRequestSize;
    this.running = true;
    this.acks = acks;
    this.retries = retries;
    this.time = time;
    this.clientId = clientId;
    this.sensors = new SenderMetrics(metrics);
    this.requestTimeout = requestTimeout;
}

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    setDaemon(daemon);
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            log.error("Uncaught exception in " + name + ": ", e);
        }
    });
}

The core outline of this initialization is very simple: it mainly hands the other components over to the Sender to use.

1) It sets the Sender's core parameters (see the config sketch after this list):

retries: the number of retries; default 0, i.e. no retries

acks: the acknowledgment policy, one of "all", "-1", "0", "1"; the default is 1, meaning a successful write on the leader broker counts as a successful send (which can lose messages)

max.request.size: the maximum request size, default 1 MB

max.in.flight.requests.per.connection: default 5, meaning at most 5 requests per Broker may be sent but not yet acknowledged (with retries, this can reorder messages)
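These also map directly onto producer configs; a sketch with the defaults written out (illustrative only):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class SenderConfigSketch {
        static Properties senderTuning() {
            Properties props = new Properties();
            props.put(ProducerConfig.ACKS_CONFIG, "1");     // default: leader write counts as success
            props.put(ProducerConfig.RETRIES_CONFIG, "0");  // default: no retries
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576");  // default 1 MB
            // with retries enabled, setting this to 1 preserves ordering (guaranteeMessageOrder)
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
            return props;
        }
    }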

2) It references the three other key components: the NetworkClient network component, the Metadata, and the RecordAccumulator message buffer.

3) It then wraps the Runnable in a KafkaThread and starts that thread, which begins executing Sender's run() method.

The whole flow is shown below:

What the run() method does is not the focus of this installment; I'll analyze it in detail in the next few installments.

Summary

Finally, a quick summary. Today we got familiar with the following:

Which components KafkaProducer initializes

Producer core component: RecordAccumulator

Producer core component: Metadata

Producer core component: NetworkClient

Producer core component: the Sender thread

We've only made a first acquaintance with each component: what it is, what it mainly does, and roughly what it holds inside. I've consolidated everything we went through today into one big diagram:

With this diagram in hand, the next few installments analyzing the Kafka Producer's core flows will be much easier, for example:

the metadata fetch mechanism (a flexible use of wait + notifyAll) and the routing strategy for outgoing messages

the network communication mechanism: sending messages over raw NIO, plus a clever solution to TCP sticky/split packet problems

the Producer's high throughput: the memory buffer's double-ended queues plus the batch-packing (Batch) send mechanism

Stay tuned! That's all for today; see you in the next installment!
