关于kafka:Kafka-缓冲区里的数据什么时候发送

咱们数据都写入了缓冲区,那这里的数据不可能始终放在缓存里的,所以kafka有一个线程,叫做Sender,这个线程,会始终运行,查看缓冲区里的能够发送的音讯,就会把他收回去,上面看看有几种场景能够发消息。

内存池不够

上一篇也提供,当申请的内存不足时,就会进入阻塞状态,这个阻塞状态是在waiters的deque里放入一个Condition对象,所以当waiters里有Condition对象的时候,阐明此事内存池是不够的,此事就能够发送音讯了。

批次写满

咱们回顾一下上篇的内容,每个RecordBatch默认为16k,可能会有多个音讯往一个RecordBatch写,当写满16k的时候,此时就具备了发送进来的条件。

所以Sender就会从元数据里拿到每一个topic+partition形成的TopicPartition,每一个TopicPartition又有一个Deque,从Deque里拿出曾经写满的RecordBatch收回去。

这里有个比拟重要的是,每次都只查看deque里的第一个RecordBatch,只有他满了,就能够开始发送。

工夫到了

如果RecordBatch始终不到16k,那音讯也不能始终寄存在缓存里吧,所以lingerMs,默认0ms,通过linger.ms进行设置。也就是当lingerMs毫秒后,这个RecordBatch如果没有16k,也会发送进来。所以这里咱们个别不能用默认值,当默认为0ms的时候,Sender每次都间接把音讯收回去,这样就失去了RecordBatch按批次散发的劣势了。

因为kafka发送失败的时候会进行重试,所以如果有重试的话,那以重试的工夫作为准,通过retry.backoff.ms进行设置,默认100ms,所以重试的时候,当超过100ms的时候也会主动收回去。

如果过期了,这些RecordBatch就会从队列里移除,并且开释对应的内存资源。

其余

其余两个,散布为缓冲区敞开的时候或者有过程在flush的时候。

连贯

当确认到能够发送的deque的时候,就会把对应的Node给记录下来,寄存在Node列表中,这个Node是每个partition的leader节点,如果没有对应的leader节点,就须要进行标记,等发送的时候再从新拉取元数据信息。

遍历完每个RecordBatch的队列后,此时还没有达到要收回去的条件,毕竟Node里对应的broker服务器,此时还不晓得能不能连贯的上,所以还要验证有没有连贯,如果没有,看看能不能连贯的上,如果都不行,阐明这个Node节点是用不了了,此时就要把这个Node从Node列表中,那这个Node对应的音讯就不能发送了。

下面的Node对应的是broker服务器,如果Node0和Node1是一个服务器,那发送的时候,就会把同一个服务器的对应的RecordBatch封装成一个个的ClientRequest一起发送。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理