1 外围组件程序启动

2 Partitioner
用来决定每个音讯路由到哪个分区。是个接口,外围就一个partition办法,返回int是应用哪一个partition。实现类是DefaultPartitioner, 实现了partition,并且用了一个原子自增随机数,

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

抉择partition的时候有两个状况
(1)如果音讯没指定key,则counter+1,依据集群元数据信息,获得所有可用partition,而后counter和partition数量取模,失去应用哪个partition,这个成果就是轮询。

(2)如果指定了key,那么通过工具类对key取hash,失去的int还是和partition个数取模。所以同样的key能够固定发某个partition。

3 Metadata- 元数据组件
从broker集群拉取元数据,包含topic-partitions的状况,每个partition的leader和follower的地位等。
new的时候传入两个重要配置:

retry.backoff.ms:拉取元数据如果失败,距离多久再去拉,默认100ms,metadata.max.age.ms:每隔一段时间更新本人的元数据,默认5分钟

随着启动,回去调用update办法,这里只是把咱们手动配置的broker地址(bootstrap.servers)设置到了cluster对象,并没有本质的发送申请去拉元数据。为什么这样做?因为元数据可能很多,发消息时用到什么topic再去拉
更好,也是提早加载的一个体现。

4 RecordAccumulator - 音讯缓存器
负责音讯的接管和缓存,把药发送到每个partition的音讯打包成批次batch,每个批次满了发送;或者批次始终没满,到了肯定工夫必须发送。new的时候传入几个外围参数:

batch.size:批次大小,默认16kbuffer.memory:缓存总大小,默认32m。超过缓存后再发消息默认block60s,maxBlockTimeMs配置compression.type:压缩类型,gzip这种,默认不压缩。retry.backoff.ms:失败重试距离,默认100ms。

(1)外围append办法,发消息的时候用append,目标是追加到partition对应的最初一个批次的最初地位,
(2)append中还蕴含了申请小块内存(搭配批次),创立批次等内容。

5 NetworkClient - 网络通信组件
封装了网络相干内容,外围就是nio,从参数里封装过的selector就能看进去。外围参数

connections.max.idle.ms 默认值为9分钟,一个网络连接最多闲暇多长时间,超过就回收,倡议比服务器10分钟小一点。max.in.flight.requests.per.connection 默认5,该值示意每个网络连接能够忍耐的producer端发送给broker音讯而后音讯没有响应的个数。reconnect.backoff.ms 连贯重连工夫send.buffer.bytes socket发送缓冲区大小,默认是128Kreceive.buffer.bytes socket接收数据的缓冲区大小,默认是32K

request.timeout.ms:申请超时工夫,默认30s

6 Sender - 数据发送组件
从缓冲区拿数据发。是个线程,持有网络组件NetworkClient和RecordAccumulator,串联缓冲区和网络组件,传入参数:

max.in.flight.requests.per.connection  该参数指定了生产者在收到服务器响应之前能够发送多少个音讯。它的值越高,就会占用越多的内存,不过也会晋升吞吐量。把它设为 1 能够保障音讯是依照发送的程序写入服务器。设置的多,那么其中有局部batch失败重试的话,就乱序了。request.timeout.ms:申请超时工夫,默认30smax.request.size 一次申请最多发送多大,依据这个决定发多少个batch。acks : 0 发送进来就返回,不论成功失败;1 默认,写入leader后返回;-1 isr都写入才返回。retries : 重试次数

7 总结下初始化波及到的一些重要参数
(1)metadata

retry.backoff.ms:拉取元数据如果失败,距离多久再去拉,默认100ms,metadata.max.age.ms:每隔一段时间更新本人的元数据,默认5分钟

(2)RecordAccumulator

batch.size:批次大小,默认16kbuffer.memory:缓存总大小,默认32m。超过缓存后再发消息默认block60s,maxBlockTimeMs配置compression.type:压缩类型,gzip这种,默认不压缩。retry.backoff.ms:失败重试距离,默认100ms。

(3)Sender:

request.timeout.ms:申请超时工夫,默认30smax.request.size 一次申请最多发送多大,依据这个决定发多少个batch。默认1m,能够大点到10m。acks : 0 发送进来就返回,不论成功失败;1 默认,写入leader后返回;-1 isr都写入才返回。retries : 重试次数 

从参数也能够看进去,各自组件负责各自的参数,还是比拟明确的。