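In the previous section we looked at how RecordAccumulator requests memory through BufferPool. As that analysis showed, when KafkaProducer sends a message, putting the message into the in-memory buffer takes three main steps: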
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    try {
        // check if we have an in-progress batch
        // 1. Create the in-memory structure that holds messages: essentially a map whose values are deques (already analyzed)
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        // This tryAppend has no effect when the first message is sent (the deque is still empty); not analyzed yet
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // 2. Request a memory block from BufferPool (already analyzed)
        // we don't have an in-progress record batch try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

        // 3. Wrap the message into the memory block and put it into the structure prepared in step 1 (to be analyzed)
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                free.deallocate(buffer);
                return appendResult;
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
In this section we continue with the last step of putting a message into the buffer: the tryAppend logic. Let's take a look!

The core flow of tryAppend
synchronized (dq) {
    // Need to check if producer is closed again after grabbing the dequeue lock.
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null) {
        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
        free.deallocate(buffer);
        return appendResult;
    }
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

    dq.addLast(batch);
    incomplete.add(batch);
    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}

The code above does four things:
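1) It runs tryAppend(). If the returned appendResult is non-null, the memory block just obtained from BufferPool is released and the result is returned; otherwise nothing happens and execution continues. This looks very much like the logic right after getOrCreateDeque in step one and should be the same logic.
2) It creates two key objects, MemoryRecords and RecordBatch. These are presumably what wrap the ByteBuffer memory block and the message together.
3) It then calls RecordBatch.tryAppend, yet another tryAppend, which returns an asynchronous Future (FutureRecordMetadata). This should also be some kind of append logic.
4) Finally, it puts the RecordBatch object into two in-memory structures: dq is the double-ended queue from before, and incomplete is a set of RecordBatch which, judging by its name, holds batches that are still in progress or have not finished sending.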
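The four steps above can be sketched in miniature. This is only an illustration under simplifying assumptions, not Kafka's code: `AppendSketch`, `Batch`, `allocated` and `released` are hypothetical names, a batch is reduced to a byte counter, and the BufferPool is reduced to two counters. What it keeps is the shape of the logic: try the last batch under the lock, allocate outside the lock, then check again under the lock before creating a new batch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for RecordAccumulator.append(); not Kafka's real classes.
class AppendSketch {
    static final int BATCH_SIZE = 16 * 1024;

    // A "batch" is reduced to a byte counter with a fixed capacity.
    static class Batch {
        final int capacity;
        int usedBytes;
        int recordCount;

        Batch(int capacity) { this.capacity = capacity; }

        boolean tryAppend(int recordSize) {
            if (usedBytes + recordSize > capacity) return false; // no room left
            usedBytes += recordSize;
            recordCount++;
            return true;
        }
    }

    final Deque<Batch> dq = new ArrayDeque<>();
    final AtomicInteger allocated = new AtomicInteger(); // buffers taken from the imaginary pool
    final AtomicInteger released = new AtomicInteger();  // buffers handed straight back

    // Callers must hold the lock on dq.
    private boolean tryAppendToLast(int recordSize) {
        Batch last = dq.peekLast();
        return last != null && last.tryAppend(recordSize);
    }

    void append(int recordSize) {
        synchronized (dq) {
            if (tryAppendToLast(recordSize)) return; // fast path: the last batch has room
        }
        // Allocation happens outside the lock so a blocked allocation does not stall other senders.
        int size = Math.max(BATCH_SIZE, recordSize);
        allocated.incrementAndGet();
        synchronized (dq) {
            // Second check: another thread may have added a batch while we were allocating.
            if (tryAppendToLast(recordSize)) {
                released.incrementAndGet(); // our buffer is not needed, give it back
                return;
            }
            Batch batch = new Batch(size);
            batch.tryAppend(recordSize);
            dq.addLast(batch);
        }
    }

    int totalRecords() {
        synchronized (dq) {
            int total = 0;
            for (Batch b : dq) total += b.recordCount;
            return total;
        }
    }

    // Runs several sender threads against one accumulator and waits for them to finish.
    static AppendSketch run(int threads, int recordsPerThread, int recordSize) {
        AppendSketch acc = new AppendSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < recordsPerThread; j++) acc.append(recordSize);
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc;
    }
}
```

Calling `AppendSketch.run(4, 100, 3 * 1024)` always ends with 400 records in the deque, and the number of buffers kept (`allocated` minus `released`) equals the number of batches. How many buffers were released along the way depends on thread timing, which is exactly the case the second tryAppend exists for.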
What are the two tryAppend() calls actually doing?

The core flow above contains two tryAppend calls. Let's first look at what tryAppend() does.
synchronized (dq) {
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null) {
        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
        return appendResult;
    }
}

// RecordAccumulator.tryAppend()
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            // no room left in the last batch: close it for further writes
            last.records.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null;
}

// RecordBatch.tryAppend()
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

// MemoryRecords.java, append method
public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");
    int size = Record.recordSize(key, value);
    compressor.putLong(offset);
    compressor.putInt(size);
    long crc = compressor.putRecord(timestamp, key, value);
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    return crc;
}
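This logic is rather hard to follow. It calls tryAppend() on two different classes. When I first read this part, I had no idea what it was doing.
Why is synchronized (dq) used here?
What is deque.peekLast() doing with the queue?
Why is last.tryAppend then called?
In short, all sorts of questions and all sorts of confusion. This happens a lot when reading source code. What can you do about it?
Try every approach you can: read the comments, try to debug, work through an example, draw a diagram, or search for posts by people who have already studied this logic. Or you can simply grind it out yourself: reread the logic again and again, stay focused on it, and think hard about what its core purpose is.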
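Let's work through an example. The scenario is this: the Producer has one thread, thread 1, that keeps sending message Records of 3KB each. It sends 6 in a row. What does the flow look like?
First, when thread 1 sends its 1st message, execution follows the core flow above, as shown in the figure:
The figure shows clearly that when thread 1 sends its 1st message, a ByteBuffer memory block is created, the message is wrapped and written into a RecordBatch, and finally the RecordBatch is put into the queue. (Compare with the code above.)
Thread 1 then keeps sending, and for messages 2 to 5 the flow changes somewhat, as shown in the figure:
For messages 2 to 5, no new ByteBuffer is requested. The messages are written directly into the existing RecordBatch, with MemoryRecords appending them to the byte stream of the ByteBuffer, so the RecordBatch ends up at 15KB. (Compare with the code above.)
When thread 1 then sends the 6th message, the flow changes again, as shown in the figure:
For the 6th message, the first Batch is full (15KB plus another 3KB would exceed 16KB), so another memory block is requested and a new RecordBatch is created. The message is written into this second Batch, which is then put into the queue. (Compare with the code above.)
From this example you can see that sending in KafkaProducer is a pack-then-send process: messages are packed one by one into batches of up to 16KB (the default batch.size), each batch is put into the queue, and the batches are sent out afterwards.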
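The arithmetic of the example can be replayed with a tiny simulation. It is a deliberately simplified sketch: `BatchPackingDemo` and `pack` are made-up names, and each record is assumed to occupy exactly its stated size, ignoring the per-record header bytes and the compression estimate that Kafka's real room check takes into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the packing rule: append to the last batch if the
// record fits, otherwise start a new batch. Not Kafka code.
class BatchPackingDemo {
    // Returns the number of records that end up in each batch, in creation order.
    static List<Integer> pack(int recordCount, int recordSize, int batchSize) {
        List<Integer> recordsPerBatch = new ArrayList<>();
        int usedInLast = 0;
        for (int i = 0; i < recordCount; i++) {
            boolean fits = !recordsPerBatch.isEmpty() && usedInLast + recordSize <= batchSize;
            if (fits) {
                int last = recordsPerBatch.size() - 1;
                recordsPerBatch.set(last, recordsPerBatch.get(last) + 1);
                usedInLast += recordSize;
            } else {
                recordsPerBatch.add(1); // new batch holding this one record
                usedInLast = recordSize;
            }
        }
        return recordsPerBatch;
    }

    public static void main(String[] args) {
        // Six 3KB records with a 16KB batch size.
        System.out.println(pack(6, 3 * 1024, 16 * 1024)); // prints [5, 1]
    }
}
```

The first five records (15KB) share one batch and the sixth opens a second one, matching the three figures described above.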
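Some questions are left for you to think about. For example, what happens if a single message is larger than 16KB? Hint: the buffer is sized with Math.max(batchSize, ...), so a larger ByteBuffer is allocated that is not reused, and the message gets a large RecordBatch of its own.
Why does each append to a Batch produce an asynchronous Future, with the callback wrapped in a Thunk and added to the List<Thunk> thunks? Hint: possibly so that a callback and timeout control can be bound to each message while batching still improves performance and throughput.
What is the purpose of the synchronized lock? Hint: it must be there for the case where multiple threads send messages.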
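For the second question, one way to picture the thunks list is as a per-batch registry of callbacks that is walked once when the batch as a whole completes. The sketch below is an assumption-laden illustration, not Kafka's implementation: `ThunkSketch`, `RecordCallback`, `Batch.append` and `Batch.done` are hypothetical names, and the real Thunk also carries the FutureRecordMetadata.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of per-record callbacks completed by one batch-level result.
class ThunkSketch {
    // Stand-in for a user callback; not Kafka's Callback interface.
    interface RecordCallback {
        void onCompletion(long offset, Exception error);
    }

    // Pairs a callback with the record's position inside the batch.
    static class Thunk {
        final RecordCallback callback;
        final int relativeOffset;

        Thunk(RecordCallback callback, int relativeOffset) {
            this.callback = callback;
            this.relativeOffset = relativeOffset;
        }
    }

    static class Batch {
        private final List<Thunk> thunks = new ArrayList<>();
        private int recordCount = 0;

        // Appending only remembers the callback; nothing is sent here.
        int append(RecordCallback callback) {
            int relativeOffset = recordCount++;
            if (callback != null) thunks.add(new Thunk(callback, relativeOffset));
            return relativeOffset;
        }

        // Called once when the whole batch has been acknowledged or has failed.
        void done(long baseOffset, Exception error) {
            for (Thunk thunk : thunks) {
                thunk.callback.onCompletion(baseOffset + thunk.relativeOffset, error);
            }
        }
    }
}
```

With this shape a single response for the batch is enough to notify every record's callback with its own offset, which is consistent with the batching-for-throughput idea in the hint.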
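Further wrapping and serialization of the Record message

With the two tryAppend() calls understood, only one piece of the core tryAppend flow remains.
How are the two key objects, MemoryRecords and RecordBatch, created, and how do they wrap the ByteBuffer memory block and the message together?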
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());

// MemoryRecords.java
private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
    this.writable = writable;
    this.writeLimit = writeLimit;
    this.initialCapacity = buffer.capacity();
    if (this.writable) {
        this.buffer = null;
        this.compressor = new Compressor(buffer, type);
    } else {
        this.buffer = buffer;
        this.compressor = null;
    }
}

// Compressor.java
public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;
    this.initPos = buffer.position();
    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;
    if (type != CompressionType.NONE) {
        // for compressed records, leave space for the header and the shallow message metadata
        // and move the starting position to the value payload offset
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }
    // create the stream
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

// RecordBatch.java
public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
    this.createdMs = now;
    this.lastAttemptMs = now;
    this.records = records;
    this.topicPartition = tp;
    this.produceFuture = new ProduceRequestResult();
    this.thunks = new ArrayList<Thunk>();
    this.lastAppendTime = createdMs;
    this.retry = false;
}
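As you can see, this is basically just initializing the internal data structures of RecordBatch and MemoryRecords. The core is a component called Compressor, which operates on the ByteBuffer and later writes the message's key and value into an output stream.
As for how exactly the write happens, that is in MemoryRecords.append(), which is called afterwards: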
// MemoryRecords.java
public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");
    int size = Record.recordSize(key, value);
    compressor.putLong(offset);
    compressor.putInt(size);
    long crc = compressor.putRecord(timestamp, key, value);
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    return crc;
}

// Record.java
public static int recordSize(int keySize, int valueSize) {
    return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
}

// Compressor.java
public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type,
                      int valueOffset, int valueSize) {
    // put a record as un-compressed into the underlying stream
    long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
    byte attributes = Record.computeAttributes(type);
    putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
    return crc;
}

// Record.java
public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) {
    // write crc
    compressor.putInt((int) (crc & 0xffffffffL));
    // write magic value
    compressor.putByte(CURRENT_MAGIC_VALUE);
    // write attributes
    compressor.putByte(attributes);
    // write timestamp
    compressor.putLong(timestamp);
    // write the key
    if (key == null) {
        compressor.putInt(-1);
    } else {
        compressor.putInt(key.length);
        compressor.put(key, 0, key.length);
    }
    // write the value
    if (value == null) {
        compressor.putInt(-1);
    } else {
        int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
        compressor.putInt(size);
        compressor.put(value, valueOffset, size);
    }
}

// Compressor.java
public void putByte(final byte value) {
    try {
        appendStream.write(value);
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}

The resulting layout of one message in the buffer is:
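offset | size | crc | magic | attributes | timestamp | key size | key | value size | value
The whole process follows a layout fixed by the message format: first 8 bytes of offset, then 4 bytes of size, then 4 bytes of crc, then 1 byte of magic, and so on. The fields are written into the ByteBuffer in exactly that order.
You can also see, at the very bottom, how the data is written to the ByteBuffer through an I/O stream.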
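To make the layout concrete, here is a small sketch that writes one uncompressed record in this field order straight into a ByteBuffer. It is an illustration, not Kafka's code: `RecordLayoutSketch` and `encode` are hypothetical names, the magic value 1 and attributes 0 are assumed for a message with a timestamp and no compression, and `java.util.zip.CRC32` over the bytes after the crc field stands in for Kafka's own checksum helper.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical encoder for the layout:
// offset | size | crc | magic | attributes | timestamp | key size | key | value size | value
class RecordLayoutSketch {
    static final int LOG_OVERHEAD = 8 + 4;                   // offset (long) + size (int)
    static final int RECORD_HEADER = 4 + 1 + 1 + 8 + 4 + 4;  // crc, magic, attributes, timestamp, key size, value size

    static int recordSize(byte[] key, byte[] value) {
        return RECORD_HEADER + (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
    }

    static byte[] encode(long offset, long timestamp, byte[] key, byte[] value) {
        int size = recordSize(key, value);
        ByteBuffer buf = ByteBuffer.allocate(LOG_OVERHEAD + size);
        buf.putLong(offset);
        buf.putInt(size);
        int crcPos = buf.position();
        buf.putInt(0);            // crc placeholder, filled in below
        buf.put((byte) 1);        // magic (assumed value)
        buf.put((byte) 0);        // attributes: no compression (assumed value)
        buf.putLong(timestamp);
        putBytes(buf, key);
        putBytes(buf, value);

        // Checksum covers everything after the crc field.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), crcPos + 4, buf.position() - (crcPos + 4));
        buf.putInt(crcPos, (int) crc.getValue());
        return buf.array();
    }

    // A null key or value is written as length -1 with no payload bytes.
    private static void putBytes(ByteBuffer buf, byte[] bytes) {
        if (bytes == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
    }
}
```

For a 1-byte key and a 5-byte value this produces 40 bytes: 12 bytes of offset and size, 22 bytes of fixed record header, and 6 bytes of payload. That is the same sum that `Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value))` uses when sizing a new buffer.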
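If you look closely, you will find that this stream can be compressed: the ByteBufferOutputStream is wrapped in a compression stream (gzip, lz4 or snappy). When it is wrapped like this, a write first lands in the compression stream's own buffer and only afterwards reaches the ByteBufferOutputStream. In the uncompressed mode, the most ordinary case, a DataOutputStream simply wraps the ByteBufferOutputStream, and whatever is written (Long, Byte, String) is converted to bytes underneath and ends up in the ByteBuffer.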
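The wrapping idea can be tried with JDK classes alone. In this sketch `StreamWrappingSketch` is a hypothetical name, a `ByteArrayOutputStream` stands in for Kafka's ByteBufferOutputStream, and only gzip is shown because the JDK ships it; it is not how Kafka's wrapForOutput is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo: the same DataOutputStream calls, with or without a compression layer.
class StreamWrappingSketch {
    // Writes a length-prefixed payload, optionally through a gzip stream.
    static byte[] write(byte[] payload, boolean gzip) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(gzip ? new GZIPOutputStream(sink) : sink);
            out.writeInt(payload.length);
            out.write(payload);
            out.close(); // flushes the compression buffer and writes the gzip trailer
            return sink.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads back a payload written with write(payload, true).
    static byte[] readGzip(byte[] compressed) {
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The caller's code is identical in both modes; only the stream underneath changes. For a repetitive payload the gzip variant produces far fewer bytes in the sink, and those bytes only appear in full once the compression stream has been flushed or closed.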
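This is in fact the message's final serialization: it is written to the stream according to Kafka's own binary protocol and then sent out. The serializers configured earlier only cover the key and the value; the final low-level serialization wraps them with some metadata.
Kafka has optimized and improved its message format over time, and interested readers can look up the details. Why was it designed this way, and which problems did that solve? These are questions worth thinking about.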
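That's it for today. We mainly analyzed the tryAppend logic:
2) and we now know how messages are packed into batches
There are many points in here worth pondering, and this part of the Kafka source is well worth reading several times.
What we will analyze next is how the packed messages in the in-memory queue are sent to the Broker. See you in the next section!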
