次要流程
作为一个Producer来说其实外围是梳理2个货色:Sender和RecordAccumulator

Sender: 是kafka发送流程的次要服务,负责接收数据并将其搁置到RecordAccumulator,或者从RecordAccumulator中取出数据发送到Kafka的服务端,或者负责更新一些meta服务等状况。
RecordAccumulator:kafka的整个发送流程是异步的,次要目标是为了batch一些数据以增大吞吐,而RecordAccumulator则是次要负责进行对数据缓存进行治理的次要对象

作为Sender单次循环体内的外围的流程大如上图所示,咱们能够依照图中的流程自顶向下拆解出各个步骤的细。上述流程在Sender#sendProducerData中

如何判断和获取能够发送的kafka节点
首先在RecordAccumulator外部,数据是以Map<TopicPartition, Deque>的模式缓存的:

TopicPartition是很显然指topic-partion

ProducerBatch则是须要同一批发送的Record申请,ProducerBatch自身不是线程平安的,实际操作时会以所在的Deque粒度进行上锁。在ProducerBatch内,理论的recrod以MemoryRecordsBuilder的模式保护,同时ProducerBatch也会为何很多其余数据,比方一些request的数据回调等等,如果前面咱们能够持续聊,现阶段还是先回归主流程的剖析

final long createdMs;
final TopicPartition topicPartition;
final ProduceRequestResult produceFuture;

private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

int recordCount;
int maxRecordSize;
private long lastAttemptMs;
private long lastAppendTime;
private long drainedMs;
private boolean retry;
private boolean reopened;
复制代码
判断这哪些数据是ready的外围代码在kafka的RecordAccumulator的ready的局部:

首先须要服务端满足肯定的条件:

须要被发送的partion的leader是已知的,如果蕴含未知的leader则须要拜访kafka的服务端查问元数据,不过这部分内容会block整体流程,因而实际上会做成异步的
以后待发送的partion并且没有被muted,也就是没有被设置为阻塞状态。
以后partion不处于backoff状态,这里次要指以后的partion有正在触发重试的状态。
其次则是以后partion的batch的须要满足肯定条件

以后batch的间隔上一次发送过来的工夫的等待时间> 容许期待的时延(如果是首次尝试则应用lingerMs,如果是重试逻辑则应用retryBackoffMs)
以后双端队列是否存在已满的batch,比方队列中的原始的数量大于1,或者仅有一个元素然而size满足发送条件
以后Producer曾经处于close状态
总体内存已满:咱们曾经晓得Producer的数据是须要缓存一段时间的,Producer外部有一个管制内存的内存池即BufferPool,如果内存不够用了则会排队申请,如果这对队列不为空则阐明总内存不够了
存在正在刷新的线程:这里略微难了解一点,等我比拟确定了再补充。
事务实现,(高版本kakfa反对的事务模型,暂不赘述)
如何获取待发送的Batch数据
次要逻辑概括的说:

遍历RecordAccumulator中的ConcurrentMap<TopicPartition, Deque>,针对每个TopicPartition尝试获取不高于maxRequestSize的batch列表,将这些Batch敞开并放入待发送列表中。

然而在实现中还是有一些逻辑须要留神。

咱们都晓得根本的kafka的broker和kafka的topic-partion的概念,不同的partion可能调配到同一个broker上。在kafka的实现中,每次drain的过程只会从以后的node节点中调出一个partion进行发送音讯。

为了防止每次投递的时候都从0开始投递从而导致序列化较大的partion会饥饿,客户端虚构出了一个drainIndex,在每次drain的过程中会递增,理论的其实节点从start开始。

int start = drainIndex = drainIndex % parts.size();
复制代码
不过这里有一点我没太看懂,为什么drainInde是全局的,如果是我做可能就做nodeId维度的了,不太分明这里思考的点是什么?如果是全局的drainIndex,其实还存在如果单个Node的partion太多远远多余其余的Node从而导致饥饿?

另一个有意思的问题是当有一些比拟极其的case,比方单个Batch外面只有一个message,然而这个message的size曾经大于request size的限度了,这时候就会尝试将这条音讯独自作为一个batch发送,为了实现在这一点,kafka的client只在待发送列表不为空时检测以后待发送+nextbatch的size之和是否大于request size

上述代在:RecordAccumulator#drainBatchesForOneNode中

如何检测过期数据
检测其实分成2个局部,一部分是RecordAccumulator中的buffer的数据ConcurrentMap<TopicPartition, Deque>中的过期数据,一部分是Sender中的待发送的数据Map<TopicPartition, List> inFlightBatches

比拟的工夫是deliveryTimeoutMs,和以后工夫-创立工夫的差值。

对于生效的数据会调用失败的callBack。

数据发送和打包
实现了上述数据过滤的数据会打包成ProduceRequest交给client进行发送。

配置和限度条件
梳理完上述的条件之后咱们来一起看下有哪些配置管制了上述的一些流程:

batch.size:这里指的是每个双端队列的ProducerBatch的size大小
buffer.memory:这里指的是RecordAccumulator的Buffer的size大小
max.request.size:这里指的是在drain的过程中发给每个Node的size大小,如果是单个messge大于这个值是会跳过检测的,然而会影响打包的形式。
linger.ms:在非重试的场景下数据从ProducerBatch开始创立到drain到待发送区域之间,在buffer中驻留的工夫
retry.backoff.ms:根本同上,区别在于是在重试场景下容许驻留在buffer和待发送区域的最大工夫,这个配置实际上是为了防止一些极其的场景,比方在重试的场景下,可能是因为服务端有问题,如果咱们不减少client在内存驻留的工夫,则可能在十分短的工夫内把重试次数耗尽。
delivery.timeout.ms:这个配置指的的是从数据从add到kafka的客户端到client开始解决发送流程的总耗时,包含驻留在buffer中的工夫和待发送列表中的工夫。
request.timeout.ms:这部分工夫实际上指的是client开始发送request到收到response之间的工夫。

比方我所遇到的线上问题 :e.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

从代码来看是卡在了申请内存的阶段,实际上就是buffer的size不够了。对照了一下Producer的配置发现了batch.size设置过大导致了上游的topicpartion的数量batch.size之后远超过buffer.memory,也就是buffer最多只能放局部partion的数据,进而导致整个Producer的生产流程阻塞。

最初
如果你感觉此文对你有一丁点帮忙,点个赞。或者能够退出我的开发交换群:1025263163互相学习,咱们会有业余的技术答疑解惑

如果你感觉这篇文章对你有点用的话,麻烦请给咱们的开源我的项目点点star:http://github.crmeb.net/u/defu不胜感激 !

PHP学习手册:https://doc.crmeb.com
技术交换论坛:https://q.crmeb.com