共计 1198 个字符,预计需要花费 3 分钟才能阅读完成。
咱们数据都写入了缓冲区,那这里的数据不可能始终放在缓存里的,所以 kafka 有一个线程,叫做 Sender,这个线程,会始终运行,查看缓冲区里的能够发送的音讯,就会把他收回去,上面看看有几种场景能够发消息。
内存池不够
上一篇也提供,当申请的内存不足时,就会进入阻塞状态,这个阻塞状态是在 waiters 的 deque 里放入一个 Condition 对象,所以当 waiters 里有 Condition 对象的时候,阐明此事内存池是不够的,此事就能够发送音讯了。
批次写满
咱们回顾一下上篇的内容,每个 RecordBatch 默认为 16k,可能会有多个音讯往一个 RecordBatch 写,当写满 16k 的时候,此时就具备了发送进来的条件。
所以 Sender 就会从元数据里拿到每一个 topic+partition 形成的 TopicPartition,每一个 TopicPartition 又有一个 Deque,从 Deque 里拿出曾经写满的 RecordBatch 收回去。
这里有个比拟重要的是,每次都只查看 deque 里的第一个 RecordBatch,只有他满了,就能够开始发送。
工夫到了
如果 RecordBatch 始终不到 16k,那音讯也不能始终寄存在缓存里吧,所以 lingerMs,默认 0ms,通过 linger.ms
进行设置。也就是当 lingerMs 毫秒后,这个 RecordBatch 如果没有 16k,也会发送进来。所以这里咱们个别不能用默认值,当默认为 0ms 的时候,Sender 每次都间接把音讯收回去,这样就失去了 RecordBatch 按批次散发的劣势了。
因为 kafka 发送失败的时候会进行重试,所以如果有重试的话,那以重试的工夫作为准,通过 retry.backoff.ms
进行设置,默认 100ms,所以重试的时候,当超过 100ms 的时候也会主动收回去。
如果过期了,这些 RecordBatch 就会从队列里移除,并且开释对应的内存资源。
其余
其余两个,散布为缓冲区敞开的时候或者有过程在 flush 的时候。
连贯
当确认到能够发送的 deque 的时候,就会把对应的 Node 给记录下来,寄存在 Node 列表中,这个 Node 是每个 partition 的 leader 节点,如果没有对应的 leader 节点,就须要进行标记,等发送的时候再从新拉取元数据信息。
遍历完每个 RecordBatch 的队列后,此时还没有达到要收回去的条件,毕竟 Node 里对应的 broker 服务器,此时还不晓得能不能连贯的上,所以还要验证有没有连贯,如果没有,看看能不能连贯的上,如果都不行,阐明这个 Node 节点是用不了了,此时就要把这个 Node 从 Node 列表中,那这个 Node 对应的音讯就不能发送了。
下面的 Node 对应的是 broker 服务器,如果 Node0 和 Node1 是一个服务器,那发送的时候,就会把同一个服务器的对应的 RecordBatch 封装成一个个的 ClientRequest 一起发送。