kafka源码解析4RecordAccumulator的相关组件BufferPoolCopyOnWriteMap下

82次阅读

共计 1103 个字符,预计需要花费 3 分钟才能阅读完成。

CopyOnWriteMap

    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

前面解析 RecordAccumulator 提到了 batches 是用来存放每个 TopicPartition 对应的批次队列的,因为会在多线程环境下使用所以声明为 ConcurrentMap,但是 batches 是一个读多写少的场景,所以 kafka 设计了 CopyOnWriteMap 这种数据结构通过 CopyOnWrite 这种模式,加锁写保证数据不会有并发问题,读的是不可变的 HashMap 来保证性能, 但是 COW 模式会有短暂的数据延迟,kafka 是怎么解决的呢?

    private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

COW 会数据不一致问题的原因是因为 COW 只加写锁不加读锁,
先分析下既有读锁又有写锁的情况,读写互斥运行,即同一时间读和写只能执行一个,这样保证读的时候就是最新值。如果去掉读锁只加写锁呢,那么读的时候如果写在执行读的就不是最新值,但是 batches 的场景比较特殊,它只会插入一次,不会更新。所以 TopicPartition 存在的话读就是不需要加锁的,这样和 COW 的场景完全吻合,所以只需要先 get,为空的时候再从无锁升级到写锁保证不会重复插入。

    @Override
    public synchronized V putIfAbsent(K k, V v) {if (!containsKey(k))
            return put(k, v);
        else
            return get(k);
    }

这个场景决定了写操作执行的机会很少,无论消息数有多少,加锁的次数只和 TopicPartition 数相关,而读又是 HashMap 无锁操作,这样既提升了性能又规避掉了数据一致性问题。

总结

kafka 的 CopyOnWriteMap 给我们在日常工作设计并发数据结构提供了一个很好的思路,先分析场景,再根据场景的特征(比如读写频率),并且再利用一些合理的设计模式。到这里 RecordAccumulator 的相关组件就解析完了。下节打算分析 kafka 的网络设计。

正文完
 0