About springboot: Kafka Growth Notes 2, Analysis of the Producer's Core Components

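In the previous section we started from HelloWorld and walked through how a Kafka Producer is created, focusing on the source code that parses the producer configuration.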

    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null, null);
    }

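Besides parsing the configuration, creating a KafkaProducer has one more key step: it calls an overloaded constructor. In this section we look at what that constructor mainly does.

Which components does KafkaProducer initialize?

Since this constructor creates a key component, the first thing to do is trace its overall code flow, see which core components it builds, and sketch a component diagram.

Let's look at the constructor's code: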

  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            log.trace("Starting the Kafka producer");
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = new SystemTime();

            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            /* check for user defined settings.
             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
             * This should be removed with release 0.9 when the deprecated configs are removed.
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
                if (blockOnBufferFull) {
                    this.maxBlockTimeMs = Long.MAX_VALUE;
                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
                } else {
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
                }
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }

            /* check for user defined settings.
             * If the TIME_OUT config is set use that for request timeout.
             * This should be removed with release 0.9
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
            } else {
                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            }

            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);
            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            this.errors = this.metrics.sensor("errors");

            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);    
        }
    }

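The constructor is fairly long, but that's fine. Let's skim its overall flow first:

1) It mainly uses the ProducerConfig object parsed earlier to set a bunch of Producer parameters

2) new Metadata(): this looks like a component; judging by the name, it is probably responsible for metadata

3) new RecordAccumulator() is probably a component too; we don't know what it does yet, but the name translates to "record accumulator"

4) new NetworkClient() is clearly a network communication component

5) new Sender() and new KafkaThread() create a Runnable and start it on a single thread; it looks like the thread that sends messages

6) new ProducerInterceptors() seems to be related to interceptors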
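Point 1 is more than plain copying: the constructor also decides which deprecated setting wins when it computes maxBlockTimeMs. The sketch below restates that precedence as a standalone function over a plain Map. The class MaxBlockSketch, the method resolveMaxBlockMs and the default passed in are hypothetical names for illustration, not Kafka APIs, and the key strings are the config names as I understand them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: restates the precedence used by the constructor.
final class MaxBlockSketch {
    static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
    static final String METADATA_FETCH_TIMEOUT = "metadata.fetch.timeout.ms";
    static final String MAX_BLOCK_MS = "max.block.ms";

    /** Returns the effective max block time for the user-supplied settings. */
    static long resolveMaxBlockMs(Map<String, Object> userConfigs, long defaultMaxBlockMs) {
        // 1. The deprecated block.on.buffer.full=true means "block forever".
        if (Boolean.TRUE.equals(userConfigs.get(BLOCK_ON_BUFFER_FULL))) {
            return Long.MAX_VALUE;
        }
        // 2. Otherwise the deprecated metadata.fetch.timeout.ms wins if the user set it.
        if (userConfigs.containsKey(METADATA_FETCH_TIMEOUT)) {
            return ((Number) userConfigs.get(METADATA_FETCH_TIMEOUT)).longValue();
        }
        // 3. Otherwise use max.block.ms, falling back to the supplied default.
        Object maxBlock = userConfigs.get(MAX_BLOCK_MS);
        return maxBlock == null ? defaultMaxBlockMs : ((Number) maxBlock).longValue();
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put(METADATA_FETCH_TIMEOUT, 5000L);
        System.out.println(resolveMaxBlockMs(user, 60000L));
    }
}
```

Collapsing the nested if/else of the constructor into three ordered checks gives the same result, because a block.on.buffer.full value of false simply falls through to the other two settings.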

As you can see, the core flow of this constructor is basically the six points above. We can summarize them in a component diagram:

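What exactly is RecordAccumulator?

Now we know what the main components are. But the name RecordAccumulator doesn't tell us what the class does. What to do? Check whether it has a class comment.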

/**
 * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
 * instances to be sent to the server.
 * In other words: records are queued here until they are sent to the server (that is, the Broker).
 * <p>
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 * In other words: the accumulator has bounded memory, so append blocks once the limit is reached, unless blocking is disabled.
 */
public final class RecordAccumulator {
    
}

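After reading the comment we get the general idea of RecordAccumulator: it is a record accumulator. A Record can be seen as the abstract wrapper around one message, so this is a message accumulator: it buffers messages in an in-memory queue until they are sent to the Broker. We can therefore call it the in-memory buffer for outgoing messages.

What exactly is Metadata?

There is also the Metadata component. Some readers may not be clear about what metadata means: metadata is data that describes other data. For a file on my Mac or Windows machine, for example, the metadata is its size, location, creation time, modification time and so on.

So what is the metadata of a KafkaProducer? Here we need a quick refresher:

Kafka refresher tips: what are Topic, Partition, Record, Leader Partition, Follower Partition and Replica?

These are the basic concepts Kafka uses to manage messages.

Topic: the logical structure Kafka uses to manage messages. A Topic can have multiple Partitions, which distribute storage so that massive amounts of data can be supported.

Partition: a storage structure holding many messages, corresponding to log files on disk. Kafka usually calls the files in which it stores messages on disk the log; in practice it is just a sequence of messages.

Record: the abstract wrapper around a single message.

For high availability, each Partition is kept on more than one Broker, in one of two roles: leader and follower. A follower is a copy of the leader.

Replica: a copy. Both the leader and the followers hold a copy of the messages and back each other up, so a replica can refer to either the leader or a follower.

With these basics refreshed, metadata is much easier to understand.

To send a message to a Broker, you at least need to know where to send it. That requires descriptive information, and this descriptive information is the metadata needed to send messages.

The Producer generally has to pull metadata from the broker cluster, including the Partition information of each Topic. Only then, when it later sends a message to a Topic, does it know which Partitions the Topic has and which Broker hosts each Leader Partition.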
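To make "which Partitions does a Topic have, and which Broker leads each one" concrete, here is a minimal sketch of such a lookup table. It is a toy model under my own assumptions: MetadataLookupSketch, PartitionMeta and the broker address strings are hypothetical and much simpler than Kafka's real classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of producer-side metadata; all names here are hypothetical.
final class MetadataLookupSketch {
    /** One partition of a topic and the address of the broker that leads it. */
    static final class PartitionMeta {
        final String topic;
        final int partition;
        final String leader;

        PartitionMeta(String topic, int partition, String leader) {
            this.topic = topic;
            this.partition = partition;
            this.leader = leader;
        }
    }

    private final Map<String, List<PartitionMeta>> partitionsByTopic = new HashMap<>();

    void add(String topic, int partition, String leader) {
        partitionsByTopic.computeIfAbsent(topic, t -> new ArrayList<>())
                .add(new PartitionMeta(topic, partition, leader));
    }

    /** How many partitions we know about for this topic (0 if the topic is unknown). */
    int partitionCount(String topic) {
        return partitionsByTopic.getOrDefault(topic, Collections.emptyList()).size();
    }

    /** The leader broker for a topic partition, or null if we have no metadata for it. */
    String leaderFor(String topic, int partition) {
        for (PartitionMeta p : partitionsByTopic.getOrDefault(topic, Collections.emptyList())) {
            if (p.partition == partition) {
                return p.leader;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        MetadataLookupSketch metadata = new MetadataLookupSketch();
        metadata.add("orders", 0, "broker-1:9092");
        metadata.add("orders", 1, "broker-2:9092");
        System.out.println(metadata.partitionCount("orders"));
        System.out.println(metadata.leaderFor("orders", 1));
    }
}
```

A null leader is the interesting case: without metadata for a topic the producer has nowhere to send, which is why it has to pull metadata before the first send.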

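The final component diagram looks like this:

Producer core component: Metadata analysis

Now that we know the Producer mainly initializes the components above, understanding what each of them does will make most of the Producer's inner workings clear.

Let's first look at what the Metadata component does.

Creating a Metadata object is simple: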

    /**
     * Create a new Metadata instance
     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
     *        polling 
     * (In other words: the shortest interval that must pass between two metadata refreshes, so we don't poll in a busy loop)
     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
     * (In other words: the longest time metadata may be kept without a refresh)
     */
    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
        this.lastRefreshMs = 0L;
        this.lastSuccessfulRefreshMs = 0L;
        this.version = 0;
        this.cluster = Cluster.empty();
        this.needUpdate = false;
        this.topics = new HashSet<String>();
        this.listeners = new ArrayList<>();
        this.needMetadataForAllTopics = false;
    }

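As its comments indicate, the Metadata object is refreshed periodically; in other words, it should pull the core metadata from the Broker to the Producer at regular intervals.

Its flow is:

1) It initializes some settings whose meaning can mostly be guessed from the names and comments

The default values can be seen in the static ConfigDef initialization covered earlier.

refreshBackoffMs: the minimum time that must pass between metadata refreshes, to avoid busy polling; default 100ms

metadataExpireMs: the maximum time metadata may be kept without a refresh; by default metadata is pulled every 5 minutes

lastRefreshMs: timestamp of the most recent metadata pull

lastSuccessfulRefreshMs: timestamp of the most recent successful metadata pull

version: version number of the metadata, incremented on every update

Cluster: the important one, the object that wraps the metadata itself

needUpdate: flag indicating whether a pull is needed

topics: the set of topics being tracked

listeners: callbacks notified when the metadata changes

needMetadataForAllTopics: false by default; we don't know yet what it is for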
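The two timing fields are easier to grasp with numbers. The sketch below shows one plausible way refreshBackoffMs and metadataExpireMs could combine into "how long until the next refresh". It is written from my understanding of the fields listed above rather than copied from the Kafka source; RefreshTimingSketch and timeToNextUpdate are illustrative names.

```java
// Illustrative only: one way the refresh timing fields could interact.
final class RefreshTimingSketch {
    /**
     * Milliseconds until the next metadata refresh should be attempted.
     * A pending needUpdate skips the expiry wait, but the backoff since the
     * last attempt is still respected so failed refreshes are not retried in a tight loop.
     */
    static long timeToNextUpdate(long nowMs,
                                 boolean needUpdate,
                                 long lastRefreshMs,
                                 long lastSuccessfulRefreshMs,
                                 long refreshBackoffMs,
                                 long metadataExpireMs) {
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long backoff = 100L;       // default retry.backoff.ms mentioned above
        long expire = 300_000L;    // 5 minutes, the default expiry mentioned above
        // Metadata refreshed successfully at t=1000, nothing forces an update.
        System.out.println(timeToNextUpdate(1000L, false, 1000L, 1000L, backoff, expire));
        // Same moment, but an update was requested: only the backoff remains.
        System.out.println(timeToNextUpdate(1000L, true, 1000L, 1000L, backoff, expire));
    }
}
```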

2) It initializes the Cluster metadata object

Of the fields above, the metadata itself ends up stored in the Cluster object. Let's see what data it holds:

/**
 * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
 */
public final class Cluster {

    private final boolean isBootstrapConfigured;
    //Kafka Broker nodes
    private final List<Node> nodes;
    //Topics the client is not authorized to access
    private final Set<String> unauthorizedTopics;
    //TopicPartition: the basic Topic-to-Partition relationship
    //PartitionInfo: Partition details such as the ISR (in-sync replica) list and the leader and follower nodes
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    //which partitions each topic has
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    //which partitions of each topic are currently available; a partition with no live leader is unavailable
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    //which partitions are hosted on each broker
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    //broker.id -> Node
    private final Map<Integer, Node> nodesById;
    
    //initialization methods omitted
}

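Together these describe the whole Kafka cluster, for example:

Node: records a Kafka Broker's IP, port and so on

TopicPartition: the basic Topic-to-Partition relationship

PartitionInfo: Partition details such as the ISR (in-sync replica) list and the leader and follower nodes

The rest I have annotated in the comments above with their rough meaning. A general impression is enough; really you only need to know that it is all topic metadata.

If you ask how I know all this: simple, I debugged it. Once metadata has been pulled later on, you can inspect the data and it becomes clear. Debugging is a good way to read source code in this situation: we haven't downloaded or imported the source yet, so we only need to write a HelloWorld, let Maven download the jar's sources automatically, and debug to analyze the client code.

Be sure to apply the source-reading methods and ideas I mentioned earlier flexibly.

So the metadata object mainly looks like this:

Creating the Metadata object in KafkaProducer is not complicated. What happens after it is created? The KafkaProducer constructor calls metadata.update: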

 private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     // parameter setup, omitted...
     this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

     // initialization of RecordAccumulator, NetworkClient, Sender and other components, omitted...
     this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

     //omitted...
}

Is this where metadata gets pulled? Let's look at the update method:

/**
 * Update the cluster metadata
 */
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

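Because listeners was initialized empty and needMetadataForAllTopics is false, the method just stores the Cluster it was given and then calls Metadata.notifyAll(), which achieves nothing here because no thread is waiting yet. No metadata is pulled from a Broker.

So this method does almost nothing: when a KafkaProducer is created, no metadata is pulled. Only a Metadata object is initialized, and its Cluster holds just the configured bootstrap addresses, with no topic or partition information yet.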
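The version counter plus notifyAll() in update() hints at a classic wait/notify handshake: one thread waits for the version to move past a value it has already seen, another thread bumps the version and wakes it. Here is a minimal sketch of that pattern. VersionedMetadataSketch is a hypothetical class written for this article, not the real Metadata class, and it swallows interrupts for brevity.

```java
// Hypothetical stand-in for a versioned object guarded by wait/notifyAll.
final class VersionedMetadataSketch {
    private int version = 0;

    /** Called by the updating thread: bump the version and wake all waiters. */
    synchronized void update() {
        version += 1;
        notifyAll();
    }

    synchronized int version() {
        return version;
    }

    /**
     * Blocks until the version is greater than lastVersion or maxWaitMs has passed.
     * Returns true if an update was observed, false on timeout or interrupt.
     */
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        VersionedMetadataSketch metadata = new VersionedMetadataSketch();
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(50); // pretend a metadata response takes a moment to arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            metadata.update();
        });
        updater.start();
        boolean updated = metadata.awaitUpdate(0, 5000);
        System.out.println("updated=" + updated + ", version=" + metadata.version());
    }
}
```

The loop around wait() matters: a waiter re-checks the version after every wake-up, so spurious wake-ups and updates that happened before the wait started are both handled.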

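The key points of the whole Metadata process are shown in the figure below:

At this point you will notice that reading source code is not always smooth sailing; all the branches and code can leave you dizzy. The update() method above is exactly the kind of thing that can mislead you.

Don't be discouraged. Keep hold of the core flow, draw lots of diagrams, note down the key logic first, set this part aside, and carry on analyzing other scenarios and logic. Once you have analyzed enough of them, and repeated the analysis a few times, the logic you didn't understand before will gradually click and everything will connect.

Producer core component: RecordAccumulator analysis

Having gone through the creation of the metadata component, let's move on to the next component, RecordAccumulator, the in-memory message buffer.

As we learned from its comments earlier, RecordAccumulator is a record accumulator: it buffers messages in an in-memory queue until they are sent to the Broker, which is why we call it the in-memory buffer for outgoing messages.

The code that creates it is: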

  /**
     * Create a new record accumulator
     * 
     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
     * @param totalSize The maximum memory the record accumulator can use.
     * @param compression The compression codec for the records
     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
     *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
     *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
     *        exhausting all retries in a short period of time.
     * @param metrics The metrics
     * @param time The time instance to use
     */
    public RecordAccumulator(int batchSize,
                             long totalSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             Metrics metrics,
                             Time time) {
        this.drainIndex = 0;
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<>();
        String metricGrpName = "producer-metrics";
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.incomplete = new IncompleteRecordBatches();
        this.muted = new HashSet<>();
        this.time = time;
        registerMetrics(metrics, metricGrpName);
    }

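The comments already tell us the flow of this method. Mainly:

1) It sets some parameters: batchSize, totalSize, retryBackoffMs, lingerMs, compression, etc.

2) It initializes some data structures, e.g. batches is a new CopyOnWriteMap<>()

3) It initializes BufferPool and IncompleteRecordBatches

1) It sets some parameters: batchSize, totalSize, retryBackoffMs, lingerMs, compression, etc.

First come the parameters; their defaults and basic roles can be seen in the ConfigDef initialization from the previous section

batchSize: default 16KB; controls the size of the batches in which messages are packed and sent to the Broker

totalSize: default 32MB; the size of the in-memory message buffer

retryBackoffMs: by default a retry happens every 100ms

lingerMs: how long to wait for a batch to fill; with 10ms, for example, a batch that has not filled up within 10ms must be sent right away (the default is 0, i.e. no added delay)

compression: how requests are compressed; default none

2) It initializes some data structures, e.g. batches is a new CopyOnWriteMap<>()

This should be a collection holding Record messages; it looks like messages are stored per topic and per partition, using a double-ended queue

ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches = new CopyOnWriteMap<>();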
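To get a feel for a "map of partition to deque of batches", here is a small self-contained sketch. It makes several simplifying assumptions that are mine, not Kafka's: a batch is a List<String> capped by record count instead of bytes, a partition is identified by a plain String, and a ConcurrentHashMap stands in for Kafka's internal CopyOnWriteMap. BatchQueueSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model: one deque of batches per topic partition.
final class BatchQueueSketch {
    private final int maxRecordsPerBatch;
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    BatchQueueSketch(int maxRecordsPerBatch) {
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    /** Appends to the newest batch of the partition, opening a new batch when it is full. */
    void append(String topicPartition, String record) {
        Deque<List<String>> deque = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (deque) { // ArrayDeque itself is not thread-safe
            List<String> last = deque.peekLast();
            if (last == null || last.size() >= maxRecordsPerBatch) {
                last = new ArrayList<>();
                deque.addLast(last);
            }
            last.add(record);
        }
    }

    /** Removes and returns the oldest batch of the partition, or null if there is none. */
    List<String> drainFirst(String topicPartition) {
        Deque<List<String>> deque = batches.get(topicPartition);
        if (deque == null) {
            return null;
        }
        synchronized (deque) {
            return deque.pollFirst();
        }
    }

    int batchCount(String topicPartition) {
        Deque<List<String>> deque = batches.get(topicPartition);
        if (deque == null) {
            return 0;
        }
        synchronized (deque) {
            return deque.size();
        }
    }

    public static void main(String[] args) {
        BatchQueueSketch accumulator = new BatchQueueSketch(2);
        accumulator.append("orders-0", "a");
        accumulator.append("orders-0", "b");
        accumulator.append("orders-0", "c");
        System.out.println(accumulator.batchCount("orders-0"));
        System.out.println(accumulator.drainFirst("orders-0"));
    }
}
```

The double-ended queue is what makes this shape convenient: new records go to the tail, and the oldest batch is taken from the head when it is time to send.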

3) It initializes BufferPool and IncompleteRecordBatches

Creating IncompleteRecordBatches is fairly simple:

  /*
     * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
     */
    private final static class IncompleteRecordBatches {
        private final Set<RecordBatch> incomplete;

        public IncompleteRecordBatches() {
            this.incomplete = new HashSet<RecordBatch>();
        }
        
        public void add(RecordBatch batch) {
            synchronized (incomplete) {
                this.incomplete.add(batch);
            }
        }
        
        public void remove(RecordBatch batch) {
            synchronized (incomplete) {
                boolean removed = this.incomplete.remove(batch);
                if (!removed)
                    throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
            }
        }
        
        public Iterable<RecordBatch> all() {
            synchronized (incomplete) {
                return new ArrayList<>(this.incomplete);
            }
        }
    }

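As the comment shows, it is a thread-safe helper class: it guards a HashSet with synchronized and holds the RecordBatches that the Broker has not yet acknowledged (ack).

new BufferPool initializes the buffer; the code is: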

public final class BufferPool {

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    /**
     * Create a new buffer pool
     * 
     * @param memory The maximum amount of memory that this buffer pool can allocate
     * @param poolableSize The buffer size to cache in the free list rather than deallocating
     * @param metrics instance of Metrics
     * @param time time instance
     * @param metricGrpName logical group name for metrics
     */
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }

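It mainly has one lock (lock) and two queues (free, holding ByteBuffers, and waiters, holding Conditions), and should be the actual memory area where messages are buffered.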
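Based only on the field names above (poolableSize, free, availableMemory), a buffer pool could recycle buffers roughly as sketched below. This is a deliberately reduced model of my own: it throws when memory runs out instead of blocking on the waiters queue, and SimpleBufferPool is a hypothetical class, not Kafka's BufferPool.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Reduced model of a buffer pool: no blocking, no metrics.
final class SimpleBufferPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory; // memory not yet handed out and not sitting in the free list

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    /** Reuses a pooled buffer for the standard size, otherwise allocates fresh memory. */
    synchronized ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst();
        }
        if (size > availableMemory) {
            throw new IllegalStateException("Not enough memory left in the pool");
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    /** Standard-size buffers go back to the free list; other sizes are simply released. */
    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        } else {
            availableMemory += buffer.capacity();
        }
    }

    synchronized long availableMemory() {
        return availableMemory;
    }

    synchronized int pooledBuffers() {
        return free.size();
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(64, 16);
        ByteBuffer first = pool.allocate(16);
        pool.deallocate(first);
        ByteBuffer second = pool.allocate(16);
        System.out.println("reused=" + (first == second));
    }
}
```

Pooling only one size keeps the free list trivial: every batch-sized buffer is interchangeable, so handing out the head of the deque is always correct.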

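The whole process looks like this:

After looking at the internals of these components you may still not know what they actually do. That's fine: the goal here is only to get a first impression of them, so that later, when we analyze the behavior and role of a particular component, it will be easier to understand.

Producer core component: NetworkClient analysis

To pull metadata or send messages, we first have to establish a connection to the Broker. When we analyzed the KafkaProducer source flow earlier, we saw a network communication component, NetworkClient. Let's look at how it is created and what it does. Could metadata pulling happen here?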

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // parameter setup, omitted...

    // initialization of RecordAccumulator, Metadata, Sender and other components, omitted...

    NetworkClient client = new NetworkClient(
        // Kafka lightly wraps the native Selector into its own Selector network component
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs, time);
    
    //omitted...
    }

private NetworkClient(MetadataUpdater metadataUpdater,
                          Metadata metadata,
                          Selectable selector,
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int requestTimeoutMs,
                          Time time) {

        /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
         * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
         * super constructor is invoked.
         */
        if (metadataUpdater == null) {
            if (metadata == null)
                throw new IllegalArgumentException("`metadata` must not be null");
            //a component that updates metadata?
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            this.metadataUpdater = metadataUpdater;
        }
        this.selector = selector;
        this.clientId = clientId;
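        // the set of requests that have been sent or are being sent but have not yet received a response
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        // the Producer's connection state to each node in the cluster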
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.correlation = 0;
        this.randOffset = new Random();
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
    }

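Creating the NetworkClient above mainly:

1) Creates a Selector. If you are familiar with the Java NIO API the name will not be new: Selector, Buffer and Channel are the three core NIO components. Kafka lightly wraps the native Selector into its own Selector network component.

I won't go into how NIO works here. You can simply think of the Selector as the component that monitors network connections for connection-established events and read/write requests.

2) Sets a bunch of configuration parameters.

3) Creates a DefaultMetadataUpdater and passes metadata to it. Guessing from the name, it looks like a component that updates metadata. Have we found the metadata pulling logic? We should pay close attention to how this class is used in a moment

4) Creates InFlightRequests and ClusterConnectionStates. From the comments on these two classes we can tell that InFlightRequests is the set of requests that have been sent or are being sent but have not yet received a response, and ClusterConnectionStates is the Producer's connection state to each node in the cluster.

The whole NetworkClient initialization can be summarized in the figure below:

Having seen the flow of the creation, let's look at the details (flow first, details second). If the information above isn't obvious to you at first glance, you need to look into each class to confirm it.

Detail 1: first, the code that creates the Selector: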

public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
   this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}

 public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
        try {
            //in essence this still creates an NIO Selector
            this.nioSelector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.channels = new HashMap<>();
        this.completedSends = new ArrayList<>();
        this.completedReceives = new ArrayList<>();
        this.stagedReceives = new HashMap<>();
        this.immediatelyConnectedKeys = new HashSet<>();
        this.connected = new ArrayList<>();
        this.disconnected = new ArrayList<>();
        this.failedSends = new ArrayList<>();
        this.sensors = new SelectorMetrics(metrics);
        this.channelBuilder = channelBuilder;
        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
        this.lruConnections = new LinkedHashMap<>(16, .75F, true);
        currentTimeNanos = time.nanoseconds();
        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
        this.metricsPerConnection = metricsPerConnection;
    }

As you can see, creating Kafka's Selector essentially still creates an NIO Selector: java.nio.channels.Selector.open();
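One detail in the constructor above is worth a tiny experiment: lruConnections is a LinkedHashMap created with accessOrder = true, as its comment points out. With that flag the map keeps entries ordered from least to most recently accessed, which is handy for finding idle connections. The sketch below demonstrates the JDK behavior only; the node ids and timestamps are made up, and LruOrderSketch is a hypothetical class.

```java
import java.util.LinkedHashMap;

// Demonstrates LinkedHashMap access ordering with made-up connection ids.
final class LruOrderSketch {
    /** The first key in iteration order is the least recently accessed one. */
    static String leastRecentlyUsed(LinkedHashMap<String, Long> lru) {
        return lru.isEmpty() ? null : lru.keySet().iterator().next();
    }

    public static void main(String[] args) {
        // Same constructor arguments as in the Selector code: capacity 16, load factor .75, accessOrder true.
        LinkedHashMap<String, Long> lru = new LinkedHashMap<>(16, .75F, true);
        lru.put("node-1", 1L);
        lru.put("node-2", 2L);
        lru.put("node-3", 3L);
        lru.get("node-1"); // touching node-1 moves it to the most-recent end
        System.out.println(leastRecentlyUsed(lru));
    }
}
```

After node-1 is touched, node-2 becomes the oldest entry, so an idle check only ever needs to look at the head of the map.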

Detail 2: initializing DefaultMetadataUpdater does nothing except hold a reference to Metadata

    class DefaultMetadataUpdater implements MetadataUpdater {

        //holds a reference to Metadata
        /* the current cluster metadata */
        private final Metadata metadata;

        /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
        private boolean metadataFetchInProgress;

        /* the last timestamp when no broker node is available to connect */
        private long lastNoNodeAvailableMs;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.metadataFetchInProgress = false;
            this.lastNoNodeAvailableMs = 0;
        }

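Detail 3: the comment on InFlightRequests does indeed describe it as the set of requests that have been sent or are being sent but have not yet received a response. It's fine if that doesn't make sense yet; we will see where it is used later.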

/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 * (i.e. requests already sent, or still being written, that are waiting for a response)
 */
final class InFlightRequests {

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
    
    public InFlightRequests(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }
}
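The fields above (a per-node Deque plus maxInFlightRequestsPerConnection) suggest how the limit could be enforced. The following sketch is my own simplified guess at that bookkeeping: requests are plain Strings, and InFlightSketch, canSendMore and completeNext are illustrative names whose behavior is assumed, not taken from the Kafka source.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified bookkeeping of unanswered requests per node.
final class InFlightSketch {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** True while the node has fewer unanswered requests than the configured limit. */
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    /** Records a newly sent request at the head of the node's queue. */
    void add(String node, String request) {
        if (!canSendMore(node)) {
            throw new IllegalStateException("Too many in-flight requests for " + node);
        }
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    /** A response arrived: the oldest request (at the tail) is the one being answered. */
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("No in-flight request for " + node);
        }
        return queue.pollLast();
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(2);
        inFlight.add("node-1", "request-1");
        inFlight.add("node-1", "request-2");
        System.out.println(inFlight.canSendMore("node-1"));
        System.out.println(inFlight.completeNext("node-1"));
    }
}
```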

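Detail 4: the comment on ClusterConnectionStates likewise says it is the Producer's connection state to each node in the cluster. The connection states are connected, connecting and disconnected.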

/**
 * The state of our connection to each node in the cluster.
 * (i.e. the Producer's connection state to each node in the cluster)
 * 
 */
final class ClusterConnectionStates {
    private final long reconnectBackoffMs;
    private final Map<String, NodeConnectionState> nodeState;

    public ClusterConnectionStates(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
        this.nodeState = new HashMap<String, NodeConnectionState>();
    }
    /**
     * The state of our connection to a node
     */
    private static class NodeConnectionState {

        ConnectionState state;
        long lastConnectAttemptMs;

        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttempt;
        }

        public String toString() {
            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
        }
    }
    /**
     * The states of a node connection
     * (the states are disconnected, connecting and connected)
     */
    public enum ConnectionState {
        DISCONNECTED, CONNECTING, CONNECTED
    }
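The fields state, lastConnectAttemptMs and reconnectBackoffMs fit together naturally as a small state machine with a reconnect backoff. The sketch below shows one way that could work. It is an assumption-laden illustration: ConnectionStatesSketch and its methods are hypothetical, and the rule "an unknown node may always be connected to" is my own choice.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative per-node connection state tracking with a reconnect backoff.
final class ConnectionStatesSketch {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final long reconnectBackoffMs;
    private final Map<String, NodeState> nodeState = new HashMap<>();

    ConnectionStatesSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    /** A new attempt is allowed for unknown nodes, or for disconnected ones after the backoff. */
    boolean canConnect(String node, long nowMs) {
        NodeState s = nodeState.get(node);
        if (s == null) {
            return true;
        }
        return s.state == State.DISCONNECTED && nowMs - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    void connecting(String node, long nowMs) {
        nodeState.put(node, new NodeState(State.CONNECTING, nowMs));
    }

    void connected(String node) {
        require(node).state = State.CONNECTED;
    }

    void disconnected(String node) {
        require(node).state = State.DISCONNECTED;
    }

    private NodeState require(String node) {
        NodeState s = nodeState.get(node);
        if (s == null) {
            throw new IllegalStateException("No connection state for " + node);
        }
        return s;
    }

    public static void main(String[] args) {
        ConnectionStatesSketch states = new ConnectionStatesSketch(50);
        states.connecting("node-1", 1000);
        states.disconnected("node-1");
        System.out.println(states.canConnect("node-1", 1020));
        System.out.println(states.canConnect("node-1", 1050));
    }
}
```

Recording the time of the last attempt, not of the disconnect, is what keeps a flapping broker from being hammered with reconnects.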

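That completes the initialization of NetworkClient. I won't explain the network-related parameters here; I'll explain them when they are used. Explaining them now probably wouldn't make much sense to you yet.

I've roughly organized all the details in the figure below:

Producer core component: Sender thread analysis

We have now walked through the initialization of the network component NetworkClient, the metadata component Metadata, and RecordAccumulator, the in-memory buffer for outgoing messages, and mainly learned what each of them initializes. We summarized component diagrams and noted the key information. Let's continue with the last core component, the Sender thread, and see what it does.

The Sender initialization logic is as follows: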

  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
     // parameter setup, omitted...
    // initialization of RecordAccumulator, NetworkClient, Metadata and other components, omitted...
    this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
    // omitted...
 }
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              String clientId,
              int requestTimeout) {
    this.client = client;
    this.accumulator = accumulator;
    this.metadata = metadata;
    this.guaranteeMessageOrder = guaranteeMessageOrder;
    this.maxRequestSize = maxRequestSize;
    this.running = true;
    this.acks = acks;
    this.retries = retries;
    this.time = time;
    this.clientId = clientId;
    this.sensors = new SenderMetrics(metrics);
    this.requestTimeout = requestTimeout;
}

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    setDaemon(daemon);
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            log.error("Uncaught exception in " + name + ": ", e);
        }
    });
}

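The core flow of this initialization is simple: it mainly hands the other components to the Sender to use.

1) It sets some core Sender parameters

retries: number of retries; default 0, i.e. no retry

acks: "all", "-1", "0", "1"; the acknowledgement policy. The default is 1: once the leader broker has written the message, the send counts as successful (this can lose messages)

max.request.size: maximum request size; default 1MB

max.in.flight.requests.per.connection: default 5; at most 5 requests per Broker connection may be sent but not yet answered (retries can then reorder messages)

2) It references the other three key components: the network component NetworkClient, the metadata component Metadata, and RecordAccumulator, the in-memory buffer for outgoing messages

3) It then wraps the Runnable in a KafkaThread and starts the thread, which begins executing Sender's run method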
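Step 3 boils down to plain JDK threading: wrap a Runnable in a named daemon thread and install an uncaught-exception handler, as the KafkaThread constructor shown above does. Here is a minimal sketch of the same idea using only java.lang.Thread; DaemonThreadSketch and newDaemon are hypothetical names, and the Runnable is a stand-in for Sender.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of a named daemon thread wrapping a Runnable.
final class DaemonThreadSketch {
    static Thread newDaemon(String name, Runnable task) {
        Thread thread = new Thread(task, name);
        // Daemon threads do not keep the JVM alive once all user threads have finished.
        thread.setDaemon(true);
        thread.setUncaughtExceptionHandler((t, e) ->
                System.err.println("Uncaught exception in " + name + ": " + e));
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ran = new CountDownLatch(1);
        // The lambda plays the role of Sender.run(); here it only signals that it ran.
        Thread ioThread = newDaemon("sketch-network-thread", ran::countDown);
        ioThread.start();
        boolean finished = ran.await(5, TimeUnit.SECONDS);
        System.out.println("task ran: " + finished);
    }
}
```

Because the thread is a daemon, an application that forgets to close its producer can still exit, although any messages still buffered at that point are not guaranteed to be sent.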

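The whole process looks like this:

Executing the run method is not our main concern in this section. I'll analyze it in detail in the following sections.

Summary

Finally, let's summarize. Today we covered the following:

Which components KafkaProducer initializes

Producer core component: RecordAccumulator analysis

Producer core component: Metadata analysis

Producer core component: NetworkClient analysis

Producer core component: Sender thread analysis

We only got a basic picture of what each component is, what it mainly does, and what it mainly contains. I've collected everything we covered today into one big diagram:

With this diagram, it will be much easier to analyze the core Kafka Producer flows in the next few sections. For example:

The metadata pulling mechanism and its flexible use of wait + notifyAll, and the routing strategy for sent messages

The network communication mechanism: sending messages on native NIO, plus the neat handling of TCP sticky-packet and packet-splitting problems

The Producer's high throughput: the double-ended queues in the memory buffer plus batched sending

Stay tuned. That's all for today, see you in the next section!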
