共计 1942 个字符,预计需要花费 5 分钟才能阅读完成。
音讯在真正发往 Kafka 之前,有可能须要经验拦截器、序列化器和分区器等一系列的作用,后面曾经做了一系列剖析。那么在此之后又会产生什么呢?先看一下生产者客户端的整体架构,如下图所示。
整个生产者客户端由两个线程协调运行,这两个线程别离为主线程和发送线程。在主线程中由 KafkaProducer 创立音讯,而后通过可能的拦截器、序列化器和分区器的作用之后缓存到音讯收集器(RecordAccumulator,也称为音讯累加器)中。发送线程负责从音讯收集器中获取音讯并将其发送到 Kafka 中。
次要用来缓存音讯以便发送线程能够批量发送,进而缩小网络传输的资源耗费以晋升性能。音讯收集器缓存的大小能够通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送音讯的速度超过发送到服务器的速度,则会导致生产者空间有余,这个时候 KafkaProducer 的 send() 办法调用要么被阻塞,要么抛出异样,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。
主线程中发送过去的音讯都会被追加到音讯收集器的某个双端队列(Deque)中,在其的外部为每个分区都保护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。音讯写入缓存时,追加到双端队列的尾部;Sender 读取音讯时,从双端队列的头部读取。留神 ProducerBatch 不是 ProducerRecord,ProducerBatch 中能够蕴含一至多个 ProducerRecord。
艰深地说,ProducerRecord 是生产者中创立的音讯,而 ProducerBatch 是指一个音讯批次,ProducerRecord 会被蕴含在 ProducerBatch 中,这样能够使字节的应用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也能够缩小网络申请的次数以晋升整体的吞吐量。
如果生产者客户端须要向很多分区发送音讯,则能够将 buffer.memory 参数适当调大以减少整体的吞吐量。
ProducerBatch 的大小和 batch.size 参数也有着亲密的关系。当一条音讯流入音讯收集器时,会先寻找与音讯分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还能够写入这个 ProducerRecord,如果能够则写入,如果不能够则须要创立一个新的 ProducerBatch。
在新建 ProducerBatch 时评估这条音讯的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创立 ProducerBatch,这样在应用完这段内存区域之后,能够通过 BufferPool 的治理来进行复用;如果超过,那么就以评估的大小来创立 ProducerBatch,这段内存区域不会被复用。
Sender 从 RecordAccumulator 中获取缓存的音讯之后,会进一步将本来 < 分区, Deque< ProducerBatch>> 的保留模式转变成 <Node, List< ProducerBatch> 的模式,其中 Node 示意 Kafka 集群的 broker 节点。
对于网络连接来说,生产者客户端是与具体的 broker 节点建设的连贯,也就是向具体的 broker 节点发送音讯,而并不关怀音讯属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,咱们只关注向哪个分区中发送哪些音讯,所以在这里须要做一个应用逻辑层面到网络 I / O 层面的转换。
申请在从 Sender 线程发往 Kafka 之前还会保留到 InFlightRequests 中,保留对象的具体模式为 Map<NodeId, Deque>,它的次要作用是缓存了曾经收回去但还没有收到响应的申请(NodeId 是一个 String 类型,示意节点的 id 编号)。
与此同时,InFlightRequests 还提供了许多治理类的办法,并且通过配置参数还能够限度每个连贯(也就是客户端与 Node 之间的连贯)最多缓存的申请数。这个配置参数为 max.in.flight.requests.per.connection,默认值为 5,即每个连贯最多只能缓存 5 个未响应的申请,超过该数值之后就不能再向这个连贯发送更多的申请了,除非有缓存的申请收到了响应(Response)。通过比拟 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否曾经沉积了很多未响应的音讯,如果真是如此,那么阐明这个 Node 节点负载较大或网络连接有问题,再持续向其发送申请会增大申请超时的可能。