深刻利用

4.1 springboot-kafka

1)配置文件

  kafka:    bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904    producer: # producer 生产者      retries: 0 # 重试次数      acks: 1 # 应答级别:多少个分区正本备份实现时向生产者发送ack确认(可选0、1、all/-1)      batch-size: 16384 # 一次最多发送数据量      buffer-memory: 33554432 # 生产端缓冲区大小      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer: # consumer消费者      group-id: javagroup # 默认的生产组ID      enable-auto-commit: true # 是否主动提交offset      auto-commit-interval: 100 # 提交offset延时(接管到音讯后多久提交offset)      auto-offset-reset: latest  #earliest,latest      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2)启动信息

4.2 音讯发送

4.2.1 发送类型

KafkaTemplate调用send时默认采纳异步发送,如果须要同步获取发送后果,调用get办法

具体代码参考:AsyncProducer.java

消费者应用:KafkaConsumer.java

1)同步发送

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));                //留神,能够设置等待时间,超出后,不再等待后果        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);        logger.info("send result:{}",result.getProducerRecord().value());

通过swagger发送,控制台能够失常打印send result

2)阻断

在服务器上,将kafka暂停服务

docker-compose pause kafka-1 kafka-2

在swagger发送音讯

调同步发送:申请被阻断,始终期待,超时后返回谬误

而调异步发送的(默认发送接口),申请立即返回。

那么,异步发送的音讯怎么确认发送状况呢???往下看!

3)注册监听

代码参考: KafkaListener.java

能够给kafkaTemplate设置Listener来监听音讯发送状况,实现外部的对应办法

 kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {});

查看控制台,期待一段时间后,异步发送失败的音讯会被回调给注册过的listener

com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}

启动kafka

docker-compose unpause kafka-1 kafka-2

再次发送音讯时,同步异步均能够失常收发,并且监听进入success回调

com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]

能够看到,在内部类 KafkaListener$1 中,即注册的Listener的音讯。

4.2.2 序列化

消费者应用:KafkaConsumer.java

1)序列化详解

  • 后面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)
  • 基本上,能够满足绝大多数场景

2)自定义序列化

本人实现,实现对应的接口即可,有以下办法:

public interface Serializer<T> extends Closeable {    default void configure(Map<String, ?> configs, boolean isKey) {    }      //实践上,只实现这个即可失常运行    byte[] serialize(String var1, T var2);      //默认调下面的办法    default byte[] serialize(String topic, Headers headers, T data) {        return this.serialize(topic, data);    }    default void close() {    }}

案例,参考: MySerializer.java

在yaml中配置本人的编码器

value-serializer: com.itheima.demo.config.MySerializer

从新发送,发现:音讯发送端编码回调一切正常。然而生产端音讯内容不对!

com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"

怎么办?

3)解码

发送端有编码并且咱们本人定义了编码,那么接收端天然要装备对应的解码策略

代码参考:MyDeserializer.java,实现形式与编码器简直一样!

在yaml中配置本人的解码器

value-deserializer: com.itheima.demo.config.MyDeserializer

再次收发,音讯失常

com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}

4.2.3 分区策略

分区策略决定了音讯依据key投放到哪个分区,也是程序生产保障的基石。

  • 给定了分区号,间接将数据发送到指定的分区外面去
  • 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
  • 既没有给定分区号,也没有给定key值,间接轮循进行分区
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规定

发送者代码参考:PartitionProducer.java

消费者代码应用:PartitionConsumer.java

通过swagger拜访setKey:


看控制台:

再拜访setPartition来设置分区号0来发送


看控制台:

2)自定义分区

你想本人定义规定,依据我的要求,把音讯投放到对应的分区去? 能够!

参考代码:MyPartitioner.java , MyPartitionTemplate.java ,

发送应用:MyPartitionProducer.java

应用swagger,发送0结尾和非0结尾两种key试一试!

备注:

本人定义config参数,比拟麻烦,须要突破默认的KafkaTemplate设置

能够将KafkaConfiguration.java中的getTemplate加上@Bean注解来笼罩零碎默认bean

这里为了防止混同,采纳@Autowire注入

4.3 音讯生产

4.3.1 音讯组别

发送者应用:KafkaProducer.java

1)代码参考:GroupConsumer.java,Listener拷贝3份,别离赋予两组group,验证分组生产:

2)启动

3)通过swagger发送2条音讯

  • 同一group下的两个消费者,在group1均分音讯
  • group2下只有一个消费者,失去全副音讯

4)生产端闲置

留神分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会呈现消费者闲置,浪费资源!

验证形式:

停掉我的项目,删掉test主题,从新建一个 ,这次只给它调配一个分区。

从新发送两条音讯,试一试

解析:

group2能够生产到1、2两条音讯

group1下有两个消费者,然而只调配给了 -1 , -2这个过程被闲置

4.3.2 位移提交

1)主动提交

后面的案例中,咱们设置了以下两个选项,则kafka会按延时设置主动提交

enable-auto-commit: true # 是否主动提交offsetauto-commit-interval: 100  # 提交offset延时(接管到音讯后多久提交offset)

2)手动提交

有些时候,咱们须要手动管制偏移量的提交机会,比方确保音讯严格生产后再提交,以避免失落或反复。

上面咱们本人定义配置,笼罩下面的参数

代码参考:MyOffsetConfig.java

通过在生产端的Consumer来提交偏移量,有如下几种形式:

代码参考:MyOffsetConsumer.java

同步提交、异步提交:manualCommit() ,同步异步的差异,上面会具体讲到。

指定偏移量提交:offset()

3)反复生产问题

如果手动提交模式被关上,肯定不要遗记提交偏移量。否则会造成反复生产!

代码参考和比照:manualCommit() , noCommit()

验证过程:

用km将test主题删除,新建一个test空主题。不便察看音讯偏移
正文掉其余Consumer的Component注解,只保留以后MyOffsetConsumer.java
启动我的项目,应用swagger的KafkaProducer发送间断几条音讯
留心控制台,都能生产,没问题:


然而!重启试试:


无论重启多少次,不提交偏移量的生产组,会反复生产一遍!!!

再通过命令行查问偏移量试试:

4)教训与总结

commitSync()办法,即同步提交,会提交最初一个偏移量。在胜利提交或碰到无怯复原的谬误之前,commitSync()会始终重试,然而commitAsync()不会。这就造成一个陷阱:如果异步提交,针对偶然呈现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为长期问题导致的,那么后续的提交总会有胜利的。只有胜利一次,偏移量就会提交下来。然而!如果这是产生在敞开消费者时的最初一次提交,就要确保可能提交胜利,如果还没提交完就停掉了过程。就会造成反复生产!因而,在消费者敞开前个别会组合应用commitAsync()和commitSync()。具体代码参考:MyOffsetConsumer.manualOffset()

本文由传智教育博学谷 - 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源