一、什么是Kafka
MQ音讯队列作为最罕用的中间件之一,其次要个性有:解耦、异步、限流/削峰。
Kafka 和传统的音讯零碎(也称作消息中间件)都具备零碎解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等性能。与此同时,Kafka 还提供了大多数音讯零碎难以实现的音讯程序性保障及回溯生产的性能。
二、Kafka罕用概念
2.1 Topic与Partition
Topic(主题)是一个逻辑概念,在物理上并不存储。次要用于形容一个类型的音讯。例如咱们有一个业务零碎会发送一个形容用户订单状态的音讯,那么这一个类型外面所有的音讯就是一个Topic,又比方这个业务零碎同时还会发送形容会员余额的音讯,那么这个就是一个新的音讯类型,也就是一个新的Topic。
**Partition(分区)**是一个物理概念,是理论存在于物理设施上的。一个Topic由多个Partition独特组成。Partition的存在是为了进步音讯的性能与吞吐量,多个分区多个过程音讯处理速度必定要比单分区快的多。
2.2 Broker与Partition
Broker作为分布式的实现,其实能够间接简略了解为一个Kafka过程就是一个Broker。
咱们之前提到Partition是物理存在的,其物理的存在的地位就在Broker中。同时,为了服务具备肯定的可靠性,每一个分区都有几个正本,每个正本存在于不同的Broker中。
咱们之前提到的Topic是逻辑概念即在于此,并没有物理存在,图中每个TopicA-x都是一个Partition,其中前面的数字代表了一个分区中的第几个正本,每个Broker中都有不同的正本,目标就是当有Broker宕机时,其余的正本还存在保证系统的可用性。
此外,多个正本Partition中会选取一个作为leader,其余的作为follower。咱们的生产者在发送数据的时候,是间接发送到leader partition外面,而后follower partition会去leader那里自行同步数据,消费者生产数据的时候,也是从leader那去生产数据的。
正本处于不同的 broker 中,当 leader 正本呈现故障时,从 follower 正本中从新选举新的 leader 正本对外提供服务。Kafka 通过多正本机制实现了故障的主动转移,当 Kafka 集群中某个 broker 生效时依然能保障服务可用。
2.3 生产者消费者与ZooKeeper
产生音讯的角色或零碎称之为生产者,例如上述某个业务零碎产生了对于订单状态的相干音讯,那么该业务零碎即为生产者。
消费者则是负责接管或者应用音讯的角色或零碎。
ZooKeeper 是 Kafka 用来负责集群元数据的治理、控制器 的选举等操作的。Producer 将音讯发送到 Broker,Broker 负责将收到的音讯存储到磁盘中,而 Consumer 负责从 Broker 订阅并生产音讯。
在每一个Broker在启动时都会像向ZK注册信息,ZK会选取一个最早注册的Broker作为Controller,前面Controller会与ZK进行数据交互获取元数据(即整个Kafka集群的信息,例如有那些Broker,每个Broker中有那些Partition等信息),而后其余Broker再与Controller交互进而所有的Broker都能感知到整个集群的所有信息.
2.4 消费者组
目前大部分业务零碎架构都是分布式的,即一个利用会部署多个节点。失常来说,一条音讯只应该被其中某一个节点生产掉,而不应该是所有被所有的消费者同时生产一遍。因而就产生了消费者组的概念,在一个消费者组中,一条音讯只会被消费者组中的一个消费者所生产。
从应用上来说,个别配置为一个利用为一个消费者组,或一个利用中不同的环境也能够配置不必的消费者组。例如生产环境的节点与预发环境的节点能够配置两套消费者组,这样在有新的改变部署在预发时,即时本次改变批改了生产动作的相干逻辑,也不会影响生产的数据。
消费者与生产组这种模型能够让整体的生产能力具备横向伸缩性,咱们能够减少(或缩小) 消费者的个数来进步(或升高)整体的生产能力。对于分区数固定的状况,一味地减少消费者 并不会让生产能力始终失去晋升,如果消费者过多,呈现了消费者的个数大于分区个数的状况, 就会有消费者调配不到任何分区。参考下图(右下),一共有 8 个消费者,7 个分区,那么最初的生产 者 C7 因为调配不到任何分区而无奈生产任何音讯。
2.5 ISR、HW、LEO
Kafka通过ISR机制尽量保障音讯不会失落。
一个Partition中所有正本称为AR(Assigned Replicas),所有与 leader 正本放弃肯定水平同步的正本(包含 leader 正本在内)组成 **ISR (In-Sync Replicas)。**咱们上文提到,follower 正本只负责音讯的同步,很多时候 follower 正本中的音讯绝对 leader 正本而言会有肯定的滞后,而及时与leader正本保持数据统一的就能够成为ISR成员。与 leader 正本同步滞后过多的正本(不包含 leader 正本)组成OSR (Out-of-Sync Replicas),由此可见,AR=ISR+OSR。 在失常状况下,所有的 follower 正本都应该与 leader 正本放弃肯定水平的同步,即 AR=ISR, OSR 汇合为空。
leader正本会监听所有follower正本,当其与leader正本数据统一时会将其退出ISR成员,当与leader正本相差太多或宕机时会将其踢出ISR,也会再其追上leader正本后重新加入ISR。
当leader正本宕机或不可用时,只有ISR成员能力有机会被抉择为新的leader正本,这样就能确保新的leader与曾经宕机的leader数据统一,而如果抉择OSR中的正本作为leader时会造成局部未同步的数据失落。
上图状况中,P1正本首先入选了leader,且只有P2正本同步了P1的数据,offset都为110,那么此时的ISR只有P1与P2,OSR有P3和P4。当P3同步数据到110后,也会被leader退出到ISR中,若此时leader宕机,则会从ISR中选出一个新的leader,并将P0踢出ISR中。
那么leader是如何感知到其余正本是否与本人数据统一呢?靠的就是HW与LEO机制。
LEO 是 Log End Offset 的缩写,它标识以后日志文件中下一条待写入音讯的 offset,LEO 的大小相当于以后日志分区中最初一条音讯的 offset 值加 1。分区 ISR 汇合中的每个正本都会保护本身的 LEO,而 ISR 汇合中最小的 LEO 即为分区的 HW,HW 是 High Watermark 的缩写,俗称高水位,它标识 了一个特定的音讯偏移量(offset),消费者只能拉取到这个 offset 之前的音讯。
上图中,因为所有正本音讯都是统一的,所以所有LEO都是3,HW也为3,当有新的音讯产生时,即leader正本新插入了3/4两条音讯,此时leader的LEO为5,两个follower的此时未同步音讯,所以LEO仍未3,HW抉择最小的LEO是3.
当follower1同步实现leader的数据后,LEO未5,但follower2未同步,所以此时HW仍未3。尔后follower2同步实现后,其LEO为5,所有正本的LEO都未5,此时HW抉择最小的为5。
通过这种机制,leader正本就能晓得那些正本是满足ISR条件的(该正本LEO是否等于leader正本LEO)。
三、Kafka全流程梳理
3.1 注册信息
Kafka强依赖与ZooKeeper以保护整个集群的信息,因而在启动前应该先启动ZooKeeper。
在ZK启动实现之后,所有的Broker(即所有的Kafka过程)都会向ZK注册信息,而后争取/controller
的监听权,获取到监听权的Broker称为Controller,尔后由Controller与ZK进行信息替换,所有的Broker与Controller进行音讯替换。进而放弃整个Kafka集群的信息一致性。
3.2 创立主题
在所有的Broker注册结束后,须要注册主题(Topic)以持续后续流程。
其中某个客户端接管到创立Topic申请后,会将申请中的分区计划(有几个分区、几个正本等)通知ZK,ZK再将信息同步至Controller,尔后所有的Broker与Controller替换完元数据,至此所有的Broker都曾经晓得该Topic的分区计划了,而后依照该分区计划创立本人的分区或正本即可。
以上就是某一个broker上面的某一个主题的散布状况
3.3 生产者发送数据
在创立完想要的Topic之后,生产者就能够开始发送数据。
3.3.1 封装ProducerRecord
首先生产者会将信息封装成ProducerRecord
private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;
其中次要包好了要发送的Topic名称,要发送至那个分区,以及要发送的数据和key。
其余的都比拟好了解,key的作用是如果key存在的话,就会对key进行hash,而后依据不同的后果发送至不同的分区,这样当有雷同的key时,所有雷同的key都会发送到同一个分区,咱们之前也提到,所有的新音讯都会被增加到分区的尾部,进而保障了数据的程序性。
例如咱们有个对于会员的业务零碎,其中生产者会产生对于某个会员积分的信息,消费者拿到这个音讯之后会理论对积分进行操作。如果某个会员先取得了100积分,而后又生产了50积分。因而生产者会发送两个MQ音讯,然而如果没有应用key的性能,这两个音讯被发送到了不同的分区,因为每个分区的消费水平不一样(例如取得积分的逻辑耗时比拟长而某个分区又都是取得积分的MQ),就有可能造成生产50积分的MQ会先被消费者收到。
而如果此时会员积分为0的状况下再去生产50积分显著是不合理且逻辑谬误的,会造成业务零碎异样。因而在生产者发送MQ时如果音讯有程序性要求则肯定要将key赋值,具体的能够是某些有唯一性标识例如此处能够是会员ID。
3.3.2 序列化数据、获取元数据、确定分区
首先生产则客户端的序列化器会将要发送的ProducerRecord对象序列化成字节数组 ,而后发送到生产端后生产端的反序列化器会将字节数组再转换成对应的生产对象。罕用的序列化器有String、Doule、Long等等。
其次也能够自定义序列化器与反序列化器,例如能够将将字节数组进行加密后再进行传输,以此保证数据的安全性。
数据都筹备实现之后就能够开始获取broker元数据,例如host等,以不便后续确定要发送的地位。
确定要发送至那个分区有几种状况:
- 如果ProducerRecord中指定了要发往那个分区,则抉择用户应用的分区
- 如果没有指定分区,则查看ProducerRecord中key是否为空,如果不为空则对key进行计算以获取应用那个分区
- 如果key也为空,则依照轮询的形式发送至不同的分区
也能够通过自定义分区器的形式确定发送那个分区。
3.3.3 写入缓冲区、分批分送音讯
生产者发送的MQ并不会间接通过网络发送至broker,而是会先保留在生产者的缓冲区。
而后由生产者的Sender线程分批次将数据发送进来,分批次发送的起因是能够节俭肯定的网络耗费与晋升速度,因为一次发送一万条与一万次发送一条必定效率不太一样。
分批次发送次要有两个参数,批次量与等待时间。两个参数次要是解决两个问题,一个是避免一次发送的音讯量过大,比方一次可能发送几十mb的数据。另一个解决的问题是避免长时间没有足够音讯产生而导致的音讯始终不发送。因而当上述两个条件任意满足其一就会触发这一批次的发送。
Kafka的网络模型用的是加强版的reactor网络模型
首先客户端发送申请全副会先发送给一个Acceptor,broker外面会存在3个线程(默认是3个),这3个线程都是叫做processor,Acceptor不会对客户端的申请做任何的解决,间接封装成一个个socketChannel发送给这些processor造成一个队列,发送的形式是轮询,就是先给第一个processor发送,而后再给第二个,第三个,而后又回到第一个。消费者线程去生产这些socketChannel时,会获取一个个request申请,这些request申请中就会随同着数据。
线程池外面默认有8个线程,这些线程是用来解决request的,解析申请,如果request是写申请,就写到磁盘里。读的话返回后果。 processor会从response中读取响应数据,而后再返回给客户端。这就是Kafka的网络三层架构。
所以如果咱们须要对kafka进行加强调优,减少processor并减少线程池外面的解决线程,就能够达到成果。request和response那一块局部其实就是起到了一个缓存的成果,是思考到processor们生成申请太快,线程数不够不能及时处理的问题。
3.4 消费者生产数据
消费者生产也次要分为两个阶段:
- 信息注册阶段,即整个消费者组向集群注册生产信息等
- 信息生产阶段,开始信息音讯,确保音讯可靠性等
3.4.1 信息注册
首先消费者组内所有消费者都会向集群寻找本人的Coordinator(以消费者组id做平衡)。找到Coordinator后,所有的Consumer都会向Coordinator发动join group
退出消费者组的申请,Coordinator会抉择一个最早发动申请的Consumer作为leader Consumer,其余的Consumer作为follower。
leader会依据要生产的Topic及分区状况制订一个生产计划,告知给Coordinator,Coordinator再将此生产计划告知给各个follower。
自此,所有的Consumer都曾经晓得本人要生产那个分区了。
如上图,每个消费者都找了本人要生产的分区状况
3.4.2 生产信息
生产信息次要蕴含了以下几个步骤:
1)拉取音讯
罕用的音讯队列的生产音讯个别有两种,推送或者拉取,Kafka在此处用的是拉取模式。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<String, String> record : records) { int updateCount = 1; if (map.containsKey(record.value())) { updateCount = (int) map.get(record.value() + 1); } map.put(record.value(), updateCount); } }}finally { consumer.close();}
通过设置定时工夫,每隔多长时间拉取一次音讯。
2)反序列化与生产音讯
在下面的代码中,咱们拿到的就是ConsumerRecord
对象,然而实际上这个是消费者客户端帮咱们做的反序列化的操作,将字节数组(byte[])反序列化成了对象。参考3.3.2咱们也能够自定义反序列化器。
3)提交音讯位移
例如当音讯队列中有100条音讯,消费者第一次生产了20条音讯,那么第二次生产的地位必定是要从第21条音讯开始生产,而记录第21条音讯的信息称之为offset,offset为曾经生产地位+1.
在之前版本的客户端,offset数据被存在zk中,每次都须要申请zk获取数据,而zk并不适宜作为高并发的申请。因而在当初的版本中,kafka通过建设一个Topic来记录所有消费者生产的offset,这个Topic是__consumer_offsets
。每一个消费者在生产数据之前(即pol()办法中),都会把上一次生产数据中最大的offset提交到该Topic中,即此时是作为生产者的身份投递信息。
kafka中有几种offset提交模式,默认的是主动提交:
enable.auto.commit
设置为true时,每隔auto.commit.interval.ms
工夫会主动提交曾经曾经拉取到的音讯中最大的offset。
然而默认的主动提交也会带来反复生产与音讯失落的问题:
- 反复生产。例如从offset为21开始拉取数据,拉取到了40,然而当消费者解决到第30条数据的时候零碎宕机了,那么此时曾经提交的offset仍为21,当节点从新连贯时,仍会从21生产,那么此时21-30的数据就会被从新生产。还有一种状况是再平衡时,例如有新节点退出也会引发相似的问题。
- 音讯失落。
手动同步提交
public static void main(String[] args) { while (true) { // 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { // 模仿音讯的解决逻辑 System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); try { //解决完以后批次的音讯,在轮询更多的音讯之前,调用commitSync办法提交以后批次最新的音讯 consumer.commitSync(); } catch (CommitFailedException e) { //todo 事务回滚 e.printStackTrace(); } }}
手动同步提交能够在任何时候提交offset,例如能够每生产一条进行一次提交。提交失败之后会抛出异样,能够在异样中做出弥补机制,例如事务回滚等操作。
然而因为手动同步提交是阻塞性质的,所以不倡议太高的频率进行提交。
手动异步提交
异步提交有三种形式,区别在于有没有回调的形式。
@Testpublic void asynCommit1(){ while (true) { // 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); consumer.commitAsync(); }}@Testpublic void asynCommit2(){ while (true) { // 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); // 异步回调机制 consumer.commitAsync(new OffsetCommitCallback(){ @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception!=null){ System.out.println(String.format("提交失败:%s", offsets.toString())); } } }); }}@Testpublic void asynCommit3(){ while (true) { // 这里的参数指的是轮询的工夫距离,也就是多长时间去拉一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000)); records.forEach((ConsumerRecord<String, String> record) -> { System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic()); }); consumer.commitAsync((offsets, exception) ->{ if (exception!=null){ System.out.println(String.format("提交失败:%s", offsets.toString())); } }); }}
异步提交commitAsync()
与同步提交commitSync()
最大的区别在于异步提交不会进行重试,同步提交会始终进行主动重试,当然也能够通过再产生异样时持续提交的形式来实现此性能。
同步+异步
能够应用同步+异步的模式保证数据可能精确提交:
while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { log.trace("Kafka生产信息ConsumerRecord={}",record.toString()); } try { //先应用异步提交机制 consumer.commitAsync(); } catch (CommitFailedException e) { // todo 弥补机制 log.error("commitAsync failed", e) } finally{ try { //再应用同步提交机制 consumer.commitSync(); } catch (CommitFailedException e) { // todo 弥补机制 log.error("commitAsync failed", e) } finally{ consumer.close(); } }}
四、异样场景实际
4.1 异样重试
咱们零碎之前遇到过消费者在生产音讯时,短时间内间断报错。依据景象认为是零碎呈现问题,后续发现所有报错都是同一条音讯,排查后发现是解决音讯过程中存在未捕捉的异样,导致音讯重试,雷同的问题引发了间断报错。
JMQ在生产过程中如果有未捕捉的异样会认为音讯生产失败,会首先在本地重试两次后放入重试队列中,进入重试队列的音讯,会有过期逻辑,当超过重试工夫或者超过最大重试次数后(默认3天过期),音讯将会被抛弃。因而在解决音讯时须要思考如果出现异常后的解决场景,抉择是重试还是疏忽还是记录数据后告警。
因而咱们在生产音讯的过程中,尤其是采纳pull模式,肯定要依据业务场景留神异样的捕捉。否则小则影响本条音讯,大则本批次后续所有音讯都可能失落。
//每隔1min拉取音讯ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60L));for (ConsumerRecord<String, String> record : records) { try { //doing } catch (Exception e) { //如果此处未捕捉音讯,会间接导致for循环退出,后续所有音讯都将失落 log.error("Bdp监听工作执行失败, taskName:{}", taskName, e); }}
4.2 本地重试与服务端重试
零碎还遇到过在JMQ服务端配置了生产失败重试的逻辑,例如重试多少次距离多久,然而在生产失败之后,发现重试的逻辑并没有依照配置的逻辑走。分割运维帮忙排查后发现:
重试分为本地重试和服务端重试
依据4.1咱们晓得生产失败后,会首先在本地重试,本地重试失败后会放入重试队列,则此时进入服务端重试,两套重试须要两套配置,本地的重试配置在本地的配置文件中。
本地配置如下:
<jmq:consumer id="apiConsumer" transport="jmq.apilog.transport"> <!--配置距离1秒,重试3次--> <jmq:listener topic="${jmq.topic.apilog}" listener="apiLogMessageListener" retryDelay="1000" maxRetrys="3"/></jmq:consumer>
服务端重试配置:
作者:京东科技 韩国凯
起源:京东云开发者社区 转载请注明起源