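In the previous section we started from HelloWorld and walked through how a KafkaProducer is created, focusing on the source code that parses the producer configuration.
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null);
}
Besides parsing the configuration, creating a KafkaProducer involves one more key step: it calls an overloaded constructor. In this section we look at what that constructor does.
Which components does KafkaProducer initialize?
Since this constructor creates a key component, the first thing to do is trace its overall flow, identify the core components, and sketch a component diagram.
Let's look at the constructor code: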
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = new SystemTime();
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true, we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
* This should be removed with release 0.9 when the deprecated configs are removed.
*/
if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
if (blockOnBufferFull) {
this.maxBlockTimeMs = Long.MAX_VALUE;
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
/* check for user defined settings.
* If the TIME_OUT config is set use that for request timeout.
* This should be removed with release 0.9
*/
if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
} else {
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient client = new NetworkClient(new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? "|" + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
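This constructor has quite a lot of code, but that is fine. Let's skim its overall flow first:
1) It mainly sets a batch of Producer parameters from the ProducerConfig object parsed earlier.
2) new Metadata(): this looks like a component; judging by the name, it is probably responsible for metadata.
3) new RecordAccumulator(): this also looks like a component. It is not yet clear what it does; the name translates literally to "record accumulator".
4) new NetworkClient(): clearly a component for network communication.
5) new Sender() and new KafkaThread(): these create a Runnable and start it on a single thread, which looks like the thread that sends messages.
6) new ProducerInterceptors(): apparently related to interceptors.
As you can see, the core flow of this constructor boils down to the six points above. We can summarize it with a component diagram:
What exactly is RecordAccumulator?
Now we know the main components. RecordAccumulator is the one whose purpose is not obvious from its name. What to do? Check whether the class has a comment.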
/**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
*/
public final class RecordAccumulator {}
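From the comment we get the general idea: RecordAccumulator is a record accumulator. A Record can be seen as the abstraction of a single message, so this is a message accumulator: it buffers messages in an in-memory queue before they are sent to the server (that is, the Broker). We can therefore call it the in-memory buffer for outgoing messages.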
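To make the idea of a bounded message buffer concrete, here is a minimal sketch in plain Java. It is not Kafka's RecordAccumulator: the class ToyAccumulator and its methods are hypothetical, and where the class comment says the real append blocks when memory is exhausted, this sketch simply returns false to stay small.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy, not Kafka code: a queue of records limited by a byte budget.
class ToyAccumulator {
    private final long maxBytes;
    private long usedBytes = 0;
    private final Deque<byte[]> queue = new ArrayDeque<>();

    ToyAccumulator(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Buffers the record if it fits in the remaining budget; returns false otherwise.
    // (Assumption: the real accumulator would block the caller here instead.)
    synchronized boolean tryAppend(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        if (usedBytes + bytes.length > maxBytes)
            return false;
        queue.addLast(bytes);
        usedBytes += bytes.length;
        return true;
    }

    // Removes the oldest buffered record and frees its bytes; returns null when empty.
    synchronized String drainOne() {
        byte[] bytes = queue.pollFirst();
        if (bytes == null)
            return null;
        usedBytes -= bytes.length;
        return new String(bytes, StandardCharsets.UTF_8);
    }

    synchronized long usedBytes() {
        return usedBytes;
    }
}
```

With a 4-byte budget, two 2-byte records fit, a third is rejected, and draining one record makes room again.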
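What exactly is Metadata?
Then there is the Metadata component. Some readers may not be sure what metadata means: metadata is data that describes other data. For example, the metadata of a file on macOS or Windows is its size, location, creation time, modification time, and so on.
So what is the metadata of a KafkaProducer? Here is a quick refresher:
Kafka refresher tips: what are Topic, Partition, Record, Leader Partition, Follower Partition, and Replica?
These are the basic concepts Kafka uses to manage messages.
Topic: the logical structure Kafka uses to manage messages. A Topic can have multiple Partitions, which provide distributed storage and support massive data volumes.
Partition: a storage structure holding many messages, corresponding to log files on disk. Kafka usually calls the files that store messages on disk a log; in practice a log is simply a sequence of messages.
Record: the abstraction of a single message.
For high availability, each Partition has copies in two roles, leader and follower, hosted on different Brokers. A follower is a copy of the leader.
Replica: a copy. Both the leader and the followers hold a copy of the messages and back each other up, so a replica can refer to either the leader or a follower.
With these basics refreshed, metadata is much easier to understand.
To send a message to a Broker, you at least need to know where to send it. That requires descriptive information, and this descriptive information is the metadata needed for sending messages.
A Producer generally has to pull metadata from the broker cluster, including the Partition information of each Topic. Only then, when it later sends a message to a Topic, does it know which Partitions that Topic has and which Broker hosts each Leader Partition.
The component diagram ends up as follows:
Producer core component: Metadata
Now that we know which components the Producer initializes, understanding what each one does will make most of the Producer's inner workings clear.
Let's first look at what the Metadata component does.
Creating a Metadata is simple: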
/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.needUpdate = false;
this.topics = new HashSet<String>();
this.listeners = new ArrayList<>();
this.needMetadataForAllTopics = false;
}
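As the comments on this constructor indicate, the Metadata object is refreshed periodically; that is, it should periodically pull the core metadata from the Broker to the Producer.
Its flow is:
1) It initializes some settings; their meaning can mostly be guessed from the names and comments.
The default values can be seen in the static ConfigDef initialization covered earlier.
refreshBackoffMs: the minimum time that must elapse between metadata refreshes, to avoid busy polling; default 100 ms
metadataExpireMs: the maximum time metadata may be kept without a refresh; by default metadata is pulled every 5 minutes
lastRefreshMs: timestamp of the most recent metadata pull
lastSuccessfulRefreshMs: timestamp of the most recent successful metadata pull
version: the version of the metadata, incremented on every update
cluster: the key field; the object that wraps the metadata itself
needUpdate: flag indicating whether a pull is needed
topics: the set of topic names
listeners: callbacks notified when the metadata changes
needMetadataForAllTopics: defaults to false; its purpose is not clear yet
2) It initializes the Cluster metadata object.
Of the fields above, the metadata itself ends up stored in the Cluster object. Let's see what data it holds: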
/**
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/
public final class Cluster {
private final boolean isBootstrapConfigured;
// Kafka Broker nodes
private final List<Node> nodes;
// Topics this client is not authorized to access
private final Set<String> unauthorizedTopics;
// TopicPartition: the basic Topic/Partition pairing
// PartitionInfo: details of a Partition, such as its ISR (in-sync replica) list and its leader and follower nodes
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
// Which partitions each topic has
private final Map<String, List<PartitionInfo>> partitionsByTopic;
// Which partitions of each topic are currently available; a partition without a live leader is unavailable
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
// Which partitions are placed on each broker
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
// broker.id -> Node
private final Map<Integer, Node> nodesById;
// Initialization methods omitted
}
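Together these make up the information about the whole Kafka cluster, for example:
Node: records a Kafka Broker's IP, port, and so on
TopicPartition: the basic Topic/Partition pairing
PartitionInfo: details of a Partition, such as its ISR list and its leader and follower nodes
The remaining fields are annotated with comments above that describe roughly what they mean. A general impression is enough; it suffices to know that they are all topic metadata.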
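The lookup maps in Cluster are easier to picture with a small example. The sketch below is plain Java, not Kafka code: ToyPartition and ToyCluster are hypothetical stand-ins for PartitionInfo and Cluster, and indexing partitionsByNode by the leader's broker id is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for PartitionInfo; leaderId is null when no leader is alive.
class ToyPartition {
    final String topic;
    final int partition;
    final Integer leaderId;

    ToyPartition(String topic, int partition, Integer leaderId) {
        this.topic = topic;
        this.partition = partition;
        this.leaderId = leaderId;
    }
}

// Hypothetical stand-in for Cluster: builds the per-topic and per-node indexes once.
class ToyCluster {
    final Map<String, List<ToyPartition>> partitionsByTopic = new HashMap<>();
    final Map<String, List<ToyPartition>> availablePartitionsByTopic = new HashMap<>();
    final Map<Integer, List<ToyPartition>> partitionsByNode = new HashMap<>();

    ToyCluster(List<ToyPartition> partitions) {
        for (ToyPartition p : partitions) {
            partitionsByTopic.computeIfAbsent(p.topic, k -> new ArrayList<>()).add(p);
            // Every known topic gets an entry, even if none of its partitions is available.
            List<ToyPartition> available =
                    availablePartitionsByTopic.computeIfAbsent(p.topic, k -> new ArrayList<>());
            if (p.leaderId != null) {
                available.add(p);
                partitionsByNode.computeIfAbsent(p.leaderId, k -> new ArrayList<>()).add(p);
            }
        }
    }
}
```

A partition without a leader still counts as a partition of its topic, but it is left out of the "available" index, which matches the comment on availablePartitionsByTopic above.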
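If you ask how I know the above, it's simple: I debugged it. Once metadata has been pulled later on, you can inspect the data and it becomes clear. Debugging is a good way to read source code in this scenario: we have not downloaded or imported the Kafka source, so we only need to write a HelloWorld, let Maven automatically download the sources of the jar, and step through the client code in the debugger.
Be sure to apply flexibly the source-reading methods and ideas I mentioned before.
So the metadata object mainly looks like this:
Creating the Metadata in KafkaProducer is not that complicated. What happens after it is created? The KafkaProducer constructor calls the metadata.update method.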
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
// Some parameter settings, omitted...
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
// RecordAccumulator initialization, omitted...
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
// Initialization of NetworkClient, Sender and other components, omitted...
}
Is this where the metadata gets pulled? Let's look at the update method:
/**
* Update the cluster metadata
*/
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);
// Do this after notifying listeners as subscribed topics' list can be changed by listeners
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
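Because listeners was initialized empty and needMetadataForAllTopics is false, the method only records the timestamps, bumps the version, stores the Cluster that was passed in, and calls notifyAll() on the Metadata object. With no thread waiting yet, that call has no effect. Nothing here pulls metadata from a broker.
So this method does almost nothing: no metadata is pulled when the KafkaProducer is created. The constructor only initializes a Metadata object whose Cluster contains the bootstrap broker addresses and no topic or partition information yet.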
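The bookkeeping fields touched by update() become clearer once you see how a refresh deadline could be derived from them. The following sketch is a hypothetical ToyMetadata in plain Java, not Kafka's class; in particular, the timeToNextUpdate rule is my reading of how refreshBackoffMs and metadataExpireMs interact and should be treated as an assumption.

```java
// Hypothetical simplification of the refresh bookkeeping; not Kafka's Metadata class.
class ToyMetadata {
    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    private long lastRefreshMs = 0L;
    private long lastSuccessfulRefreshMs = 0L;
    private int version = 0;
    private boolean needUpdate = false;

    ToyMetadata(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    // Records a successful update, bumps the version and wakes any waiting threads.
    synchronized void update(long nowMs) {
        needUpdate = false;
        lastRefreshMs = nowMs;
        lastSuccessfulRefreshMs = nowMs;
        version += 1;
        notifyAll();
    }

    // Marks the metadata as stale so that a refresh no longer waits for expiry.
    synchronized void requestUpdate() {
        needUpdate = true;
    }

    // Milliseconds until a refresh is due: expiry (or an explicit request), gated by the backoff.
    synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    synchronized int version() {
        return version;
    }
}
```

With the defaults mentioned earlier (100 ms backoff, 5 minute expiry), a fresh update pushes the next refresh 5 minutes out, while an explicit request brings it forward to the end of the backoff window.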
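The key points of the Metadata flow are shown in the figure below:
By now you will have noticed that reading source code is not always smooth sailing; the branches and details can leave you dizzy. A method like update() above can easily mislead you.
Don't be discouraged. Keep the core flow clear, draw plenty of diagrams, note down the key logic, set this part aside, and move on to other scenarios. Once you have analyzed enough of them, go over them a few more times; the logic you did not understand before will gradually click and everything will connect.
Producer core component: RecordAccumulator
Having examined how the metadata component is created, let's move on to the next component, the RecordAccumulator message buffer.
As its class comment told us, RecordAccumulator buffers messages (Records) in an in-memory queue before they are sent to the Broker, which is why we call it the in-memory buffer for outgoing messages.
The code that creates it is as follows: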
/**
* Create a new record accumulator
*
* @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
* @param totalSize The maximum memory the record accumulator can use.
* @param compression The compression codec for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
* latency for potentially better throughput due to more batching (and hence fewer, larger requests).
* @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
* exhausting all retries in a short period of time.
* @param metrics The metrics
* @param time The time instance to use
*/
public RecordAccumulator(int batchSize,
long totalSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
Metrics metrics,
Time time) {
this.drainIndex = 0;
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.batches = new CopyOnWriteMap<>();
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
this.incomplete = new IncompleteRecordBatches();
this.muted = new HashSet<>();
this.time = time;
registerMetrics(metrics, metricGrpName);
}
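The comments already tell us the flow of this method:
1) It sets some parameters: batchSize, totalSize, retryBackoffMs, lingerMs, compression, and so on
2) It initializes some data structures; for example, batches is a new CopyOnWriteMap<>()
3) It initializes BufferPool and IncompleteRecordBatches
1) Setting the parameters batchSize, totalSize, retryBackoffMs, lingerMs, compression
First come the parameters. Their defaults and basic purposes can be seen in the ConfigDef initialization from the previous section:
batchSize: default 16 KB; controls the size of the batches in which messages are packed and sent to the Broker
totalSize: default 32 MB; the size of the message memory buffer
retryBackoffMs: default 100 ms; how long to wait before retrying a failed request
lingerMs: default 0 ms in this version; how long a batch that is not yet full may wait for more records. With a value of 10 ms, for example, a batch that has not filled up within 10 ms is sent anyway
compression: the compression type for requests; default none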
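The interplay of batchSize and lingerMs can be sketched as a single predicate. This is a hypothetical helper in plain Java, not Kafka code; the real accumulator considers further conditions (retry backoff, memory exhaustion, flushes) that are left out here on purpose.

```java
// Hypothetical helper, not Kafka code: decides whether a batch may be sent now.
class ToyBatchPolicy {
    private final int batchSizeBytes;
    private final long lingerMs;

    ToyBatchPolicy(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // A batch is sendable once it is full or has waited at least lingerMs since creation.
    boolean isSendable(int bytesInBatch, long createdMs, long nowMs) {
        boolean full = bytesInBatch >= batchSizeBytes;
        boolean lingered = nowMs - createdMs >= lingerMs;
        return full || lingered;
    }
}
```

With lingerMs set to 0, every batch is sendable immediately, which trades batching efficiency for latency; a non-zero value gives small batches time to fill.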
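2) Initializing data structures: batches is a new CopyOnWriteMap<>()
This looks like a collection holding Record messages: for each partition of each topic it keeps some batches of messages in a double-ended queue:
ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches = new CopyOnWriteMap<>();
3) Initializing BufferPool and IncompleteRecordBatches
Creating IncompleteRecordBatches is fairly simple: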
/*
* A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
*/
private final static class IncompleteRecordBatches {
private final Set<RecordBatch> incomplete;
public IncompleteRecordBatches() {
this.incomplete = new HashSet<RecordBatch>();
}
public void add(RecordBatch batch) {
synchronized (incomplete) {
this.incomplete.add(batch);
}
}
public void remove(RecordBatch batch) {
synchronized (incomplete) {
boolean removed = this.incomplete.remove(batch);
if (!removed)
throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
}
}
public Iterable<RecordBatch> all() {
synchronized (incomplete) {
return new ArrayList<>(this.incomplete);
}
}
}
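As the comment says, this is a thread-safe helper class: it guards a HashSet with synchronized blocks and holds the RecordBatches that the Broker has not yet acknowledged (ack).
The buffer pool is initialized by new BufferPool; the code is as follows: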
public final class BufferPool {
private final long totalMemory;
private final int poolableSize;
private final ReentrantLock lock;
private final Deque<ByteBuffer> free;
private final Deque<Condition> waiters;
private long availableMemory;
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;
/**
* Create a new buffer pool
*
* @param memory The maximum amount of memory that this buffer pool can allocate
* @param poolableSize The buffer size to cache in the free list rather than deallocating
* @param metrics instance of Metrics
* @param time time instance
* @param metricGrpName logical group name for metrics
*/
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
this.poolableSize = poolableSize;
this.lock = new ReentrantLock();
this.free = new ArrayDeque<ByteBuffer>();
this.waiters = new ArrayDeque<Condition>();
this.totalMemory = memory;
this.availableMemory = memory;
this.metrics = metrics;
this.time = time;
this.waitTime = this.metrics.sensor("bufferpool-wait-time");
MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
metricGrpName,
"The fraction of time an appender waits for space allocation.");
this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
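It mainly consists of one lock and two queues: free, which caches ByteBuffers, and waiters, which holds the Conditions of waiting threads. This is presumably the memory region where messages are actually buffered.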
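To show why a free list of same-sized buffers is useful, here is a deliberately simplified pool. ToyBufferPool is hypothetical and is not Kafka's BufferPool: it has no waiters queue and returns null where a real pool would block the caller until memory is released.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, non-blocking simplification of a buffer pool; not Kafka's BufferPool.
class ToyBufferPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    // Memory that is neither handed out nor parked in the free list.
    private long availableMemory;

    ToyBufferPool(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.availableMemory = totalMemory;
    }

    // Returns a buffer of the requested size, or null if the pool cannot satisfy it right now.
    synchronized ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst(); // reuse a cached buffer instead of allocating
        // Give cached buffers back to the raw budget until the request fits.
        while (availableMemory < size && !free.isEmpty())
            availableMemory += free.pollLast().capacity();
        if (availableMemory < size)
            return null; // assumption: a real pool would block here
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    // Standard-sized buffers are cached for reuse; other sizes return to the raw budget.
    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        } else {
            availableMemory += buffer.capacity();
        }
    }

    synchronized int freeListSize() {
        return free.size();
    }

    synchronized long availableMemory() {
        return availableMemory;
    }
}
```

Because batches normally have the same size (poolableSize corresponds to batchSize in the constructor call above), most allocations can be served from the free list without creating new buffers.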
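The whole process is shown below:
Having seen the internal structure of these components, you may still not know what they actually do. That is fine: the goal here is only to form a first impression of them, so that their behavior and purpose are easier to understand when we analyze them later.
Producer core component: NetworkClient
To pull metadata or send messages, the Producer must first establish connections to Brokers. When we traced the KafkaProducer source earlier, we saw a network communication component, NetworkClient. Let's look at how it is created and what it does. Could the metadata pull happen here?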
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
// Some parameter settings, omitted...
// Initialization of RecordAccumulator, Metadata and other components, omitted...
NetworkClient client = new NetworkClient(
// Kafka lightly wraps the native NIO Selector into its own Selector network component
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
// omitted...
}
private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
*/
if (metadataUpdater == null) {
if (metadata == null)
throw new IllegalArgumentException("`metadata` must not be null");
// A component that updates metadata?
this.metadataUpdater = new DefaultMetadataUpdater(metadata);
} else {
this.metadataUpdater = metadataUpdater;
}
this.selector = selector;
this.clientId = clientId;
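// The set of requests that have been sent or are being sent but have not yet received a response
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
// The state of the Producer's connection to each node in the cluster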
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.randOffset = new Random();
this.requestTimeoutMs = requestTimeoutMs;
this.time = time;
}
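Creating the NetworkClient above mainly involves:
1) Creating a Selector. If you are familiar with the Java NIO API, the name will not be new: Selector, Buffer, and Channel are the three core NIO components. Kafka lightly wraps the native Selector into its own Selector network component.
I will not go into how NIO works here. You can simply think of the Selector as the component that monitors network connections for connection events and for read and write readiness.
2) Setting a batch of configuration parameters.
3) Creating a DefaultMetadataUpdater and passing metadata to it. Guessing from the name, it seems to be a component that updates metadata. Have we found the metadata-pull logic? We will pay close attention to how this class is used shortly.
4) Creating InFlightRequests and ClusterConnectionStates. From the comments on these two classes, InFlightRequests is the set of requests that have been sent or are being sent but have not yet received a response, and ClusterConnectionStates is the state of the Producer's connection to each node in the cluster.
The NetworkClient initialization can be summarized in the following figure:
Having seen the overall flow of creation, let's look at the details (overview first, details second). If the points above are not obvious to you at a glance, look into each class to confirm them.
Detail 1: first, the code that creates the Selector: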
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}
public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
try {
// At its core this still creates an NIO Selector
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
throw new KafkaException(e);
}
this.maxReceiveSize = maxReceiveSize;
this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
this.time = time;
this.metricGrpPrefix = metricGrpPrefix;
this.metricTags = metricTags;
this.channels = new HashMap<>();
this.completedSends = new ArrayList<>();
this.completedReceives = new ArrayList<>();
this.stagedReceives = new HashMap<>();
this.immediatelyConnectedKeys = new HashSet<>();
this.connected = new ArrayList<>();
this.disconnected = new ArrayList<>();
this.failedSends = new ArrayList<>();
this.sensors = new SelectorMetrics(metrics);
this.channelBuilder = channelBuilder;
// initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
this.lruConnections = new LinkedHashMap<>(16, .75F, true);
currentTimeNanos = time.nanoseconds();
nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
this.metricsPerConnection = metricsPerConnection;
}
As you can see, creating Kafka's Selector essentially creates an NIO Selector: java.nio.channels.Selector.open();
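The wrapping pattern itself is small enough to reproduce with the JDK alone. The sketch below is a hypothetical ToySelector, not Kafka's Selector: it only shows a class owning a java.nio.channels.Selector and turning the checked IOException into an unchecked one, much as the constructor above wraps it in a KafkaException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical wrapper, not Kafka's Selector: owns an NIO Selector and hides checked exceptions.
class ToySelector {
    private final Selector nioSelector;

    ToySelector() {
        try {
            this.nioSelector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isOpen() {
        return nioSelector.isOpen();
    }

    // Number of channels currently registered with the underlying selector.
    int registeredChannels() {
        return nioSelector.keys().size();
    }

    void close() {
        try {
            nioSelector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A freshly opened selector has no registered channels; connections only appear once channels are registered with it, which in the producer happens later, when the Sender thread starts connecting to brokers.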
Detail 2: initializing DefaultMetadataUpdater does nothing beyond keeping a reference to the Metadata:
class DefaultMetadataUpdater implements MetadataUpdater {
// Keeps a reference to the Metadata
/* the current cluster metadata */
private final Metadata metadata;
/* true iff there is a metadata request that has been sent and for which we have not yet received a response */
private boolean metadataFetchInProgress;
/* the last timestamp when no broker node is available to connect */
private long lastNoNodeAvailableMs;
DefaultMetadataUpdater(Metadata metadata) {
this.metadata = metadata;
this.metadataFetchInProgress = false;
this.lastNoNodeAvailableMs = 0;
}
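Detail 3: as its comment says, InFlightRequests is indeed the set of requests that have been sent or are being sent but have not yet received a response. It is fine if this is not clear yet; we will see where it is used later.
/**
* The set of requests which have been sent or are being sent but haven't yet received a response
*/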
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
public InFlightRequests(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
}
}
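The data structure above, one deque of requests per node plus a limit, suggests how the limit might be enforced. The sketch below is a hypothetical ToyInFlightRequests in plain Java; it uses strings instead of ClientRequest and a simpler canSendMore check than the real class.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification, not Kafka's InFlightRequests: tracks unanswered requests per node.
class ToyInFlightRequests {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    ToyInFlightRequests(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // True if another request may be sent to the node without exceeding the limit.
    boolean canSendMore(String node) {
        Deque<String> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightRequestsPerConnection;
    }

    // Records a newly sent request; the newest request sits at the head of the deque.
    void add(String node, String request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Removes and returns the oldest unanswered request (assumes responses arrive in send order).
    String completeNext(String node) {
        Deque<String> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + node);
        return queue.pollLast();
    }
}
```

With a limit of 2, a third request to the same node has to wait until a response completes one of the first two, while other nodes are unaffected.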
Detail 4: the comment on ClusterConnectionStates likewise says it is the state of the Producer's connection to each node in the cluster. The main states are connected, connecting, and disconnected.
/**
* The state of our connection to each node in the cluster.
*
*/
final class ClusterConnectionStates {
private final long reconnectBackoffMs;
private final Map<String, NodeConnectionState> nodeState;
public ClusterConnectionStates(long reconnectBackoffMs) {
this.reconnectBackoffMs = reconnectBackoffMs;
this.nodeState = new HashMap<String, NodeConnectionState>();
}
/**
* The state of our connection to a node
*/
private static class NodeConnectionState {
ConnectionState state;
long lastConnectAttemptMs;
public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
this.state = state;
this.lastConnectAttemptMs = lastConnectAttempt;
}
public String toString() {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
}
}
/**
* The states of a node connection
*/
public enum ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }
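The two fields, a per-node state and reconnectBackoffMs, are enough to express a reconnect rule. The following is a hypothetical ToyConnectionStates in plain Java, written as an illustration of that rule rather than a copy of Kafka's class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification, not Kafka's ClusterConnectionStates.
class ToyConnectionStates {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final long reconnectBackoffMs;
    private final Map<String, NodeState> nodeState = new HashMap<>();

    ToyConnectionStates(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // A node may be connected if it was never tried, or is disconnected and the backoff has elapsed.
    boolean canConnect(String node, long nowMs) {
        NodeState s = nodeState.get(node);
        if (s == null)
            return true;
        return s.state == State.DISCONNECTED && nowMs - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    // Records the start of a connection attempt and its timestamp.
    void connecting(String node, long nowMs) {
        nodeState.put(node, new NodeState(State.CONNECTING, nowMs));
    }

    void connected(String node) {
        require(node).state = State.CONNECTED;
    }

    void disconnected(String node) {
        require(node).state = State.DISCONNECTED;
    }

    private NodeState require(String node) {
        NodeState s = nodeState.get(node);
        if (s == null)
            throw new IllegalStateException("No connection state for node " + node);
        return s;
    }
}
```

The backoff is measured from the last connection attempt, so a node that drops right after connecting is not retried in a tight loop.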
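That completes the initialization of NetworkClient. I will not explain the network-related parameters here; I will explain them when they are actually used, since explaining them now would probably not make much sense yet.
I have roughly organized all the details in the following figure:
Producer core component: the Sender thread
We have now gone through the initialization of the network component NetworkClient, the Metadata, and the RecordAccumulator message buffer, and we mainly know what each of them initializes. We summarized them in component diagrams and noted the key information. Let's continue with the last core component, the Sender thread, and see what it does.
The Sender initialization logic is as follows: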
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
// Some parameter settings, omitted...
// Initialization of RecordAccumulator, NetworkClient, Metadata and other components, omitted...
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? "|" + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
// omitted...
}
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
boolean guaranteeMessageOrder,
int maxRequestSize,
short acks,
int retries,
Metrics metrics,
Time time,
String clientId,
int requestTimeout) {
this.client = client;
this.accumulator = accumulator;
this.metadata = metadata;
this.guaranteeMessageOrder = guaranteeMessageOrder;
this.maxRequestSize = maxRequestSize;
this.running = true;
this.acks = acks;
this.retries = retries;
this.time = time;
this.clientId = clientId;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
}
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
setDaemon(daemon);
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in " + name + ": ", e);
}
});
}
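The core flow of this initialization is simple: it mainly hands the other components to the Sender to use.
1) It sets some core parameters of the sender:
retries: the number of retries; default 0, meaning no retry
acks: one of "all", "-1", "0", "1"; the acknowledgement policy. The default is 1: a send counts as successful once the leader broker has written the message (which can lose messages if the leader fails before the followers have copied them)
max.request.size: the maximum request size; default 1 MB
max.in.flight.requests.per.connection: default 5; at most 5 requests per Broker connection may be sent but not yet answered (combined with retries, this can reorder messages)
2) It keeps references to three other key components: the network component NetworkClient, the Metadata, and the RecordAccumulator message buffer
3) It then wraps the Sender, which is a Runnable, in a KafkaThread and starts the thread, which begins executing the Sender's run method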
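Two of the constructor arguments above are derived rather than read directly: the acks string is turned into a short, and the ordering guarantee is computed from max.in.flight.requests.per.connection. The sketch below shows one way to write both in plain Java. ToySenderConfig is hypothetical, and since the body of Kafka's own parseAcks is not shown in this article, the parsing rule here ("all" as an alias for -1) is an assumption based on the values listed above.

```java
// Hypothetical helpers illustrating how two Sender arguments could be derived; not Kafka code.
class ToySenderConfig {
    // "all" is treated as an alias for -1; any other value must be an integer such as 0 or 1.
    static short parseAcks(String acks) {
        String trimmed = acks.trim();
        if (trimmed.equalsIgnoreCase("all"))
            return -1;
        try {
            return Short.parseShort(trimmed);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid value for 'acks': " + acks, e);
        }
    }

    // Matches the expression in the constructor: ordering is guaranteed only with one in-flight request.
    static boolean guaranteeMessageOrder(int maxInFlightRequestsPerConnection) {
        return maxInFlightRequestsPerConnection == 1;
    }
}
```

This also explains the ordering caveat: with the default of 5 in-flight requests, guaranteeMessageOrder is false, so a retried request can land after a later one that succeeded.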
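The whole process is shown below:
How the run method executes is not our main concern in this section; I will analyze it in detail in the following sections.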
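Summary
To wrap up, here is what we covered today:
Which components KafkaProducer initializes
Producer core component: RecordAccumulator
Producer core component: Metadata
Producer core component: NetworkClient
Producer core component: the Sender thread
We have only become acquainted with each component: what it is, what it mainly does, and what it mainly contains. I have collected what we learned today into one big diagram:
With this diagram, analyzing the core flows of the Kafka Producer in the following sections becomes much easier. For example:
The metadata pull mechanism and its flexible use of wait + notifyAll, and the routing strategy for sending messages
The network communication mechanism: sending messages over native NIO, and a neat solution to the TCP message framing problem (packets sticking together or being split)
The Producer's high throughput: the double-ended queues in the memory buffer plus the batched send mechanism
Stay tuned. That's all for today; see you in the next section!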