一、什么是 Kafka
MQ 音讯队列作为最罕用的中间件之一,其次要个性有:解耦、异步、限流 / 削峰。
Kafka 和传统的音讯零碎 (也称作消息中间件) 都具备零碎解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等性能。与此同时,Kafka 还提供了大多数音讯零碎难以实现的音讯程序性保障及回溯生产的性能。
二、Kafka 罕用概念
2.1 Topic 与 Partition
Topic(主题)是一个 逻辑概念 ,在物理上并不存储。次要用于形容一个类型的音讯。例如咱们有一个业务零碎会发送一个形容用户订单状态的音讯,那么这一个 类型外面所有的音讯就是一个 Topic,又比方这个业务零碎同时还会发送形容会员余额的音讯,那么这个就是一个新的音讯类型,也就是 一个新的 Topic。
**Partition(分区)** 是一个物理概念,是理论存在于物理设施上的。一个 Topic 由多个 Partition 独特组成。Partition 的存在是为了进步音讯的性能与吞吐量,多个分区多个过程音讯处理速度必定要比单分区快的多。
2.2 Broker 与 Partition
Broker 作为分布式的实现,其实能够间接简略了解为一个 Kafka 过程就是一个 Broker。
咱们之前提到 Partition 是物理存在的,其物理的存在的地位就在 Broker 中。同时,为了服务具备肯定的可靠性,每一个分区都有几个正本,每个正本存在于不同的 Broker 中。
咱们之前提到的 Topic 是逻辑概念即在于此,并没有物理存在,图中每个 TopicA- x 都是一个 Partition,其中前面的数字代表了一个分区中的第几个 正本,每个 Broker 中都有不同的正本,目标就是当有 Broker 宕机时,其余的正本还存在保证系统的可用性。
此外,多个正本 Partition 中会选取一个作为 leader,其余的作为 follower。咱们的 生产者在发送数据的时候,是间接发送到 leader partition 外面 ,而后 follower partition 会去 leader 那里自行同步数据, 消费者生产数据的时候,也是从 leader 那去生产数据的。
正本处于不同的 broker 中,当 leader 正本呈现故障时,从 follower 正本中从新选举新的 leader 正本对外提供服务。Kafka 通过多正本机制实现了故障的主动转移,当 Kafka 集群中某个 broker 生效时依然能保障服务可用。
2.3 生产者消费者与 ZooKeeper
产生音讯的角色或零碎称之为生产者,例如上述某个业务零碎产生了对于订单状态的相干音讯,那么该业务零碎即为生产者。
消费者则是负责接管或者应用音讯的角色或零碎。
ZooKeeper 是 Kafka 用来负责集群元数据的治理、控制器 的选举等操作的。Producer 将音讯发送到 Broker,Broker 负责将收到的音讯存储到磁盘中,而 Consumer 负责从 Broker 订阅并生产音讯。
在每一个 Broker 在启动时都会像向 ZK 注册信息,ZK 会选取一个最早注册的 Broker 作为 Controller,前面 Controller 会与 ZK 进行数据交互获取元数据(即整个 Kafka 集群的信息,例如有那些 Broker, 每个 Broker 中有那些 Partition 等信息),而后其余 Broker 再与 Controller 交互进而所有的 Broker 都能感知到整个集群的所有信息.
2.4 消费者组
目前大部分业务零碎架构都是分布式的,即一个利用会部署多个节点。失常来说,一条音讯只应该被其中某一个节点生产掉,而不应该是所有被所有的消费者同时生产一遍。因而就产生了消费者组的概念,在一个消费者组中,一条音讯只会被消费者组中的一个消费者所生产。
从应用上来说,个别配置为一个利用为一个消费者组,或一个利用中不同的环境也能够配置不必的消费者组。例如生产环境的节点与预发环境的节点能够配置两套消费者组,这样在有新的改变部署在预发时,即时本次改变批改了生产动作的相干逻辑,也不会影响生产的数据。
消费者与生产组这种模型能够让整体的生产能力具备横向伸缩性,咱们能够减少 (或缩小) 消费者的个数来进步(或升高) 整体的生产能力。对于分区数固定的状况,一味地减少消费者 并不会让生产能力始终失去晋升,如果消费者过多,呈现了消费者的个数大于分区个数的状况,就会有消费者调配不到任何分区。参考下图(右下),一共有 8 个消费者,7 个分区,那么最初的生产 者 C7 因为调配不到任何分区而无奈生产任何音讯。
2.5 ISR、HW、LEO
Kafka 通过 ISR 机制尽量保障音讯不会失落。
一个 Partition 中所有正本称为 AR(Assigned Replicas),所有与 leader 正本放弃肯定水平同步的正本(包含 leader 正本在内) 组成 **ISR (In-Sync Replicas)。** 咱们上文提到,follower 正本只负责音讯的同步,很多时候 follower 正本中的音讯绝对 leader 正本而言会有肯定的滞后,而及时与 leader 正本保持数据统一的就能够成为 ISR 成员。与 leader 正本同步滞后过多的正本 (不包含 leader 正本) 组成OSR (Out-of-Sync Replicas),由此可见,AR=ISR+OSR。在失常状况下,所有的 follower 正本都应该与 leader 正本放弃肯定水平的同步,即 AR=ISR,OSR 汇合为空。
leader 正本会监听所有 follower 正本,当其与 leader 正本数据统一时会将其退出 ISR 成员,当与 leader 正本相差太多或宕机时会将其踢出 ISR,也会再其追上 leader 正本后重新加入 ISR。
当 leader 正本宕机或不可用时,只有 ISR 成员能力有机会被抉择为新的 leader 正本,这样就能确保新的 leader 与曾经宕机的 leader 数据统一,而如果抉择 OSR 中的正本作为 leader 时会造成局部未同步的数据失落。
上图状况中,P1 正本首先入选了 leader,且只有 P2 正本同步了 P1 的数据,offset 都为 110,那么此时的 ISR 只有 P1 与 P2,OSR 有 P3 和 P4。当 P3 同步数据到 110 后,也会被 leader 退出到 ISR 中,若此时 leader 宕机,则会从 ISR 中选出一个新的 leader,并将 P0 踢出 ISR 中。
那么 leader 是如何感知到其余正本是否与本人数据统一呢?靠的就是 HW 与 LEO 机制。
LEO 是 Log End Offset 的缩写,它标识以后日志文件中下一条待写入音讯的 offset,LEO 的大小相当于以后日志分区中最初一条音讯的 offset 值加 1。分区 ISR 汇合中的 每个正本都会保护本身的 LEO,而 ISR 汇合中最小的 LEO 即为分区的 HW,HW 是 High Watermark 的缩写,俗称高水位,它标识 了一个特定的音讯偏移量(offset),消费者只能拉取到这个 offset 之前的音讯。
上图中,因为所有正本音讯都是统一的,所以所有 LEO 都是 3,HW 也为 3,当有新的音讯产生时,即 leader 正本新插入了 3 / 4 两条音讯,此时 leader 的 LEO 为 5,两个 follower 的此时未同步音讯,所以 LEO 仍未 3,HW 抉择最小的 LEO 是 3.
当 follower1 同步实现 leader 的数据后,LEO 未 5,但 follower2 未同步,所以此时 HW 仍未 3。尔后 follower2 同步实现后,其 LEO 为 5,所有正本的 LEO 都未 5,此时 HW 抉择最小的为 5。
通过这种机制,leader 正本就能晓得那些正本是满足 ISR 条件的(该正本 LEO 是否等于 leader 正本 LEO)。
三、Kafka 全流程梳理
3.1 注册信息
Kafka 强依赖与 ZooKeeper 以保护整个集群的信息,因而在启动前应该先启动 ZooKeeper。
在 ZK 启动实现之后,所有的 Broker(即所有的 Kafka 过程)都会向 ZK 注册信息,而后争取 /controller
的监听权,获取到监听权的 Broker 称为 Controller,尔后由 Controller 与 ZK 进行信息替换,所有的 Broker 与 Controller 进行音讯替换。进而放弃整个 Kafka 集群的信息一致性。
3.2 创立主题
在所有的 Broker 注册结束后,须要注册主题(Topic)以持续后续流程。
其中某个客户端接管到创立 Topic 申请后,会将申请中的分区计划 (有几个分区、几个正本等) 通知 ZK,ZK 再将信息同步至 Controller,尔后所有的 Broker 与 Controller 替换完元数据,至此所有的 Broker 都曾经晓得该 Topic 的分区计划了,而后依照该分区计划创立本人的分区或正本即可。
以上就是某一个 broker 上面的某一个主题的散布状况
3.3 生产者发送数据
在创立完想要的 Topic 之后,生产者就能够开始发送数据。
3.3.1 封装 ProducerRecord
首先生产者会将信息封装成ProducerRecord
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
其中次要包好了要发送的 Topic 名称,要发送至那个分区,以及要发送的数据和 key。
其余的都比拟好了解,key 的作用是如果 key 存在的话,就会对 key 进行 hash,而后依据不同的后果发送至不同的分区,这样当有雷同的 key 时,所有雷同的 key 都会发送到同一个分区,咱们之前也提到,所有的新音讯都会被增加到分区的尾部,进而保障了数据的程序性。
例如咱们有个对于会员的业务零碎,其中生产者会产生对于某个会员积分的信息,消费者拿到这个音讯之后会理论对积分进行操作。如果某个会员先取得了 100 积分,而后又生产了 50 积分。因而生产者会发送两个 MQ 音讯,然而如果没有应用 key 的性能,这两个音讯被发送到了不同的分区,因为每个分区的消费水平不一样 (例如取得积分的逻辑耗时比拟长而某个分区又都是取得积分的 MQ), 就有可能造成生产 50 积分的 MQ 会先被消费者收到。
而如果此时会员积分为 0 的状况下再去生产 50 积分显著是不合理且逻辑谬误的,会造成业务零碎异样。因而在生产者发送 MQ 时如果音讯有程序性要求则肯定要将 key 赋值,具体的能够是某些有唯一性标识例如此处能够是会员 ID。
3.3.2 序列化数据、获取元数据、确定分区
首先生产则客户端的序列化器会将要发送的 ProducerRecord 对象序列化成字节数组,而后发送到生产端后生产端的反序列化器会将字节数组再转换成对应的生产对象。罕用的序列化器有 String、Doule、Long 等等。
其次也能够自定义序列化器与反序列化器,例如能够将将字节数组进行加密后再进行传输,以此保证数据的安全性。
数据都筹备实现之后就能够开始获取 broker 元数据,例如 host 等,以不便后续确定要发送的地位。
确定要发送至那个分区有几种状况:
- 如果 ProducerRecord 中指定了要发往那个分区,则抉择用户应用的分区
- 如果没有指定分区,则查看 ProducerRecord 中 key 是否为空,如果不为空则对 key 进行计算以获取应用那个分区
- 如果 key 也为空,则依照轮询的形式发送至不同的分区
也能够通过自定义分区器的形式确定发送那个分区。
3.3.3 写入缓冲区、分批分送音讯
生产者发送的 MQ 并不会间接通过网络发送至 broker,而是会先保留在生产者的缓冲区。
而后由生产者的 Sender 线程分批次将数据发送进来,分批次发送的起因是能够节俭肯定的网络耗费与晋升速度,因为一次发送一万条与一万次发送一条必定效率不太一样。
分批次发送次要有两个参数,批次量与等待时间。两个参数次要是解决两个问题,一个是避免一次发送的音讯量过大,比方一次可能发送几十 mb 的数据。另一个解决的问题是避免长时间没有足够音讯产生而导致的音讯始终不发送。因而当上述两个条件任意满足其一就会触发这一批次的发送。
Kafka 的网络模型用的是加强版的 reactor 网络模型
首先客户端发送申请全副会先发送给一个 Acceptor,broker 外面会存在 3 个线程(默认是 3 个),这 3 个线程都是叫做 processor,Acceptor 不会对客户端的申请做任何的解决,间接封装成一个个 socketChannel 发送给这些 processor 造成一个队列,发送的形式是轮询,就是先给第一个 processor 发送,而后再给第二个,第三个,而后又回到第一个。消费者线程去生产这些 socketChannel 时,会获取一个个 request 申请,这些 request 申请中就会随同着数据。
线程池外面默认有 8 个线程,这些线程是用来解决 request 的,解析申请,如果 request 是写申请,就写到磁盘里。读的话返回后果。processor 会从 response 中读取响应数据,而后再返回给客户端。这就是 Kafka 的网络三层架构。
所以如果咱们须要对 kafka 进行加强调优,减少 processor 并减少线程池外面的解决线程,就能够达到成果。request 和 response 那一块局部其实就是起到了一个缓存的成果,是思考到 processor 们生成申请太快,线程数不够不能及时处理的问题。
3.4 消费者生产数据
消费者生产也次要分为两个阶段:
- 信息注册阶段,即整个消费者组向集群注册生产信息等
- 信息生产阶段,开始信息音讯,确保音讯可靠性等
3.4.1 信息注册
首先消费者组内所有消费者都会向集群寻找本人的 Coordinator(以消费者组 id 做平衡)。找到 Coordinator 后,所有的 Consumer 都会向 Coordinator 发动 join group
退出消费者组的申请,Coordinator 会抉择一个最早发动申请的 Consumer 作为 leader Consumer,其余的 Consumer 作为 follower。
leader 会依据要生产的 Topic 及分区状况制订一个生产计划,告知给 Coordinator,Coordinator 再将此生产计划告知给各个 follower。
自此,所有的 Consumer 都曾经晓得本人要生产那个分区了。
如上图,每个消费者都找了本人要生产的分区状况
3.4.2 生产信息
生产信息次要蕴含了以下几个步骤:
1)拉取音讯
罕用的音讯队列的生产音讯个别有两种,推送或者拉取,Kafka 在此处用的是拉取模式。
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
int updateCount = 1;
if (map.containsKey(record.value())) {updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
}finally {consumer.close();
}
通过设置定时工夫,每隔多长时间拉取一次音讯。
2)反序列化与生产音讯
在下面的代码中,咱们拿到的就是 ConsumerRecord
对象,然而实际上这个是消费者客户端帮咱们做的反序列化的操作,将字节数组 (byte[]) 反序列化成了对象。参考 3.3.2 咱们也能够自定义反序列化器。
3)提交音讯位移
例如当音讯队列中有 100 条音讯,消费者第一次 生产了 20 条音讯,那么第二次生产的地位必定是要从第 21 条音讯开始生产,而记录第 21 条音讯的信息称之为 offset,offset 为曾经生产地位 +1.
在之前版本的客户端,offset 数据被存在 zk 中,每次都须要申请 zk 获取数据,而 zk 并不适宜作为高并发的申请。因而在当初的版本中,kafka 通过建设一个 Topic 来记录所有消费者生产的 offset,这个 Topic 是 __consumer_offsets
。每一个消费者在生产数据之前(即 pol() 办法中),都会把上一次生产数据中最大的 offset 提交到该 Topic 中,即此时是作为生产者的身份投递信息。
kafka 中有几种 offset 提交模式,默认的是主动提交:
enable.auto.commit
设置为 true 时,每隔 auto.commit.interval.ms
工夫会主动提交曾经曾经拉取到的音讯中最大的 offset。
然而默认的主动提交也会带来反复生产与音讯失落的问题:
- 反复生产。例如从 offset 为 21 开始拉取数据,拉取到了 40,然而当消费者解决到第 30 条数据的时候零碎宕机了,那么此时曾经提交的 offset 仍为 21,当节点从新连贯时,仍会从 21 生产,那么此时 21-30 的数据就会被从新生产。还有一种状况是再平衡时,例如有新节点退出也会引发相似的问题。
- 音讯失落。
手动同步提交
public static void main(String[] args) {while (true) {
// 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
// 模仿音讯的解决逻辑
System.out.println("revice: key ===" + record.key() + "value ====" + record.value() + "topic ===" + record.topic());
});
try {
// 解决完以后批次的音讯,在轮询更多的音讯之前,调用 commitSync 办法提交以后批次最新的音讯
consumer.commitSync();} catch (CommitFailedException e) {
//todo 事务回滚
e.printStackTrace();}
}
}
手动同步提交能够在任何时候提交 offset,例如能够每生产一条进行一次提交。提交失败之后会抛出异样,能够在异样中做出弥补机制,例如事务回滚等操作。
然而因为手动同步提交是阻塞性质的,所以不倡议太高的频率进行提交。
手动异步提交
异步提交有三种形式,区别在于有没有回调的形式。
@Test
public void asynCommit1(){while (true) {
// 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + "value ====" + record.value() + "topic ===" + record.topic());
});
consumer.commitAsync();}
}
@Test
public void asynCommit2(){while (true) {
// 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + "value ====" + record.value() + "topic ===" + record.topic());
});
// 异步回调机制
consumer.commitAsync(new OffsetCommitCallback(){
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception!=null){System.out.println(String.format("提交失败:%s", offsets.toString()));
}
}
});
}
}
@Test
public void asynCommit3(){while (true) {
// 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + "value ====" + record.value() + "topic ===" + record.topic());
});
consumer.commitAsync((offsets, exception) ->{if (exception!=null){System.out.println(String.format("提交失败:%s", offsets.toString()));
}
});
}
}
异步提交 commitAsync()
与同步提交 commitSync()
最大的区别在于异步提交不会进行重试,同步提交会始终进行主动重试,当然也能够通过再产生异样时持续提交的形式来实现此性能。
同步 + 异步
能够应用同步 + 异步的模式保证数据可能精确提交:
while (true) {ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {log.trace("Kafka 生产信息 ConsumerRecord={}",record.toString());
}
try {
// 先应用异步提交机制
consumer.commitAsync();} catch (CommitFailedException e) {
// todo 弥补机制
log.error("commitAsync failed", e)
} finally{
try {
// 再应用同步提交机制
consumer.commitSync();} catch (CommitFailedException e) {
// todo 弥补机制
log.error("commitAsync failed", e)
} finally{consumer.close();
}
}
}
四、异样场景实际
4.1 异样重试
咱们零碎之前遇到过消费者在生产音讯时,短时间内间断报错。依据景象认为是零碎呈现问题,后续发现所有报错都是同一条音讯,排查后发现是 解决音讯过程中存在未捕捉的异样,导致音讯重试,雷同的问题引发了间断报错。
JMQ 在生产过程中如果有 未捕捉的异样 会认为音讯生产失败,会首先在本地重试两次后放入重试队列中,进入重试队列的音讯,会有过期逻辑,当超过重试工夫或者超过最大重试次数后 (默认 3 天过期), 音讯将会被抛弃。因而在解决音讯时须要思考如果出现异常后的解决场景,抉择是重试还是疏忽还是记录数据后告警。
因而咱们在生产音讯的过程中,尤其是采纳 pull 模式,肯定要依据业务场景留神异样的捕捉。否则小则影响本条音讯,大则 本批次后续所有音讯都可能失落。
// 每隔 1min 拉取音讯
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60L));
for (ConsumerRecord<String, String> record : records) {
try {//doing} catch (Exception e) {
// 如果此处未捕捉音讯,会间接导致 for 循环退出,后续所有音讯都将失落
log.error("Bdp 监听工作执行失败, taskName:{}", taskName, e);
}
}
4.2 本地重试与服务端重试
零碎还遇到过在 JMQ 服务端配置了生产失败重试的逻辑,例如重试多少次距离多久,然而在生产失败之后,发现重试的逻辑并没有依照配置的逻辑走。分割运维帮忙排查后发现:
重试分为本地重试和服务端重试
依据 4.1 咱们晓得生产失败后,会首先在 本地重试 ,本地重试失败后会放入重试队列,则此时进入 服务端重试,两套重试须要两套配置,本地的重试配置在本地的配置文件中。
本地配置如下:
<jmq:consumer id="apiConsumer" transport="jmq.apilog.transport">
<!-- 配置距离 1 秒,重试 3 次 -->
<jmq:listener topic="${jmq.topic.apilog}" listener="apiLogMessageListener" retryDelay="1000" maxRetrys="3"/>
</jmq:consumer>
服务端重试配置:
作者:京东科技 韩国凯
起源:京东云开发者社区 转载请注明起源