共计 3818 个字符,预计需要花费 10 分钟才能阅读完成。
次要流程
作为一个 Producer 来说其实外围是梳理 2 个货色:Sender 和 RecordAccumulator
Sender:是 kafka 发送流程的次要服务,负责接收数据并将其搁置到 RecordAccumulator,或者从 RecordAccumulator 中取出数据发送到 Kafka 的服务端,或者负责更新一些 meta 服务等状况。
RecordAccumulator:kafka 的整个发送流程是异步的,次要目标是为了 batch 一些数据以增大吞吐,而 RecordAccumulator 则是次要负责进行对数据缓存进行治理的次要对象
作为 Sender 单次循环体内的外围的流程大如上图所示,咱们能够依照图中的流程自顶向下拆解出各个步骤的细。上述流程在 Sender#sendProducerData 中
如何判断和获取能够发送的 kafka 节点
首先在 RecordAccumulator 外部,数据是以 Map<TopicPartition, Deque> 的模式缓存的:
TopicPartition 是很显然指 topic-partion
ProducerBatch 则是须要同一批发送的 Record 申请,ProducerBatch 自身不是线程平安的,实际操作时会以所在的 Deque 粒度进行上锁。在 ProducerBatch 内,理论的 recrod 以 MemoryRecordsBuilder 的模式保护,同时 ProducerBatch 也会为何很多其余数据,比方一些 request 的数据回调等等,如果前面咱们能够持续聊,现阶段还是先回归主流程的剖析
final long createdMs;
final TopicPartition topicPartition;
final ProduceRequestResult produceFuture;
private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
int recordCount;
int maxRecordSize;
private long lastAttemptMs;
private long lastAppendTime;
private long drainedMs;
private boolean retry;
private boolean reopened;
复制代码
判断这哪些数据是 ready 的外围代码在 kafka 的 RecordAccumulator 的 ready 的局部:
首先须要服务端满足肯定的条件:
须要被发送的 partion 的 leader 是已知的,如果蕴含未知的 leader 则须要拜访 kafka 的服务端查问元数据,不过这部分内容会 block 整体流程,因而实际上会做成异步的
以后待发送的 partion 并且没有被 muted,也就是没有被设置为阻塞状态。
以后 partion 不处于 backoff 状态,这里次要指以后的 partion 有正在触发重试的状态。
其次则是以后 partion 的 batch 的须要满足肯定条件
以后 batch 的间隔上一次发送过来的工夫的等待时间 > 容许期待的时延(如果是首次尝试则应用 lingerMs,如果是重试逻辑则应用 retryBackoffMs)
以后双端队列是否存在已满的 batch,比方队列中的原始的数量大于 1,或者仅有一个元素然而 size 满足发送条件
以后 Producer 曾经处于 close 状态
总体内存已满:咱们曾经晓得 Producer 的数据是须要缓存一段时间的,Producer 外部有一个管制内存的内存池即 BufferPool,如果内存不够用了则会排队申请,如果这对队列不为空则阐明总内存不够了
存在正在刷新的线程:这里略微难了解一点,等我比拟确定了再补充。
事务实现,(高版本 kakfa 反对的事务模型,暂不赘述)
如何获取待发送的 Batch 数据
次要逻辑概括的说:
遍历 RecordAccumulator 中的 ConcurrentMap<TopicPartition, Deque>,针对每个 TopicPartition 尝试获取不高于 maxRequestSize 的 batch 列表,将这些 Batch 敞开并放入待发送列表中。
然而在实现中还是有一些逻辑须要留神。
咱们都晓得根本的 kafka 的 broker 和 kafka 的 topic-partion 的概念,不同的 partion 可能调配到同一个 broker 上。在 kafka 的实现中,每次 drain 的过程只会从以后的 node 节点中调出一个 partion 进行发送音讯。
为了防止每次投递的时候都从 0 开始投递从而导致序列化较大的 partion 会饥饿,客户端虚构出了一个 drainIndex,在每次 drain 的过程中会递增,理论的其实节点从 start 开始。
int start = drainIndex = drainIndex % parts.size();
复制代码
不过这里有一点我没太看懂,为什么 drainInde 是全局的,如果是我做可能就做 nodeId 维度的了,不太分明这里思考的点是什么?如果是全局的 drainIndex,其实还存在如果单个 Node 的 partion 太多远远多余其余的 Node 从而导致饥饿?
另一个有意思的问题是当有一些比拟极其的 case,比方单个 Batch 外面只有一个 message,然而这个 message 的 size 曾经大于 request size 的限度了,这时候就会尝试将这条音讯独自作为一个 batch 发送,为了实现在这一点,kafka 的 client 只在待发送列表不为空时检测以后待发送 +nextbatch 的 size 之和是否大于 request size
上述代在:RecordAccumulator#drainBatchesForOneNode 中
如何检测过期数据
检测其实分成 2 个局部,一部分是 RecordAccumulator 中的 buffer 的数据 ConcurrentMap<TopicPartition, Deque> 中的过期数据,一部分是 Sender 中的待发送的数据 Map<TopicPartition, List> inFlightBatches
比拟的工夫是 deliveryTimeoutMs,和以后工夫 - 创立工夫的差值。
对于生效的数据会调用失败的 callBack。
数据发送和打包
实现了上述数据过滤的数据会打包成 ProduceRequest 交给 client 进行发送。
配置和限度条件
梳理完上述的条件之后咱们来一起看下有哪些配置管制了上述的一些流程:
batch.size:这里指的是每个双端队列的 ProducerBatch 的 size 大小
buffer.memory:这里指的是 RecordAccumulator 的 Buffer 的 size 大小
max.request.size:这里指的是在 drain 的过程中发给每个 Node 的 size 大小,如果是单个 messge 大于这个值是会跳过检测的,然而会影响打包的形式。
linger.ms:在非重试的场景下数据从 ProducerBatch 开始创立到 drain 到待发送区域之间,在 buffer 中驻留的工夫
retry.backoff.ms:根本同上,区别在于是在重试场景下容许驻留在 buffer 和待发送区域的最大工夫,这个配置实际上是为了防止一些极其的场景,比方在重试的场景下,可能是因为服务端有问题,如果咱们不减少 client 在内存驻留的工夫,则可能在十分短的工夫内把重试次数耗尽。
delivery.timeout.ms:这个配置指的的是从数据从 add 到 kafka 的客户端到 client 开始解决发送流程的总耗时,包含驻留在 buffer 中的工夫和待发送列表中的工夫。
request.timeout.ms:这部分工夫实际上指的是 client 开始发送 request 到收到 response 之间的工夫。
比方我所遇到的线上问题:e.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
从代码来看是卡在了申请内存的阶段,实际上就是 buffer 的 size 不够了。对照了一下 Producer 的配置发现了 batch.size 设置过大导致了上游的 topicpartion 的数量 batch.size 之后远超过 buffer.memory,也就是 buffer 最多只能放局部 partion 的数据,进而导致整个 Producer 的生产流程阻塞。
最初
如果你感觉此文对你有一丁点帮忙,点个赞。或者能够退出我的开发交换群:1025263163 互相学习,咱们会有业余的技术答疑解惑
如果你感觉这篇文章对你有点用的话,麻烦请给咱们的开源我的项目点点 star:http://github.crmeb.net/u/defu 不胜感激!
PHP 学习手册:https://doc.crmeb.com
技术交换论坛:https://q.crmeb.com