关于springboot:Kakfa成长记8Producer如何将消息放入到内存缓冲区下

5次阅读

共计 10539 个字符,预计需要花费 27 分钟才能阅读完成。

上一节咱们次要剖析了 RecordAccumulator 通过 BufferPool 申请内存的源码原理,在之前的剖析中,在 KafkaProducer 发送音讯时,把音讯放入内存缓冲区中次要分为了三步。如下:

而且之前咱们次要剖析了前两步的代码,如下正文所示:

 public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        try {
            // check if we have an in-progress batch
            // 1、创立寄存音讯的内存后果 实质是一个 map 汇合,外部次要是双端队列 (已剖析)
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            
            // 这段 tryAppend 代码发第一条音讯的时候不会执行,临时没有剖析
            synchronized (dq) {if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            //2、BufferPool 申请内存块逻辑 (已剖析)
            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            
            //3、将音讯封装到内存块中,放入之前筹备的内存构造(待剖析)synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {appendsInProgress.decrementAndGet();
        }
    }

之前剖析的逻辑,能够概括如下图所示:

这一节咱们持续来剖析将音讯放入缓冲区的最初一步—tryAppend 的逻辑,让咱们一起来看下吧!

tryAppend 的外围脉络

首先咱们必定要初步摸一下第三步代码次要在做什么。它的逻辑次要如下:

 synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }

能够看到下面的代码次要脉络:

1)执行了一段 tryAppend() 如果返回 appendResult 非空,则进行了一个开释 BufferPool 内存块的逻辑。否则什么都不做,这个之前第一步 getOrCreateDeque 前面的逻辑很像,应该是同样的逻辑。

2)创立了 MemoryRecords 和 RecordBatch 这两个要害的对象,应该是通过这两个对象,把 Buffer 这个内存块和音讯一起进行了封装

3)之后执行了 Recordbatch.tryAppend 办法,又一个 tryAppend,还是异步的一个 Feature。应该也是某种追加逻辑。

4)最初就是将 RecordBatch 这个 batch 对象放入了 2 个内存构造了,dp 就是之前的双端队列,incomplete 是一个 RecordBatch 的 Set 汇合,从名字上看是正在解决中或者未实现发送的 RecordBatch 的意思

整个脉络补充到下面的图中,如下所示:

既然晓得了这个外围脉络,咱们一步一步来剖析下就好。

两段 tryAppend()到底在干什么?

在下面外围脉络中,有两段 TryAppend 逻辑。首先咱们来看下 tryAppend()在做什么。

//RecordAccumulator.java
synchronized (dq) {RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null) {
        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
        free.deallocate(buffer);
        return appendResult;
    }  
    
    
}

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {RecordBatch last = deque.peekLast();
    if (last != null) {FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.records.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null;
}

// RecordBatch.tryAppend()
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {if (!this.records.hasRoomFor(key, value)) {return null;} else {long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

//MemoryRecords.java append 办法
    public long append(long offset, long timestamp, byte[] key, byte[] value) {if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        int size = Record.recordSize(key, value);
        compressor.putLong(offset);
        compressor.putInt(size);
        long crc = compressor.putRecord(timestamp, key, value);
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        return crc;
    }

这段逻辑其实比拟不好了解。外部调用了两个不同类的 tryAppend()。我起初看这块的时候,都没了解它在干什么。

为什么这里加了 synchronized (dq)?

从这个队列中 deque.peekLast()是在干吗?

为啥又执行了 last.tryAppend?

……

反正 有各种问题,各种不了解。咱们看源码常常会遇到这种状况。这个时候怎么办呢?

其实就是一个方法:尝试。

尝试各种办法,比方看看正文、比方能不能 debug、比方能不能举例子、比方画图、比方能不能搜搜帖子看看有人钻研过这块逻辑么 或者你可能就是本人非要死磕硬钢,一直反复浏览逻辑,聚焦这里,认真思考、钻研这这块逻辑外围目标是什么 …..

总之,就这样一直反复尝试,钻研,不放弃,你就会缓缓了解艰涩的知识点。

就像有句老话说:”只有思维不滑坡,方法总比艰难多。“意思就是对技术有趣味,放弃心态好,不埋怨,不焦躁,慢慢来,一直钻研和尝试各种办法,总会解决困难的。其实这个就是许多人的差别所在,也正是每个人须要成长的中央。

而这块逻辑通过我一直钻研和思考后,发现逻辑并不简单。上面我举几个例子来帮忙大家了解这段逻辑。

场景是这样的,Producer 有一个线程 1,一直发送总大小为 3KB 的音讯 Record。间断发送了 6 条,咱们来看下它的流程是怎么样的呢?

首先线程 1 第 1 次发送音讯的话,依照下面的外围脉络,执行过程如下所示:

从图中能够清晰的看到线程 1 第 1 次发送音讯时候,创立了一个内存块 ByteBuffer,之后将音讯进行了封装解决,写入到了 RecordBatch,最初将 RecordBatch 放入队列中。(能够对应下面的代码来了解)

通过画图,咱们能够梳理思路,串起来整个流程整个办法在你困惑的时候十分有用。

接这线程 1 持续发送音讯,发送第 2 - 5 条音讯,流程则会产生一些扭转,如下图所示:

在发送第 2 - 5 条音讯时,能够发现不会申请新的 ByteBuffer, 间接向现有的的 RecordBatch 写入音讯,通过 MemoryRecords 追加音讯到 ByteBuffer 的字节流中,最终 RecordBatch 的大小为 15KB。(能够对应下面的代码来了解)

接着线程 1 发送第 6 条音讯时,流程又会产生新的变动,整体如下图所示:

在发送第 6 条音讯时,又申请了一块内存块和创立了新的 RecordBatch。之后因为第一个 Batch 写满了,会将音讯写入第二个 Batch,并放入队列中。(能够对应下面的代码来了解)

通过下面的例子,你能够发现 KafkaProducer 整个发送音讯的过程是一个打包发送的过程,会将音讯一条一条打成一个 16KB 的 batch,放入队列,之后发送进来。

这种微打包逻辑很多中间件和框架,甚至业务零碎中都能够借鉴,你能够发散下思维想想是不是?

其实这里还有很多思考点:

比方单条音讯如果超过 16KB 会怎么样?提醒:ByteBuffer 不会被复用,间接会生成一个大的 RecordBatch。

每个 Batch 为什么会生成一个异步 feature,放入 List<Thunk> thunks 列表中?提醒:可能是为了每条音讯绑定回调和超时管制,晋升性能和吞吐量。

synchronized 加锁的目标什么?提醒:必定是思考多线程发送音讯的场景的。

这些留给大家去思考和练习下,大家能够在评论区留下你的答复。或者在之后的我建设微信群后探讨下。

Record 音讯的进一步封装和序列化

剖析完两段 tryAppend()到底在干什么之后,其实整个 tryAppend 的外围脉络就剩下一段逻辑了。

如何创立了 MemoryRecords 和 RecordBatch 这两个要害的对象,并通过这两个对象,把 ByteBuffer 这个内存块和音讯一起进行了封装的呢?

让咱们来看下代码:

 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());

//MemoryRecords.java
private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
    this.writable = writable;
    this.writeLimit = writeLimit;
    this.initialCapacity = buffer.capacity();
    if (this.writable) {
        this.buffer = null;
        this.compressor = new Compressor(buffer, type);
    } else {
        this.buffer = buffer;
        this.compressor = null;
    }
}
//Compressor.java
public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;
    this.initPos = buffer.position();

    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;

    if (type != CompressionType.NONE) {
        // for compressed records, leave space for the header and the shallow message metadata
        // and move the starting position to the value payload offset
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }

    // create the stream
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

//RecordBatch.java
public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
    this.createdMs = now;
    this.lastAttemptMs = now;
    this.records = records;
    this.topicPartition = tp;
    this.produceFuture = new ProduceRequestResult();
    this.thunks = new ArrayList<Thunk>();
    this.lastAppendTime = createdMs;
    this.retry = false;
}

其实能够看到,根本就是初始化 RecordBatch 和 MemoryRecords 外部数据结构,外围通过一个组件 Compressor 操作 ByteBuffer,将音讯音讯的 key 和 Value 之后写入到一个 outPutStream 流中。

至于具体如何写入的,其实是在之后 MemoryRecords.append()中

public long append(long offset, long timestamp, byte[] key, byte[] value) {if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    int size = Record.recordSize(key, value);
    compressor.putLong(offset);
    compressor.putInt(size);
    long crc = compressor.putRecord(timestamp, key, value);
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    return crc;
}
public static int recordSize(int keySize, int valueSize) {return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;}

public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type,
                      int valueOffset, int valueSize) {
    // put a record as un-compressed into the underlying stream
    long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
    byte attributes = Record.computeAttributes(type);
    putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
    return crc;
}


 public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) {
        // write crc
        compressor.putInt((int) (crc & 0xffffffffL));
        // write magic value
        compressor.putByte(CURRENT_MAGIC_VALUE);
        // write attributes
        compressor.putByte(attributes);
        // write timestamp
        compressor.putLong(timestamp);
        // write the key
        if (key == null) {compressor.putInt(-1);
        } else {compressor.putInt(key.length);
            compressor.put(key, 0, key.length);
        }
        // write the value
        if (value == null) {compressor.putInt(-1);
        } else {int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
            compressor.putInt(size);
            compressor.put(value, valueOffset, size);
        }
    }
    public void putByte(final byte value) {
        try {appendStream.write(value);
        } catch (IOException e) {throw new KafkaException("I/O exception when writing to the append stream, closing", e);
        }
    }

通过下面的代码能够看进去,音讯最终会依照肯定格局最终序列化成字节数组写入到输入流中。

什么样的格局呢?其实依据写入的程序能够看出它的自定义二进制协定如下:

offset | size | crc | magic | attibutes | timestamp | key size | key | value size | value

整个过程他标准里规定了,就是先是几个字节的 offset,而后是几个字节的 size,而后是几个字节的 crc,接着是几个字节的 magic,以此类推,他就是齐全依照标准来写入 ByteBuffer 里去的。

并且能够看到他最最底层的写入 ByteBuffer 的 IO 流的形式。

整个过程简略能够概括如下图所示:

如果你仔细分析下,会发现个流能够进行压缩,ByteBufferOutputStream 给包裹在一个压缩流里,gzip、lz4、snappy,如果是包裹在压缩流里,写入的时候会先进入压缩流的流的缓冲区,之后再写入 ByteBufferOutputStream。如果是非压缩的模式,最最一般的状况下,就是 DataOutputStream 包裹了 ByteBufferOutputSteram,而后写入数据,Long、Byte、String,都会在底层转换为字节进入到 ByteBuffer 里去。

其实这才是音讯最终的序列化,依据自定义的二进制协定写入流中发送进来。之前的序列化定义只是对 key 和 value 的定义而已,最终底层的序列化,会包装一些元数据。

这里 kafka 对音讯格局其实做过优化和改良,有趣味的同学能够去查阅下材料理解下。它为什么这么设计呢?解决了哪些问题,其实都是值得思考的中央。

这里剧透下,通过这样的音讯格局其实能够解决粘包拆包的问题,你看进去了么?

小结

好了,明天就到这里了。咱们次要剖析了 tryAppend 逻辑:

1)是如何将音讯最终放入到内存的队列中的。

2)并且晓得了 batch 打包音讯的机制

3)最终序列化音讯的自定义二进制协定

这当中其实有很多值得思考的亮点,Kafka 这块的源码逻辑还是很值得大家多钻研几遍。

之后的咱们要剖析的逻辑其实就是如何内存队列中将打包好的音讯发送给 Broker 的。咱们下一节再见!

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0