关于java:kafka原理剖析1producer的启动和初始化

6次阅读

共计 2258 个字符,预计需要花费 6 分钟才能阅读完成。

1 外围组件程序启动

2 Partitioner
用来决定每个音讯路由到哪个分区。是个接口,外围就一个 partition 办法,返回 int 是应用哪一个 partition。实现类是 DefaultPartitioner, 实现了 partition,并且用了一个原子自增随机数,

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

抉择 partition 的时候有两个状况
(1)如果音讯没指定 key,则 counter+1,依据集群元数据信息,获得所有可用 partition,而后 counter 和 partition 数量取模,失去应用哪个 partition,这个成果就是轮询。

(2)如果指定了 key,那么通过工具类对 key 取 hash,失去的 int 还是和 partition 个数取模。所以同样的 key 能够固定发某个 partition。

3 Metadata- 元数据组件
从 broker 集群拉取元数据,包含 topic-partitions 的状况,每个 partition 的 leader 和 follower 的地位等。
new 的时候传入两个重要配置:

retry.backoff.ms:拉取元数据如果失败,距离多久再去拉,默认 100ms,metadata.max.age.ms:每隔一段时间更新本人的元数据,默认 5 分钟

随着启动,回去调用 update 办法,这里只是把咱们手动配置的 broker 地址(bootstrap.servers)设置到了 cluster 对象,并没有本质的发送申请去拉元数据。为什么这样做?因为元数据可能很多,发消息时用到什么 topic 再去拉
更好,也是提早加载的一个体现。

4 RecordAccumulator – 音讯缓存器
负责音讯的接管和缓存,把药发送到每个 partition 的音讯打包成批次 batch,每个批次满了发送;或者批次始终没满,到了肯定工夫必须发送。new 的时候传入几个外围参数:

batch.size:批次大小,默认 16k
buffer.memory:缓存总大小,默认 32m。超过缓存后再发消息默认 block60s,maxBlockTimeMs 配置
compression.type:压缩类型,gzip 这种,默认不压缩。retry.backoff.ms:失败重试距离,默认 100ms。

(1)外围 append 办法,发消息的时候用 append,目标是追加到 partition 对应的最初一个批次的最初地位,
(2)append 中还蕴含了申请小块内存(搭配批次),创立批次等内容。

5 NetworkClient – 网络通信组件
封装了网络相干内容,外围就是 nio,从参数里封装过的 selector 就能看进去。外围参数

connections.max.idle.ms 默认值为 9 分钟,一个网络连接最多闲暇多长时间,超过就回收,倡议比服务器 10 分钟小一点。max.in.flight.requests.per.connection 默认 5,该值示意每个网络连接能够忍耐的 producer 端发送给 broker 音讯而后音讯没有响应的个数。reconnect.backoff.ms 连贯重连工夫
send.buffer.bytes socket 发送缓冲区大小,默认是 128K
receive.buffer.bytes socket 接收数据的缓冲区大小,默认是 32K

request.timeout.ms:申请超时工夫,默认 30s

6 Sender – 数据发送组件
从缓冲区拿数据发。是个线程,持有网络组件 NetworkClient 和 RecordAccumulator,串联缓冲区和网络组件,传入参数:

max.in.flight.requests.per.connection  该参数指定了生产者在收到服务器响应之前能够发送多少个音讯。它的值越高,就会占用越多的内存,不过也会晋升吞吐量。把它设为 1 能够保障音讯是依照发送的程序写入服务器。设置的多,那么其中有局部 batch 失败重试的话,就乱序了。request.timeout.ms:申请超时工夫,默认 30s
max.request.size 一次申请最多发送多大,依据这个决定发多少个 batch。acks:0 发送进来就返回,不论成功失败;1 默认,写入 leader 后返回;-1 isr 都写入才返回。retries:重试次数

7 总结下初始化波及到的一些重要参数
(1)metadata

retry.backoff.ms:拉取元数据如果失败,距离多久再去拉,默认 100ms,metadata.max.age.ms:每隔一段时间更新本人的元数据,默认 5 分钟

(2)RecordAccumulator

batch.size:批次大小,默认 16k
buffer.memory:缓存总大小,默认 32m。超过缓存后再发消息默认 block60s,maxBlockTimeMs 配置
compression.type:压缩类型,gzip 这种,默认不压缩。retry.backoff.ms:失败重试距离,默认 100ms。

(3)Sender:

request.timeout.ms:申请超时工夫,默认 30s
max.request.size 一次申请最多发送多大,依据这个决定发多少个 batch。默认 1m,能够大点到 10m。acks:0 发送进来就返回,不论成功失败;1 默认,写入 leader 后返回;-1 isr 都写入才返回。retries:重试次数 

从参数也能够看进去,各自组件负责各自的参数,还是比拟明确的。

正文完
 0