共计 1717 个字符,预计需要花费 5 分钟才能阅读完成。
1、生产者
生产者发送音讯流程
在音讯发送的过程中,波及到了两个线程 : main 线程和 Sender 线程。在 main 线程中创立了一个双端队列 RecordAccumulator。main 线程将音讯发送给 RecordAccumulator,Sender 线程一直从 RecordAccumulator 中拉取音讯发送到 Kafka Broker
Main 线程
Producer(send ProducerRecord) -> Interceptors(拦截器) -> Serializer(序列化器) -> Paritioner(分区器),最终放入到 RecordAccumulator(默认 32M),双端队列,最小单位是 ProducerBatch 默认 16K
Sender 线程
NetworkClient 网络通信组件,Selector 一旦有写事件,将通过 batch.size 或 linger.ms 来判断是否要发送。同时会在应用 inFlightRequests,默认每个 broker 节点最多缓存 5 个申请
留神 : linger.ms 生产环境是必须要设置的,默认是 0,就是说会外面发送数据,必定是不行的 (吞吐量太小)。 那 linger.ms 设置多大比拟好呢?没有绝对值,只有相对值,绝对谁?绝对 batch.size,batch.size 默认是 16kb,linger.ms 指定时间段内,肯定要大于 16kb 的数据
才有意义,不然每次触发的条件都是 linger.ms 了,batch.size 默认是 16kb 就失去了意义
生产者参数讲解
buffer.memory : RecordAccumulator 缓冲区总大小,默认 32m
batch.size : 16kb
linger.ms : 默认值 0,示意没有提早,立刻发送。生产环境倡议该值大小为 5-100ms 之间
acks :
0 : 生产者发送过去的数据,不须要等数据落盘应答
1 : 生产者发送过去的数据,Leader 收到数据后应答
-1(all) : 生产者发送过去的数据,Leader+ 和 isr 队列 外面的所有节点收齐数据后应答
默认值是 -1,-1 和 all 是等价的
max.in.flight.requests.per.connection : 容许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保障该值是 1-5 的数字
retries :
当音讯发送呈现谬误的时候,零碎会重发消息。retries 示意重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保障音讯的有序性,须要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败音讯的时候,其余的音讯可能发送 胜利了
retry.backoff.ms : 两次重试之间的工夫距离,默认是 100ms
enable.idempotence : 是否开启幂等性,默认 true,开启幂等性
compression.type :
生产者发送的所有数据的压缩形式。默认是 none,也就是不压缩。反对压缩类型:none、gzip、snappy、lz4 和 zstd
数据幂等 & 事务
Kafka 0.11 版本当前,引入了一项重大个性 : 幂等性和事务
数据传递语义
至多一次 (At Least Once) : ACK 级别设置为 -1 + 分区正本大于等于 2 + ISR 里应答的最小正本数量大于等于 2
最多一次 (At Most Once) : ACK 级别设置为 0
准确一次(Exactly Once) : 对于一些十分重要的信息,比方和钱相干的数据,要求数据既不能反复也不失落
幂等
幂等性就是指 Producer 不管向 Broker 发送多少次反复数据,Broker 端都只会长久化一条,保障了不反复
实现原理 :
Kafka 为了实现幂等性,它在底层设计架构中引入了 ProducerID 和 SequenceNumber。那这两个概念的用处是什么呢?
- ProducerID : 在每个 新的 Producer 初始化时,会被调配一个惟一的 ProducerID,这个 ProducerID 对客户端使用者是不可见的
- SequenceNumber : 对于每个 ProducerID,Producer 发送数据的每个 Topic 和 Partition 都对应一个从 0 开始枯燥递增的 SequenceNumber 值