kafka源码解析3RecordAccumulator消息存储

8次阅读

共计 3220 个字符,预计需要花费 9 分钟才能阅读完成。

概述

前面两节分析了部分源码,中间间隔了一个月没有继续分享,原因是在深入阅读中碰到了一些问题,因为我阅读源码是以 KafkaProducer 发送消息(send 方法)作为入口开始逐行分析,涉及到新的类后就把整个类和相关的类全部解析一遍,但是引入的其他类在 send 方法里只用到了部分方法,导致我在分析时没有条理,效率很低。从本节开始,我调整下分享的思路,基本点不变,还是以 KafkaProducer->send 方法为入口,优先解析 send 中设计的代码,其他代码等用到时在深入解析。本节我们从 RecordAccumulator 开始,RecordAccumulator 是一个消息累加器,producer 会把消息封装到累加器里方便发送给 broker。

入口

先放上 KafkaProducer.send 里调用累加器的部分代码,这里看到 producer 把消息的目标分区信息,序列化以后的 key,value 和回调还有 headers 统一传给累加器。

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);

append 解析

RecordAccumulator 内部有一个类型为 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 的成员变量 batches,key 我们之前有提到过,是 Topic 的分区信息,value 是一个批次的双端队列,ProducerBatch 是一个批次,里面存放具体的消息。整个 append 方法重点步骤如下

  1. 拿到对应 TopicPartition 的批次队列 Deque<ProducerBatch>。
  2. 第一次 tryAppend
  3. 成功则返回,如果失败分配缓冲区
  4. 第二次 tryAppend
  5. 成功则返回,失败则创建新批次 ProducerBatch
  6. 第三次 tryAppend
  7. 释放缓冲区

这里 RecordAccumulator 用了一个细粒度锁的设计,主要考虑两点:

  1. 没有给整个方法加锁,通过使用 ConcurrentHashMap 保证并发安全,这样并发度就高了,每一个 TopicPartition 对应的的批次都可以并发操作。
  2. 也没有给整段操作批次的代码加锁,而是分段加锁,把能并发执行的申请缓冲区分离出来,这个设计可以看出在性能上考虑的很极致,只在必需的地方加锁。

进一步的解析可以看下面代码注释:

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // 获取批次队列,有就返还旧的,没有就新增然后返回新的
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // 第一次尝试 append,如果批次队列中没有批次或者空间不够就返回 null
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // 申请缓冲区,这里可以并发执行没必要加锁影响性能
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                // 第二次尝试,因为上面锁锁释放掉了所以别的线程有可能已经创建好了批次,这里再尝试 append 一次
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
                // 又失败了,说明释放锁的时候没有别的线程创建批次,只能自己创建了,下面是准备工作
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                // 第三次 tryAppend,这次一定要成功了
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
                // 新创建的批次加入到队列中
                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;

                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            // 释放掉刚才申请的缓冲区,缓冲区后面会讲
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();}
    }

正文完
 0