共计 3220 个字符,预计需要花费 9 分钟才能阅读完成。
概述
前面两节分析了部分源码,中间间隔了一个月没有继续分享,原因是在深入阅读中碰到了一些问题,因为我阅读源码是以 KafkaProducer 发送消息(send 方法)作为入口开始逐行分析,涉及到新的类后就把整个类和相关的类全部解析一遍,但是引入的其他类在 send 方法里只用到了部分方法,导致我在分析时没有条理,效率很低。从本节开始,我调整下分享的思路,基本点不变,还是以 KafkaProducer->send 方法为入口,优先解析 send 中设计的代码,其他代码等用到时在深入解析。本节我们从 RecordAccumulator 开始,RecordAccumulator 是一个消息累加器,producer 会把消息封装到累加器里方便发送给 broker。
入口
先放上 KafkaProducer.send 里调用累加器的部分代码,这里看到 producer 把消息的目标分区信息,序列化以后的 key,value 和回调还有 headers 统一传给累加器。
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
append 解析
RecordAccumulator 内部有一个类型为 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 的成员变量 batches,key 我们之前有提到过,是 Topic 的分区信息,value 是一个批次的双端队列,ProducerBatch 是一个批次,里面存放具体的消息。整个 append 方法重点步骤如下
- 拿到对应 TopicPartition 的批次队列 Deque<ProducerBatch>。
- 第一次 tryAppend
- 成功则返回,如果失败分配缓冲区
- 第二次 tryAppend
- 成功则返回,失败则创建新批次 ProducerBatch
- 第三次 tryAppend
- 释放缓冲区
这里 RecordAccumulator 用了一个细粒度锁的设计,主要考虑两点:
- 没有给整个方法加锁,通过使用 ConcurrentHashMap 保证并发安全,这样并发度就高了,每一个 TopicPartition 对应的的批次都可以并发操作。
- 也没有给整段操作批次的代码加锁,而是分段加锁,把能并发执行的申请缓冲区分离出来,这个设计可以看出在性能上考虑的很极致,只在必需的地方加锁。
进一步的解析可以看下面代码注释:
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 获取批次队列,有就返还旧的,没有就新增然后返回新的
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {if (closed)
throw new KafkaException("Producer closed while send in progress");
// 第一次尝试 append,如果批次队列中没有批次或者空间不够就返回 null
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 申请缓冲区,这里可以并发执行没必要加锁影响性能
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 第二次尝试,因为上面锁锁释放掉了所以别的线程有可能已经创建好了批次,这里再尝试 append 一次
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
// 又失败了,说明释放锁的时候没有别的线程创建批次,只能自己创建了,下面是准备工作
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
// 第三次 tryAppend,这次一定要成功了
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
// 新创建的批次加入到队列中
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
// 释放掉刚才申请的缓冲区,缓冲区后面会讲
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();}
}
正文完
发表至: hadoop
2019-09-25