关于kafka:4深潜KafkaProducer-RecordAccumulator精析

48次阅读

共计 27102 个字符,预计需要花费 68 分钟才能阅读完成。

通过上一课时的介绍咱们理解到,业务线程应用 KafkaProducer.send() 办法发送 message 的时候,会先将其写入 RecordAccumulator 中进行缓冲,当 RecordAccumulator 中缓存的 message 达到肯定阈值的时候,会由 IO 线程批量造成申请,发送到 kafka 集群。本课时咱们就重点来看一下 RecordAccumulator 这个缓冲区的构造。

首先,咱们从上图中能够看出,RecordAccumulator 会由业务线程写入、Sender 线程读取,这是一个非常明显的生产者 - 消费者模式,所以咱们须要保障 RecordAccumulator 是线程平安的。
RecordAccumulator 中保护了一个 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 类型的汇合,其中的 Key 是 TopicPartition 用来示意指标 partition,Value 是 ArrayDeque<ProducerBatch> 队列,用来缓冲发往指标 partition 的音讯。这里的 ArrayDeque 并不是线程平安的汇合,前面咱们会看到加锁的相干操作。

在每个 ProducerBatch 中都保护了一个 MemoryRecordsBuilder 对象,MemoryRecordsBuilder 才是真正存储 message 的中央。RecordAccumulator、ProducerBatch、MemoryRecordsBuilder 这三个外围类的关系如下图所示:

client 模块
message 格局
既然咱们筹备深刻 KafkaProducer 进行剖析,那咱们就须要理解 message 在 kafka 外部的格局,而不是简略晓得 message 是个 KV。kafka 目前的 message 的格局有三个版本:

V0:kafka0.10 版本之前
V1:kafka 0.10 ~ 0.11 版本
V2:kafka 0.11.0 之后的版本
V0 版本
在应用 V0 版本的 message 时,message 在 RecordAccumulator 中只是简略的沉积,并没有进行聚合,每个 message 都有独立的元信息,如下图所示:

其中惟一要阐明就是 attributes 局部,其中的低 3 位用来标识以后应用的压缩算法,高 5 位没有应用。

V1 版本
V1 版本 与 V0 版本的格局根本相似,就是多了一个 timestamp 字段,具体构造如下:

其中 attributes 局部的低 3 位仍旧用来标识以后应用的压缩算法,第 4 位用来标识工夫戳的类型。

在 V1 版本中引入 timestamp 次要是为了接口上面几个问题:

更精确的日志保留策略。在后面咱们曾经简略形容过了依据 message 存在工夫的保留策略,在应用 V0 版本的时候,kafka broker 会间接依据磁盘上的 segment 文件的最初批改工夫来判断是否执行删除操作,然而这种计划比拟大的弊病就是如果产生 replica 迁徙或是 replica 扩容,新减少 replica 中的 segment 文件就都是新创建的,其中蕴含的旧 message 就不会被删除。
更精确的日志切分策略。在后面咱们曾经提到过 segment 文件会定时、定量进行切分,在 V0 版本应用 segment 创立工夫进行切分的话,也会存在上述同样的问题,导致呈现单个大文件,也可能因为没有 message 写入,切分出很小的 segment 文件。
V1 版本中的压缩
对于常见压缩算法来说,压缩内容越多,压缩成果比例越高。然而单条 message 的长度个别都不会特地长,如果要让咱们来解决这个矛盾的话,就是将多条 message 放到一起再压缩。kafka 也的确是这么干的,在 V1 版本中,kafka 应用了 wrapper message 的形式来进步压缩效率。简略了解,wrapper message 也是一条 message,然而其中的 value 值则是多条一般 message 组成的 message 汇合,这些外部的一般 message 也称为 inner message。如下图所示:

为了进一步升高 message 的有效负载,kafka 只在 wrapper message 中记录残缺的 offset 值,inner message 中的 offset 只是绝对于 wrapper message offset 的一个偏移量,如下图所示:

当 wrapper message 发送到 kafka broker 之后,broker 无需进行解压缩,间接存储即可,当 consumer 拉取 message 的时候,也是一成不变的进行传递,真正的解压缩在 consumer 实现,这样就能够节俭 broker 解压缩和从新压缩的资源。

再谈 V1 版本中的工夫戳
V1 版本 message 中的 timestamp 类型由 attributes 中的第 4 位标识,有 CreateTime 和 LogAppendTime 两种类型:

CreateTime:timestamp 字段中记录的是 producer 生产这条 message 的工夫戳
LogAppendTime:timestamp 字段中记录的是 broker 将该 message 写入 segment 文件的工夫戳。
在 producer 生成 message 的时候,message 中的工夫戳是 CreateTime,wrapper message 中的 timestamp 是所有 inner message timestamp 的最大值。

当 message 传递到 broker 的时候,broker 会依照本身的 log.message.timestamp.type 配置(或 topic 的 message.timestamp.type 配置)(默认值为 CreateTime)批改 wrapper message 的工夫戳。如果 broker 应用的是 CreateTime,咱们还能够设置 max.message.time.difference.ms 参数,当 message 中的工夫戳与 broker 本地工夫之差大于该配置值时,broker 会回绝写入这条 message。

如果 broker 或是 topic 应用 LogAppendTime,那么会将 broker 本地工夫间接设置到 message 的 timestamp 字段中,并将 attributes 中的 timestamp type 位批改为 1。如果是压缩 message,只会批改 wrapper message 中的 timestamp 和 timestamp type,不会批改 inner message,这是为了防止解压缩和从新压缩。也就是说,broker 只关怀 wrapper message 的工夫戳,疏忽 inner message 的工夫戳。

当 message 被拉去到 consumer 的时候,consumer 只会依据 timestampe type 的值进行解决。如果 wrapper message 为 CreateTime,则 consumer 应用 inner message 的 timestamp 作为 CreateTime;如果 wrapper message 为 LogAppendTime,则 consumer 应用 wrapper message 作为所有 inner message 的 LogAppendTime,疏忽 inner message 的 timestamp 值。

最初,message 中的 timestamp 也是工夫戳索引的重要根底,这个咱们前面介绍 broker 的时候,具体介绍。

V2 版本
在 kafka 0.11 版本之后,开始应用 V2 版本的 message 格局,同时也兼容 V0、V1 版本的 message,当然,应用旧版本的 message 也就无奈应用 kafka 中的一些新个性。

V2 版本的 message 格局参考了 Protocol Buffer 的一些个性,引入了 Varints(变长整型)和 ZigZag 编码,其中,Varints 是应用一个或多个字节来序列化整数的一种办法,数值越小,占用的字节数就越少,说白了,还是为了缩小 message 的体积。ZigZag 编码是为了解决 Varints 对正数编码效率低的问题,ZigZag 会将有符号整数映射为无符号整数,从而进步 Varints 对绝对值较小的正数的编码效率,如下图所示:

理解了 V2 版本格局的实践根底之后,咱们来看 V2 中 message 的格局(也被称为 Record):

其中须要关注的是,所有标识长度的字段都是 varint(或 varlong),也就是变长字段,timestamp 和 offset 都是 delta 值,也就是偏移量。另外,就是 attribute 字段中的所有位都废除了,并增加 header 扩大。

除了根底的 Record 格局之外,V2 版本中还定义了一个 Record Batch 的构造,同学们能够比照 V1 版本格局,Record 是内层构造,Record Batch 是外层构造,如下图所示:

Record Batch 中蕴含的字段十分多,咱们一个个来看:

baseOffset:以后 RecordBatch 的起始位移,Record 中的 offset delta 与该 baseOffset 相加能力失去真正的 offset 值。当 RecordBatch 还在 producer 端的时候,offset 是 producer 调配的一个值,不是 partition 调配的,别搞混了。
batchLength:RecordBatch 的总长度。
partitionLeaderEpoch:用于标记指标 partition 中 leader replica 的纪元信息,前面介绍具体实现时会再次看到该值的相干实现。
magic:V2 版本中的魔数值 2。
crc 校验码:参加校验的局部是从属性值到 RecordBatch 开端的全副数据,partitionLeaderEpoch 不在 CRC 外面是因为每次 broker 收到 RecordBatch 的时候,都会赋值 partitionLeaderEpoch,如果蕴含在 CRC 外面会导致须要从新计算 CRC。这个实现前面会说。
attributes:从 V1 版本中的 8 位扩大到 16 位,0~2 位示意压缩类型,第 3 位示意工夫戳类型,第 4 位示意是否是事务型记录。所谓“事务”是 Kafka 的新性能,开启事务之后,只有在事务提交之后,事务型 consumer 才能够看到记录。5 示意是否是 Control Record,这类记录总是单条呈现,被蕴含在一个 control record batch 外面,它能够用于标记“事务是否曾经提交”、“事务是否曾经停止”等,它只会在 broker 内解决,不会被传输给 consumer 和 producer,即对客户端是通明的。
lastOffsetDelta:RecordBatch 最初一个 Record 的绝对位移,用于 broker 确认 RecordBatch 中 Records 的组装正确性。
firstTimestamp:RecordBatch 第一条 Record 的工夫戳。
maxTimestamp:RecordBatch 中最大的工夫戳,个别状况下是最初一条音讯的工夫戳,用于 broker 确认 RecordBatch 中 Records 的组装正确性。
producer id:生产者编号,用于反对幂等性(Exactly Once 语义),参考 KIP-98 – Exactly Once Delivery and Transactional Messaging。
producer epoch:生产者纪元,用于反对幂等性(Exactly Once 语义)。
base sequence:根底序号,用于反对幂等性(Exactly Once 语义),用于校验是否是反复 Record。
records count:Record 的数量。
通过剖析 V2 版本的音讯格局咱们晓得,kafka message 不仅提供了相似事务、幂等等新性能,还对空间占用提供了足够的优化,总体晋升很大

MemoryRecordsBuilder
理解了 kafka message 格局的演变之后,咱们回到 KafkaProducer 的代码。

每个 MemoryRecordsBuilder 底层依赖一个 ByteBuffer 实现 message 的存储,咱们前面会深刻介绍 KafkaProducer 对 ByteBuffer 的治理。在 MemoryRecordsBuilder 中会将 ByteBuffer 封装成 ByteBufferOutputStream,ByteBufferOutputStream 实现了 OutputStream,这样咱们就能够依照流的形式写入数据了。同时,ByteBufferOutputStream 提供了主动扩容底层 ByteBuffer 的能力。

还有一个须要关注的是 compressionType 字段,它用来指定以后 MemoryRecordsBuilder 应用哪种压缩算法来压缩 ByteBuffer 中的数据,kafka 目前已反对的压缩算法有:GZIP、SNAPPY、LZ4、ZSTD 四种,留神:只有 kafka V2 版本协定反对 ZSTD 压缩算法。

1
2
3
4
5
6
7
8
9
10
11
public MemoryRecordsBuilder(ByteBuffer buffer,…) {// 省略其余参数

// 将 MemoryRecordsBuilder 关联的 ByteBuffer 封装成 ByteBufferOutputStream 流
this(new ByteBufferOutputStream(buffer), ...);

}

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,…) {// 省略其余参数

// 省略其余字段的初始化
this.bufferStream = bufferStream;
// 在 bufferStream 流外层套一层压缩流,而后再套一层 DataOutputStream 流
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));

}
这样,咱们失去的 appendStream 就如下图所示:

理解了 MemoryRecordsBuilder 底层的存储形式之后,咱们来看 MemoryRecordsBuilder 的外围办法。首先是 appendWithOffset() 办法,逻辑并不简单,须要明确的是 ProducerBatch 对标的 V2 中的 RecordBatch,咱们写入的数据对标 V2 中的 Record:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {

return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);

}

private long nextSequentialOffset() {
// 这里的 baseOffset 是 RecordBatch 中的 baseOffset,lastOffset 用来记录以后写入 Record 的 offset 值,前面会用 lastOffset-baseOffset 计算 offset delta。每次有新 Record 写入的时候,都会递增 lastOffset
return lastOffset == null ? baseOffset : lastOffset + 1;
}

private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {

if (isControlRecord != this.isControlBatch) // 查看 isControl 标记是否统一
    throw new IllegalArgumentException("...");

if (lastOffset != null && offset <= this.lastOffset) // 保障 offset 递增
    throw new IllegalArgumentException("...");

if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) // 查看工夫戳
    throw new IllegalArgumentException("...");

// 查看:只有 V2 版本 message 能力有 header
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
    throw new IllegalArgumentException("...");

if (this.firstTimestamp == null) // 更新 firstTimestamp
    this.firstTimestamp = timestamp;

if (magic > RecordBatch.MAGIC_VALUE_V1) { // 针对 V2 的写入
    appendDefaultRecord(offset, timestamp, key, value, headers);
    return null;
} else { // 针对 V0、V1 的写入
    return appendLegacyRecord(offset, timestamp, key, value, magic);
}

}
appendDefaultRecord() 办法中会计算 Record 中的 offsetDelta、timestampDelta,而后实现 Record 写入,最初更新 RecordBatch 的元数据,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,

                                Header[] headers) throws IOException {ensureOpenForRecordAppend(); // 查看 appendStream 状态
// 计算 offsetDelta
int offsetDelta = (int) (offset - this.baseOffset);
// 计算 timestampDelta
long timestampDelta = timestamp - this.firstTimestamp; 
// 这里应用的 DefaultRecord 是一个工具类,其 writeTo()办法会依照 V2 Record 的格局向 appendStream 流中
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
// 批改 RecordBatch 中的元信息,例如,Record 数量(numRecords)recordWritten(offset, timestamp, sizeInBytes);

}
MemoryRecordsBuilder 中另一个须要关注的办法是 hasRoomFor() 办法,它次要用来预计以后 MemoryRecordsBuilder 是否还有空间来包容此次写入的 Record,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {

// 查看两个状态,一个是 appendStream 流状态,另一个是以后曾经写入的预估字节数是否以及超过了 writeLimit 字段指定的写入下限,这里的 writeLimit 字段用来记录 MemoryRecordsBuilder 可能写入的字节数上限值
if (isFull()) 
    return false;

// 每个 RecordBatch 至多能够写入一个 Record,此时要是一个 Record 都没有,则能够持续写入
if (numRecords == 0)
    return true;

final int recordSize;
if (magic < RecordBatch.MAGIC_VALUE_V2) { // V0、V1 版本的判断
    recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
} else { 
    // 估算此次写入的 Record 大小
    int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
    long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
    recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
}
// 已写入字节数 + 此次写入 Record 的字节数不能超过 writeLimit
return this.writeLimit >= estimatedBytesWritten() + recordSize;

}
ProducerBatch
接下来咱们向上走一层,来看 ProducerBatch 的实现,其中最外围办法是 tryAppend() 办法,外围步骤如下:

通过 MemoryRecordsBuilder 的 hasRoomFor() 办法查看以后 ProducerBatch 是否还有足够的空间来存储此次待写入的 Record。
调用 MemoryRecordsBuilder.append() 办法将 Record 追加到底层的 ByteBuffer 中。
创立 FutureRecordMetadata 对象,FutureRecordMetadata 继承了 Future 接口,对应此次 Record 的发送。
将 FutureRecordMetadata 对象以及 Record 关联的 Callback 回调封装成 Thunk 对象,记录到 thunks(List 类型)中。
上面是 ProducerBatch.tryAppend() 办法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {

// 查看 MemoryRecordsBuilder 是否还有空间持续写入
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {return null; // 没有空间写入的话,则返回 null} else {// 调用 append()办法写入 Record
    Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
    // 更新 maxRecordSize 和 lastAppendTime
    this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
            recordsBuilder.compressionType(), key, value, headers));
    this.lastAppendTime = now;
    // 创立 FutureRecordMetadata 对象
    FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture,
                                  this.recordCount,timestamp, checksum, 
                                  key == null ? -1 : key.length, 
                                  value == null ? -1 : value.length, Time.SYSTEM);
    // 将 Callback 和 FutureRecordMetadata 记录到 thunks 中
    thunks.add(new Thunk(callback, future));
    this.recordCount++; // 更新 recordCount 字段
    return future; // 返回 FutureRecordMetadata
}

}
除了 MemoryRecordsBuilder 之外,ProducerBatch 中还记录了很多其余要害信息:

这里咱们先来关注 ProduceRequestResult 这个类,其中保护了一个 CountDownLatch 对象(count 值为 1),实现了相似于 Future 的性能。当 ProducerBatch 造成的申请被 broker 端响应(失常响应、超时、异样响应)或是 KafkaProducer 敞开的时候,都会调用 ProduceRequestResult.done() 办法,该办法就会调用 CountDownLatch 对象的 countDown() 办法唤醒阻塞在 CountDownLatch 对象的 await() 办法的线程。这些线程后续能够通过 ProduceRequestResult 的 error 字段来判断此次申请是胜利还是失败。

在 ProduceRequestResult 中还有一个 baseOffset 字段,用来记录 broker 端为关联 ProducerBatch 中第一条 Record 调配的 offset 值,这样,每个 Record 的实在 offset 就能够依据本身在 ProducerBatch 的地位计算出来了(Record 的实在 offset = ProduceRequestResult.baseOffset + relativeOffset)。

接下来看 FutureRecordMetadata,它实现了 JDK 中的 Future 接口,示意一个 Record 的状态。FutureRecordMetadata 中除了保护一个关联的 ProduceRequestResult 对象之外,还保护了一个 relativeOffset 字段,relativeOffset 用来记录对应 Record 在 ProducerBatch 中的偏移量。

在 FutureRecordMetadata 中,有两个值得注意的办法,一个是 get() 办法,其中会依赖 ProduceRequestResult 中的 CountDown 来实现阻塞期待,并调用 value() 办法返回 RecordMetadata 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

// 计算超时工夫
long now = time.milliseconds();
long timeoutMillis = unit.toMillis(timeout);
long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
// 依赖 ProduceRequestResult 中的 CountDown 来实现阻塞期待
boolean occurred = this.result.await(timeout, unit);
if (!occurred)
    throw new TimeoutException("Timeout after waiting for" + timeoutMillis + "ms.");
if (nextRecordMetadata != null) // 能够先疏忽 nextRecordMetadata,前面介绍 split 的时候,再深刻介绍
    return nextRecordMetadata.get(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);
// 调用 value()办法返回 RecordMetadata 对象
return valueOrError();

}
另一个是 value() 办法,其中会将 partition 信息、baseOffset、relativeOffset、工夫戳(LogAppendTime 或 CreateTime)等信息封装成 RecordMetadata 对象返回:

1
2
3
4
5
6
7
8
RecordMetadata value() {

if (nextRecordMetadata != null) // 先疏忽 nextRecordMetadata
    return nextRecordMetadata.value();
// 将 partition 信息、baseOffset、relativeOffset、工夫戳(LogAppendTime 或 CreateTime)等信息封装成 RecordMetadata 对象返回
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), 
                          this.relativeOffset, timestamp(), this.checksum, 
                          this.serializedKeySize, this.serializedValueSize);

}
最初来看 ProducerBatch 中的 thunks 汇合,其中的每个 Thunk 对象对应一个 Record 对象,在 Thunk 对象中记录了对应 Record 关联的 Callback 对象以及关联的 FutureRecordMetadata 对象。

理解了 ProducerBatch 写入数据的相干内容之后,咱们回到 ProducerBatch 来关注其 done() 办法。当 KafkaProducer 收到 ProducerBatch 对应的失常响应、或超时、或敞开生产者时,都会调用 ProducerBatch 的 done()办法。在 done() 办法中,ProducerBatch 首先会更新 finalState 状态,而后调用 completeFutureAndFireCallbacks() 办法触发各个 Record 的 Callback 回调,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {

// 依据 exception 字段决定此次 ProducerBatch 发送的最终状态
final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
// CAS 操作更新 finalState 状态,只有第一次更新的时候,才会触发 completeFutureAndFireCallbacks()办法
if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
    return true;
}
// done()办法可能被调用一次或是两次,如果是 SUCCEED 状态切换成其余的失败状态,间接会抛出异样
if (this.finalState.get() != FinalState.SUCCEEDED) {// 省略日志输入代码} else {throw new IllegalStateException("...");
}
return false;

}
在 completeFutureAndFireCallbacks() 办法中,会遍历 thunks 汇合触发每个 Record 的 Callback,更新 ProduceRequestResult 中的 baseOffset、logAppendTime、error 字段,并调用其 done() 办法开释阻塞在其上的线程,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {

// 更新 ProduceRequestResult 中的 baseOffset、logAppendTime、error 字段
produceFuture.set(baseOffset, logAppendTime, exception);
// 遍历 thunks 汇合,触发每个 Record 的 Callback 回调
for (Thunk thunk : thunks) { // 省略 try/catch 代码块
    if (exception == null) {RecordMetadata metadata = thunk.future.value();
        if (thunk.callback != null)
            thunk.callback.onCompletion(metadata, null);
    } else {if (thunk.callback != null)
            thunk.callback.onCompletion(null, exception);
    }
}
// 调用底层 CountDownLatch.countDown()办法,开释阻塞在其上的线程
produceFuture.done();

}
BufferPool
后面提到,MemoryRecordsBuilder 底层应用 ByteBuffer 来存储写入的 Record 数据,然而创立 ByteBuffer 对象自身是一种比拟耗费资源的行为,所以 KafkaProducer 应用 BufferPool 来实现 ByteBuffer 的对立治理。BufferPool 说白了就是一个 ByteBuffer 的资源池,当须要 ByteBuffer 的时候,咱们就从其中获取,当应用实现之后,就将 ByteBuffer 偿还到 BufferPool 中。

BufferPool 是一个比较简单的资源池实现,它只会针对特定大小(poolableSize 字段)的 ByteBuffer 进行治理,对于其余大小的 ByteBuffer 抉择熟视无睹(Netty 外面 Buffer 池更加简单,之后介绍 Netty 源码的时候会具体说)。

个别状况下,咱们会调整 ProducerBatch 的大小(batch.size 配置(指定 Record 个数)* 单个 Record 的预估大小),使每个 ProducerBatch 能够缓存多条 Record。但当呈现一条 Record 的字节数大于整个 ProducerBatch 的意外状况时,就不会尝试从 BufferPool 申请 ByteBuffer,而是间接新调配 ByteBuffer 对象,待其被应用完后间接抛弃由 GC 回收。

上面来看一下 BufferPool 的外围字段:

BufferPool 调配 ByteBuffer 的外围逻辑位于 allocate() 办法中,逻辑并不简单,间接上代码和正文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {

if (size > this.totalMemory) // 首先查看指标 ByteBuffer 的大小是否大于了
    throw new IllegalArgumentException("...");

ByteBuffer buffer = null;
this.lock.lock(); // 加锁

// 查看以后 BufferPool 的状态,如果以后 BufferPool 处于敞开状态,则间接抛出异样(略)

try {
    // 指标大小恰好为 poolableSize,且 free 列表为空的话,间接复用 free 列表中的 ByteBuffer
    if (size == poolableSize && !this.free.isEmpty())
        return this.free.pollFirst();

    // 计算 free 列表中的 ByteBuffer 总空间
    int freeListSize = freeSize() * this.poolableSize;
    // 以后 BufferPool 可能开释出指标大小的空间,则间接通过 freeUp()办法进行调配
    if (this.nonPooledAvailableMemory + freeListSize >= size) {freeUp(size);
        this.nonPooledAvailableMemory -= size;
    } else {
        int accumulated = 0;
        // 如果以后 BufferPool 空间不足以提供指标空间,则须要阻塞以后线程
        Condition moreMemory = this.lock.newCondition();
        try {
            // 计算以后线程最大阻塞时长
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            while (accumulated < size) { // 循环期待调配胜利
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    // 以后线程阻塞期待,返回后果为 false 则示意阻塞超时
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } finally {long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                }
                // 查看以后 BufferPool 的状态,如果以后 BufferPool 处于敞开状态,则间接抛出异样(略)
                if (waitingTimeElapsed) { 
                    // 指定时长内仍旧没有获取到指标大小的空间,则抛出异样
                    throw new BufferExhaustedException("...");
                }

                remainingTimeToBlockNs -= timeNs;
                // 指标大小是 poolableSize 大小的 ByteBuffer,且 free 中呈现了闲暇的 ByteBuffer
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // 先调配一部分空间,并持续期待闲暇空间
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                    this.nonPooledAvailableMemory -= got;
                    accumulated += got;
                }
            }
            accumulated = 0;
        } finally {
            // 如果下面的 while 循环不是失常完结的,则 accumulated 不为 0,这里会偿还
            this.nonPooledAvailableMemory += accumulated;
            this.waiters.remove(moreMemory);
        }
    }
} finally {
    // 以后 BufferPool 要是还存在闲暇空间,则唤醒下一个期待线程来尝试调配 ByteBuffer
    try {if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
            this.waiters.peekFirst().signal();
    } finally {lock.unlock(); // 解锁
    }
}

if (buffer == null) 
    // 调配胜利但无奈复用 free 列表中的 ByteBuffer(可能指标大小不是 poolableSize 大小,或是 free 列表自身是空的)
    return safeAllocateByteBuffer(size);
else
    return buffer; // 间接复用 free 大小的 ByteBuffer

}

// 这里简略看一下 freeUp()办法,这里会一直从 free 列表中开释闲暇的 ByteBuffer 来补充 nonPooledAvailableMemory
private void freeUp(int size) {

while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    this.nonPooledAvailableMemory += this.free.pollLast().capacity();

}
从 BufferPool 中调配进去的 ByteBuffer 在应用之后,会调用 deallocate() 办法来开释,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void deallocate(ByteBuffer buffer, int size) {

lock.lock(); // 加锁
try {
    // 如果待开释的 ByteBuffer 大小为 poolableSize,则间接放入 free 列表中
    if (size == this.poolableSize && size == buffer.capacity()) {buffer.clear();
        this.free.add(buffer);
    } else {
        // 如果不是 poolableSize 大小,则由 JVM GC 来回收 ByteBuffer 并减少 nonPooledAvailableMemory
        this.nonPooledAvailableMemory += size;
    }
    // 唤醒 waiters 中的第一个阻塞线程
    Condition moreMem = this.waiters.peekFirst();
    if (moreMem != null)
        moreMem.signal();} finally {lock.unlock(); // 开释锁
}

}
RecordAccumulator
剖析完 MemoryRecordsBuilder、ProducerBatch 以及 BufferPool 与写入相干的办法之后,咱们再来看 RecordAccumulator 的实现。

剖析一个类的时候,还是要先看其数据结构,而后再来看其行为(办法)。RecordAccumulator 中的关键字段如下:

在后面剖析 KafkaProducer.doSend() 办法发送 message 的时候,间接调用了 RecordsAccumulator.append() 办法,这也是调用 ProducerBatch.tryAppend() 办法将音讯追加到底层 MemoryRecordsBuilder 的中央。上面咱们就来看 RecordAccumulator.append() 办法的外围逻辑:

在 batches 汇合中查找指标 partition 对应的 ArrayDeque 汇合,如果查找失败,则创立新的 ArrayDeque,并增加到 batches 汇合中。
对步骤 1 中拿到的 ArrayDeque 汇合进行加锁。这里应用 synchronized 代码块进行加锁。
执行 tryAppend() 办法,尝试向 ArrayDeque 中的最初一个 ProducerBatch 写入 Record。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque, long nowMs) {

// 获取 ArrayDeque<ProducerBatch> 汇合中最初一个 ProducerBatch 对象
ProducerBatch last = deque.peekLast();
if (last != null) {
    // 尝试将音讯写入 last 这个 ProducerBatch 对象
    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
    if (future == null) 
        // 写入失败,则敞开 last 指向的 ProducerBatch 对象,同时返回 null 示意写入失败
        last.closeForRecordAppends();
    else
        // 写入胜利,则返回 RecordAppendResult 对象
        return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
return null;

}
synchronized 块执行完结,主动开释锁。
如果步骤 3 中的追加操作胜利,则返回 RecordAppendResult。
如果步骤 3 中的追加 Record 失败,则可能是因为以后应用的 ProducerBatch 曾经被填满了。这里会判断 abortOnNewBatch 参数是否为 true,如果是的话,会立刻返回 RecordAppendResult 后果(其中的 abortForNewBatch 字段设置为 true),返回的 RecordAppendResult 中如果 abortForNewBatch 为 true,会再触发一次 RecordAccumulator.append()办法。
如果 abortForNewBatch 参数不为 true,则会开始从 BufferPool 中调配新的 ByteBuffer,并封装成新的 ProducerBatch 对象。
再次对 ArrayDeque 加锁,并尝试将 Record 追加到新建的 ProducerBatch 中,同时将新建的 ProducerBatch 追加到对应的 Deque 尾部。
将新建的 ProducerBatch 增加到 incomplete 汇合中。synchronized 块完结,主动解锁。
返回 RecordAppendResult,RecordAppendResult 会中 batchIsFull 字段和 newBatchCreated 字段会作为唤醒 Sender 线程的条件。KafkaProducer.doSend() 办法中唤醒 Sender 线程的代码片段如下:
1
2
3
4
if (result.batchIsFull || result.newBatchCreated) {

// 当此次写入填满一个 ProducerBatch 或是有新 ProducerBatch 创立的时候,会唤醒 Sender 线程来进行发送
this.sender.wakeup();

}
上面来看 RecordAccumulator.append() 办法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public RecordAppendResult append(TopicPartition tp, long timestamp,

    byte[] key, byte[] value, Header[] headers, Callback callback,
    long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException {
// 统计正在向 RecordAccumulator 中写入的 Record 数量
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
    // 在 batches 汇合中查找指标 partition 对应的 ArrayDeque<ProducerBatch> 汇合,// 如果查找失败,则创立新的 ArrayDeque<ProducerBatch>,并增加到 batches 汇合中。Deque<ProducerBatch> dq = getOrCreateDeque(tp);
    synchronized (dq) { // 对 ArrayDeque<ProducerBatch> 加锁
        // 如果追加操作胜利,则返回 RecordAppendResult
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
        if (appendResult != null) // 追加胜利,则返回的 appendResult 不为 null
            return appendResult;
    }

    // 如果追加 Record 失败,则可能是因为以后应用的 ProducerBatch 曾经被填满了,这里会依据 abortOnNewBatch 参数,// 决定是否立刻返回 RecordAppendResult 后果,返回的 RecordAppendResult 中如果 abortForNewBatch 为 true,// 会再触发一次 append()办法
    if (abortOnNewBatch) {return new RecordAppendResult(null, false, false, true);
    }

    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
    // 从 BufferPool 中调配 ByteBuffer
    buffer = free.allocate(size, maxTimeToBlock);

    nowMs = time.milliseconds();
    synchronized (dq) { // 再次对 ArrayDeque<ProducerBatch> 加锁
        if (closed)
            throw new KafkaException("Producer closed while send in progress");
        // 再次尝试 tryAppend()办法追加 Record
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
        if (appendResult != null) {return appendResult;}
        // 将 ByteBuffer 封装成 MemoryRecordsBuilder
        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
        // 创立 ProducerBatch 对象
        ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
        // 通过 tryAppend()办法将 Record 追加到 ProducerBatch 中
        FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                callback, nowMs));
        // 将 ProducerBatch 增加到 ArrayDeque<ProducerBatch> 中
        dq.addLast(batch);
        // 将 ProducerBatch 增加到 IncompleteBatches 中
        incomplete.add(batch);

        buffer = null; // 置空 buffer
        return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
    }
} finally {if (buffer != null) // 如果 buffer 不为空,则示意写入过程中出现异常,这里会开释 ByteBuffer
        free.deallocate(buffer);
    // 以后 Record 曾经写入实现,递加 appendsInProgress
    appendsInProgress.decrementAndGet();}

}
这里咱们清晰的看到,对 ArrayDeque 进行加锁的代码,ArrayDeque 自身是非线程平安的汇合,加锁解决能够了解,然而为什么分两次加锁呢?这次要是因为在向 BufferPool 申请新 ByteBuffer 的时候,可能会导致阻塞。咱们假如在一个 synchronized 块中实现下面所有追加操作,线程 A 发送的 Record 比拟大,须要向 BufferPool 申请新空间,而此时 BufferPool 空间有余,线程 A 就会阻塞在 BufferPool 上期待,此时它仍然持有对应 ArrayDeque 的锁;线程 B 发送的 Record 较小,而此时的 ArrayDeque 最初一个 ProducerBatch 残余空间正好足够写入该 Record,然而因为线程 A 未开释 Deque 的锁,所以也须要一起期待,这就造成线程 B 不必要阻塞,升高了吞吐量。这里实质其实是通过缩小锁的持有工夫来进行的优化。

除了两次 ArrayDeque 加锁操作,咱们还看到第二次加锁后重试,这次要是为了避免多个线程并发从 BufferPool 申请空间后,造成外部碎片。这种场景如下图所示,线程 A 发现最初一个 ProducerBatch 空间有余,申请空间并创立一个新 ProducerBatch 对象增加到 ArrayDeque 的尾部,而后线程 B 与线程 A 并发执行,也将新创建一个 ProducerBatch 增加到 ArrayDeque 尾部。从下面 tryAppend() 办法的逻辑中咱们能够看到,后续的写入只会在 ArrayDeque 尾部的 ProducerBatch 上进行,这样就会导致下图中的 ProducerBatch3 不再被写入,从而呈现外部碎片:

理解了 RecordAccumulator 对 Record 写入的反对之后,咱们再来看 RecordAccumulator.ready()办法,它是 Sender 线程发送 Record 到 kafka broker 之前被调用的,该办法会依据集群元数据,获取可能接管待发送 Record 的节点汇合,具体筛选条件如下:

batchs 汇合中的 ArrayDeque 中有多个 RecordBatch 或是第一个 RecordBatch 是否满了。
等待时间是否足够长。这次要是两个方面,如果有重试的话,须要超过 retryBackoffMs 的退却时长;如果没有重试的话,须要超过 linger.ms 配置指定的期待时长(linger.ms 默认是 0)。
是否有其余线程在期待 BufferPool 开释空间。
是否有线程调用了 flush() 办法,正在期待 flush 操作实现。
上面来看是 ready 办法的代码,它会遍历 batches 汇合中每个分区,首先查找指标 partition 的 leader replica 所在的 Node,只有晓得 Node 信息,KafkaProducer 才晓得往哪里发。而后针对每个 ArrayDeque 进行解决,如果满足上述四个条件,则将对应的 Node 信息记录到 readyNodes 汇合中。最初,ready() 办法返回的是 ReadyCheckResult 对象,其中记录了满足发送条件的 Node 汇合、在遍历过程中找不到 leader replica 的 topic 以及下次调用 ready() 办法进行查看的工夫距离。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public ReadyCheckResult ready(Cluster cluster, long nowMs) {

// 用来记录能够向哪些 Node 节点发送数据
Set<Node> readyNodes = new HashSet<>();  
// 记录下次须要调用 ready()办法的工夫距离
long nextReadyCheckDelayMs = Long.MAX_VALUE;
// 记录 Cluster 元数据中找不到 leader replica 的 topic
Set<String> unknownLeaderTopics = new HashSet<>();
// 是否有线程在阻塞期待 BufferPool 开释空间
boolean exhausted = this.free.queued() > 0;

// 上面遍历 batches 汇合,对其中每个 partition 的 leader replica 所在的 Node 都进行判断
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {Deque<ProducerBatch> deque = entry.getValue();
    synchronized (deque) {ProducerBatch batch = deque.peekFirst();
        if (batch != null) {TopicPartition part = entry.getKey();
            // 查找指标 partition 的 leader replica 所在的节点
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                // leader replica 找不到,会认为是异常情况,不能发送音讯
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(part)) {boolean full = deque.size() > 1 || batch.isFull();
                long waitedTimeMs = batch.waitedTimeMs(nowMs);
                boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                boolean expired = waitedTimeMs >= timeToWaitMs;
                // 查看上述五个条件,找到此次发送波及到的 Node
                boolean sendable = full || expired || exhausted || closed || flushInProgress();
                if (sendable && !backingOff) {readyNodes.add(leader); 
                } else {long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    // 记录下次须要调用 ready()办法查看的工夫距离
                    nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                }
            }
        }
    }
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);

}
调用 RecordAccumulator.ready() 办法失去的 readyNodes 汇合后,此汇合还要通过 NetworkClient 的过滤(在介绍 Sender 线程的时候再具体介绍)之后,能力失去最终可能发送音讯的 Node 汇合。

之后,Sender 线程会调用 RecordAccumulator.drain() 办法会根据上述 Node 汇合获取要发送的 ProducerBatch,返回 Map<Integer, List> 汇合,其中的 Key 是指标 Node 的 Id,Value 是此次待发送的 ProducerBatch 汇合。在调用 KafkaProducer 的下层业务逻辑中,则是依照 TopicPartition 的形式产生数据,它只关怀发送到哪个 TopicPartition,并不关怀这些 TopicPartition 在哪个 Node 节点上。在网络 IO 层面,生产者是面向 Node 节点发送音讯数据,它只建设到 Node 的连贯并发送数据,并不关怀这些数据属于哪个 TopicPartition。drain()办法的外围性能是将 TopicPartition -> ProducerBatch 汇合的映射关系,转换成了 Node -> ProducerBatch 汇合的映射。上面是 drain() 办法的外围代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {

if (nodes.isEmpty())
    return Collections.emptyMap();
// 转换后的后果,Key 是指标 Node 的 Id,Value 是发送到指标 Node 的 ProducerBatch 汇合
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
    // 获取指标 Node 的 ProducerBatch 汇合
    List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
    batches.put(node.id(), ready);
}
return batches;

}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {

int size = 0;
// 获取以后 Node 上的 partition 汇合
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
// 记录发往指标 Node 的 ProducerBatch 汇合
List<ProducerBatch> ready = new ArrayList<>();
// drainIndex 是 batches 的下标,记录上次发送进行时的地位,下次持续从此地位开始发送。如果始终从     
// 索引 0 的队列开始发送,可能会呈现始终只发送前几个 partition 的音讯的状况,造成其余 partition 饥饿。int start = drainIndex = drainIndex % parts.size();
do {
    // 获取 partition 的元数据
    PartitionInfo part = parts.get(drainIndex);
    TopicPartition tp = new TopicPartition(part.topic(), part.partition());
    this.drainIndex = (this.drainIndex + 1) % parts.size();
    // 查看指标 partition 对应的 ArrayDeque 是否为空(略)
    synchronized (deque) {
        // 获取 ArrayDeque 中第一个 ProducerBatch 对象
        ProducerBatch first = deque.peekFirst();
        if (first == null)
            continue;
        // 重试操作的话,须要查看是否曾经期待了足够的退却工夫(略)
        if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
            // 此次申请要发送的数据量已满,完结循环
            break;
        } else {if (shouldStopDrainBatchesForPartition(first, tp))
                break;
            // 获取 ArrayDeque 中第一个 ProducerBatch
            ProducerBatch batch = deque.pollFirst();
            // 事务相干的解决(略)
            // 敞开底层输入流,将 ProducerBatch 设置成只读状态
            batch.close();
            size += batch.records().sizeInBytes();
            // 将 ProducerBatch 记录到 ready 汇合中
            ready.add(batch);
            // 批改 ProducerBatch 的 drainedMs 标记
            batch.drained(now);
        }
    }
} while (start != drainIndex);
return ready;

}
总结
本课时首先介绍了 kafka 中 message 格局的演变,详细分析了 V0、V1、V2 三个版本 message 的格局变迁。

而后介绍了 KafkaProducer 中 RecordAccumulator 相干的核心内容,它是业务线程和 Sender 线程之间数据的中转站,次要波及到了 MemoryRecordsBuilder、ProducerBatch、BufferPool 等底层组件,以及 RecordAccumulator 的外围办法。

下一课时,咱们将开始介绍 KafkaProducer 中 Sender 线程相干的内容。

本课时的文章和视频解说,还会放到:

微信公众号:

B 站搜寻:杨四正

正文完
 0