1、生产者

生产者发送音讯流程

在音讯发送的过程中,波及到了两个线程 : main线程和Sender线程。在main线程中创立了一个双端队列RecordAccumulator。main线程将音讯发送给 RecordAccumulator,Sender线程一直从RecordAccumulator中拉取音讯发送到Kafka Broker

Main线程

Producer(send ProducerRecord) -> Interceptors(拦截器) -> Serializer(序列化器) -> Paritioner(分区器),最终放入到 RecordAccumulator(默认32M),双端队列,最小单位是ProducerBatch 默认16K

Sender线程

NetworkClient 网络通信组件,Selector一旦有写事件,将通过 batch.size或linger.ms 来判断是否要发送。同时会在应用inFlightRequests,默认每个broker节点最多缓存5个申请

留神 : linger.ms生产环境是必须要设置的,默认是0,就是说会外面发送数据,必定是不行的(吞吐量太小)。那linger.ms设置多大比拟好呢?没有绝对值,只有相对值,绝对谁?绝对batch.size,batch.size默认是16kb,linger.ms指定时间段内,肯定要大于16kb的数据
才有意义,不然每次触发的条件都是linger.ms了,batch.size默认是16kb就失去了意义

生产者参数讲解

buffer.memory : RecordAccumulator缓冲区总大小,默认32m
batch.size : 16kb
linger.ms : 默认值0,示意没有提早,立刻发送。生产环境倡议该值大小为 5-100ms 之间
acks :
0 : 生产者发送过去的数据,不须要等数据落盘应答
1 : 生产者发送过去的数据,Leader 收到数据后应答
-1(all) : 生产者发送过去的数据,Leader+和 isr 队列 外面的所有节点收齐数据后应答
默认值是-1,-1 和 all 是等价的
max.in.flight.requests.per.connection : 容许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保障该值是 1-5 的数字
retries :
当音讯发送呈现谬误的时候,零碎会重发消息。retries 示意重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保障音讯的有序性,须要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败音讯的时候,其余的音讯可能发送 胜利了
retry.backoff.ms : 两次重试之间的工夫距离,默认是 100ms
enable.idempotence : 是否开启幂等性,默认 true,开启幂等性
compression.type :
生产者发送的所有数据的压缩形式。默认是none,也就是不压缩。反对压缩类型:none、gzip、snappy、lz4和zstd

数据幂等 & 事务

Kafka 0.11版本当前,引入了一项重大个性 : 幂等性和事务

数据传递语义

至多一次(At Least Once) : ACK级别设置为-1 + 分区正本大于等于2 + ISR里应答的最小正本数量大于等于2
最多一次(At Most Once) : ACK级别设置为0
准确一次(Exactly Once) : 对于一些十分重要的信息,比方和钱相干的数据,要求数据既不能反复也不失落

幂等

幂等性就是指Producer不管向Broker发送多少次反复数据,Broker端都只会长久化一条,保障了不反复
实现原理 :
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用处是什么呢?

  • ProducerID : 在每个新的Producer初始化时,会被调配一个惟一的ProducerID,这个ProducerID对客户端使用者是不可见的
  • SequenceNumber : 对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始枯燥递增的SequenceNumber值

2、broker

3、消费者