上一节咱们次要剖析了RecordAccumulator通过BufferPool申请内存的源码原理,在之前的剖析中,在KafkaProducer发送音讯时,把音讯放入内存缓冲区中次要分为了三步。如下:
而且之前咱们次要剖析了前两步的代码,如下正文所示:
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try { // check if we have an in-progress batch // 1、创立寄存音讯的内存后果 实质是一个map汇合,外部次要是双端队列 (已剖析) Deque<RecordBatch> dq = getOrCreateDeque(tp); //这段tryAppend代码发第一条音讯的时候不会执行,临时没有剖析 synchronized (dq) { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) return appendResult; } //2、BufferPool申请内存块逻辑 (已剖析) // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); //3、将音讯封装到内存块中,放入之前筹备的内存构造 (待剖析) synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return appendResult; } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } finally { appendsInProgress.decrementAndGet(); } }
之前剖析的逻辑,能够概括如下图所示:
这一节咱们持续来剖析将音讯放入缓冲区的最初一步—tryAppend的逻辑,让咱们一起来看下吧!
tryAppend的外围脉络
首先咱们必定要初步摸一下第三步代码次要在做什么。它的逻辑次要如下:
synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return appendResult; } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); }
能够看到下面的代码次要脉络:
1)执行了一段tryAppend() 如果返回appendResult非空,则进行了一个开释BufferPool内存块的逻辑。否则什么都不做,这个之前第一步 getOrCreateDeque前面的逻辑很像,应该是同样的逻辑。
2)创立了 MemoryRecords和RecordBatch这两个要害的对象,应该是通过这两个对象,把Buffer这个内存块和音讯一起进行了封装
3)之后执行了Recordbatch.tryAppend办法,又一个tryAppend,还是异步的一个Feature。应该也是某种追加逻辑。
4)最初就是将RecordBatch这个batch对象放入了2个内存构造了,dp就是之前的双端队列,incomplete是一个RecordBatch的Set汇合,从名字上看是正在解决中或者未实现发送的RecordBatch的意思
整个脉络补充到下面的图中,如下所示:
既然晓得了这个外围脉络,咱们一步一步来剖析下就好。
两段tryAppend()到底在干什么?
在下面外围脉络中,有两段TryAppend逻辑。首先咱们来看下tryAppend()在做什么。
//RecordAccumulator.javasynchronized (dq) { RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return appendResult; } }private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) { RecordBatch last = deque.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds()); if (future == null) last.records.close(); else return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false); } return null;}// RecordBatch.tryAppend() public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) { if (!this.records.hasRoomFor(key, value)) { return null; } else { long checksum = this.records.append(offsetCounter++, timestamp, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length); if (callback != null) thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } }//MemoryRecords.java append办法 public long append(long offset, long timestamp, byte[] key, byte[] value) { if (!writable) throw new IllegalStateException("Memory records is not writable"); int size = Record.recordSize(key, value); compressor.putLong(offset); compressor.putInt(size); long crc = compressor.putRecord(timestamp, key, value); compressor.recordWritten(size + Records.LOG_OVERHEAD); return crc; }
这段逻辑其实比拟不好了解。外部调用了两个不同类的tryAppend()。我起初看这块的时候,都没了解它在干什么。
为什么这里加了synchronized (dq)?
从这个队列中deque.peekLast()是在干吗?
为啥又执行了last.tryAppend?
......
反正有各种问题,各种不了解。咱们看源码常常会遇到这种状况。这个时候怎么办呢?
其实就是一个方法:尝试。
尝试各种办法,比方看看正文、比方能不能debug、比方能不能举例子、比方画图、比方能不能搜搜帖子看看有人钻研过这块逻辑么。或者你可能就是本人非要死磕硬钢,一直反复浏览逻辑,聚焦这里,认真思考、钻研这这块逻辑外围目标是什么.....
总之,就这样一直反复尝试,钻研,不放弃,你就会缓缓了解艰涩的知识点。
就像有句老话说:”只有思维不滑坡,方法总比艰难多。“ 意思就是对技术有趣味,放弃心态好,不埋怨,不焦躁,慢慢来,一直钻研和尝试各种办法,总会解决困难的。其实这个就是许多人的差别所在,也正是每个人须要成长的中央。
而这块逻辑通过我一直钻研和思考后,发现逻辑并不简单。上面我举几个例子来帮忙大家了解这段逻辑。
场景是这样的,Producer有一个线程1,一直发送总大小为3KB的音讯Record。间断发送了6条,咱们来看下它的流程是怎么样的呢?
首先线程1第1次发送音讯的话,依照下面的外围脉络,执行过程如下所示:
从图中能够清晰的看到线程1第1次发送音讯时候,创立了一个内存块ByteBuffer,之后将音讯进行了封装解决,写入到了RecordBatch,最初将RecordBatch放入队列中。(能够对应下面的代码来了解)
通过画图,咱们能够梳理思路,串起来整个流程整个办法在你困惑的时候十分有用。
接这线程1持续发送音讯,发送第2-5条音讯,流程则会产生一些扭转,如下图所示:
在发送第2-5条音讯时,能够发现不会申请新的ByteBuffer,间接向现有的的RecordBatch写入音讯,通过MemoryRecords追加音讯到ByteBuffer的字节流中,最终RecordBatch的大小为15KB。(能够对应下面的代码来了解)
接着线程1发送第6条音讯时,流程又会产生新的变动,整体如下图所示:
在发送第6条音讯时,又申请了一块内存块和创立了新的RecordBatch。之后因为第一个Batch写满了,会将音讯写入第二个Batch,并放入队列中。(能够对应下面的代码来了解)
通过下面的例子,你能够发现KafkaProducer整个发送音讯的过程是一个打包发送的过程,会将音讯一条一条打成一个16KB的batch,放入队列,之后发送进来。
这种微打包逻辑很多中间件和框架,甚至业务零碎中都能够借鉴,你能够发散下思维想想是不是?
其实这里还有很多思考点:
比方单条音讯如果超过16KB会怎么样?提醒:ByteBuffer不会被复用,间接会生成一个大的RecordBatch。
每个Batch为什么会生成一个异步feature,放入List<Thunk> thunks列表中?提醒:可能是为了每条音讯绑定回调和超时管制,晋升性能和吞吐量。
synchronized加锁的目标什么?提醒:必定是思考多线程发送音讯的场景的。
这些留给大家去思考和练习下,大家能够在评论区留下你的答复。或者在之后的我建设微信群后探讨下。
Record音讯的进一步封装和序列化
剖析完两段tryAppend()到底在干什么之后,其实整个tryAppend的外围脉络就剩下一段逻辑了。
如何创立了 MemoryRecords和RecordBatch这两个要害的对象,并通过这两个对象,把ByteBuffer这个内存块和音讯一起进行了封装的呢?
让咱们来看下代码:
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());//MemoryRecords.javaprivate MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) { this.writable = writable; this.writeLimit = writeLimit; this.initialCapacity = buffer.capacity(); if (this.writable) { this.buffer = null; this.compressor = new Compressor(buffer, type); } else { this.buffer = buffer; this.compressor = null; }}//Compressor.javapublic Compressor(ByteBuffer buffer, CompressionType type) { this.type = type; this.initPos = buffer.position(); this.numRecords = 0; this.writtenUncompressed = 0; this.compressionRate = 1; this.maxTimestamp = Record.NO_TIMESTAMP; if (type != CompressionType.NONE) { // for compressed records, leave space for the header and the shallow message metadata // and move the starting position to the value payload offset buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD); } // create the stream bufferStream = new ByteBufferOutputStream(buffer); appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);}//RecordBatch.javapublic RecordBatch(TopicPartition tp, MemoryRecords records, long now) { this.createdMs = now; this.lastAttemptMs = now; this.records = records; this.topicPartition = tp; this.produceFuture = new ProduceRequestResult(); this.thunks = new ArrayList<Thunk>(); this.lastAppendTime = createdMs; this.retry = false;}
其实能够看到,根本就是初始化RecordBatch和MemoryRecords外部数据结构,外围通过一个组件Compressor操作ByteBuffer,将音讯音讯的key和Value之后写入到一个outPutStream流中。
至于具体如何写入的,其实是在之后MemoryRecords.append()中
public long append(long offset, long timestamp, byte[] key, byte[] value) { if (!writable) throw new IllegalStateException("Memory records is not writable"); int size = Record.recordSize(key, value); compressor.putLong(offset); compressor.putInt(size); long crc = compressor.putRecord(timestamp, key, value); compressor.recordWritten(size + Records.LOG_OVERHEAD); return crc;}public static int recordSize(int keySize, int valueSize) { return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;}public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { // put a record as un-compressed into the underlying stream long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize); byte attributes = Record.computeAttributes(type); putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize); return crc;} public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) { // write crc compressor.putInt((int) (crc & 0xffffffffL)); // write magic value compressor.putByte(CURRENT_MAGIC_VALUE); // write attributes compressor.putByte(attributes); // write timestamp compressor.putLong(timestamp); // write the key if (key == null) { compressor.putInt(-1); } else { compressor.putInt(key.length); compressor.put(key, 0, key.length); } // write the value if (value == null) { compressor.putInt(-1); } else { int size = valueSize >= 0 ? valueSize : (value.length - valueOffset); compressor.putInt(size); compressor.put(value, valueOffset, size); } } public void putByte(final byte value) { try { appendStream.write(value); } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); } }
通过下面的代码能够看进去,音讯最终会依照肯定格局最终序列化成字节数组写入到输入流中。
什么样的格局呢?其实依据写入的程序能够看出它的自定义二进制协定如下:
offset | size | crc | magic | attibutes | timestamp | key size | key | value size | value
整个过程他标准里规定了,就是先是几个字节的offset,而后是几个字节的size,而后是几个字节的crc,接着是几个字节的magic,以此类推,他就是齐全依照标准来写入ByteBuffer里去的。
并且能够看到他最最底层的写入ByteBuffer的IO流的形式。
整个过程简略能够概括如下图所示:
如果你仔细分析下,会发现个流能够进行压缩,ByteBufferOutputStream给包裹在一个压缩流里,gzip、lz4、snappy,如果是包裹在压缩流里,写入的时候会先进入压缩流的流的缓冲区,之后再写入ByteBufferOutputStream。 如果是非压缩的模式,最最一般的状况下,就是DataOutputStream包裹了ByteBufferOutputSteram,而后写入数据,Long、Byte、String,都会在底层转换为字节进入到ByteBuffer里去。
其实这才是音讯最终的序列化,依据自定义的二进制协定写入流中发送进来。之前的序列化定义只是对key和value的定义而已,最终底层的序列化,会包装一些元数据。
这里kafka对音讯格局其实做过优化和改良,有趣味的同学能够去查阅下材料理解下。它为什么这么设计呢?解决了哪些问题,其实都是值得思考的中央。
这里剧透下,通过这样的音讯格局其实能够解决粘包拆包的问题,你看进去了么?
小结
好了,明天就到这里了。咱们次要剖析了tryAppend逻辑:
1)是如何将音讯最终放入到内存的队列中的。
2)并且晓得了batch打包音讯的机制
3)最终序列化音讯的自定义二进制协定
这当中其实有很多值得思考的亮点,Kafka这块的源码逻辑还是很值得大家多钻研几遍。
之后的咱们要剖析的逻辑其实就是如何内存队列中将打包好的音讯发送给Broker的。咱们下一节再见!
本文由博客一文多发平台 OpenWrite 公布!