关于java:Springboot集成kafka高级应用实战

40次阅读

共计 4934 个字符,预计需要花费 13 分钟才能阅读完成。

深刻利用

4.1 springboot-kafka

1)配置文件

  kafka:
    bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 1 # 应答级别: 多少个分区正本备份实现时向生产者发送 ack 确认(可选 0、1、all/-1)
      batch-size: 16384 # 一次最多发送数据量
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer: # consumer 消费者
      group-id: javagroup # 默认的生产组 ID
      enable-auto-commit: true # 是否主动提交 offset
      auto-commit-interval: 100 # 提交 offset 延时(接管到音讯后多久提交 offset)
      auto-offset-reset: latest  #earliest,latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2)启动信息

4.2 音讯发送

4.2.1 发送类型

KafkaTemplate 调用 send 时默认采纳异步发送,如果须要同步获取发送后果,调用 get 办法

具体代码参考:AsyncProducer.java

消费者应用:KafkaConsumer.java

1)同步发送

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
                // 留神,能够设置等待时间,超出后,不再等待后果
        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());

通过 swagger 发送,控制台能够失常打印 send result

2)阻断

在服务器上,将 kafka 暂停服务

docker-compose pause kafka-1 kafka-2

在 swagger 发送音讯

调同步发送:申请被阻断,始终期待,超时后返回谬误

而调异步发送的(默认发送接口),申请立即返回。

那么,异步发送的音讯怎么确认发送状况呢???往下看!

3)注册监听

代码参考:KafkaListener.java

能够给 kafkaTemplate 设置 Listener 来监听音讯发送状况,实现外部的对应办法

 kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {});

查看控制台,期待一段时间后,异步发送失败的音讯会被回调给注册过的 listener

com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}

启动 kafka

docker-compose unpause kafka-1 kafka-2

再次发送音讯时,同步异步均能够失常收发,并且监听进入 success 回调

com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}
com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]

能够看到,在内部类 KafkaListener$1 中,即注册的 Listener 的音讯。

4.2.2 序列化

消费者应用:KafkaConsumer.java

1)序列化详解

  • 后面用到的是 Kafka 自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)
  • 基本上,能够满足绝大多数场景

2)自定义序列化

本人实现,实现对应的接口即可,有以下办法:

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) { }

      // 实践上,只实现这个即可失常运行
    byte[] serialize(String var1, T var2);

      // 默认调下面的办法
    default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);
    }

    default void close() {}
}

案例,参考: MySerializer.java

在 yaml 中配置本人的编码器

value-serializer: com.itheima.demo.config.MySerializer

从新发送,发现:音讯发送端编码回调一切正常。然而生产端音讯内容不对!

com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}
com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"

怎么办?

3)解码

发送端有编码并且咱们本人定义了编码,那么接收端天然要装备对应的解码策略

代码参考:MyDeserializer.java,实现形式与编码器简直一样!

在 yaml 中配置本人的解码器

value-deserializer: com.itheima.demo.config.MyDeserializer

再次收发,音讯失常

com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}
com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}

4.2.3 分区策略

分区策略决定了音讯依据 key 投放到哪个分区,也是程序生产保障的基石。

  • 给定了分区号,间接将数据发送到指定的分区外面去
  • 没有给定分区号,给定数据的 key 值,通过 key 取上 hashCode 进行分区
  • 既没有给定分区号,也没有给定 key 值,间接轮循进行分区
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规定

发送者代码参考:PartitionProducer.java

消费者代码应用:PartitionConsumer.java

通过 swagger 拜访 setKey:

看控制台:

再拜访 setPartition 来设置分区号 0 来发送

看控制台:

2)自定义分区

你想本人定义规定,依据我的要求,把音讯投放到对应的分区去?能够!

参考代码:MyPartitioner.java , MyPartitionTemplate.java ,

发送应用:MyPartitionProducer.java

应用 swagger,发送 0 结尾和非 0 结尾两种 key 试一试!

备注:

本人定义 config 参数,比拟麻烦,须要突破默认的 KafkaTemplate 设置

能够将 KafkaConfiguration.java 中的 getTemplate 加上 @Bean 注解来笼罩零碎默认 bean

这里为了防止混同,采纳 @Autowire 注入

4.3 音讯生产

4.3.1 音讯组别

发送者应用:KafkaProducer.java

1)代码参考:GroupConsumer.java,Listener 拷贝 3 份,别离赋予两组 group,验证分组生产:

2)启动

3)通过 swagger 发送 2 条音讯

  • 同一 group 下的两个消费者,在 group1 均分音讯
  • group2 下只有一个消费者,失去全副音讯

4)生产端闲置

留神分区数与消费者数的搭配,如果(消费者数 > 分区数量),将会呈现消费者闲置,浪费资源!

验证形式:

停掉我的项目,删掉 test 主题,从新建一个,这次只给它调配一个分区。

从新发送两条音讯,试一试

解析:

group2 能够生产到 1、2 两条音讯

group1 下有两个消费者,然而只调配给了 -1,- 2 这个过程被闲置

4.3.2 位移提交

1)主动提交

后面的案例中,咱们设置了以下两个选项,则 kafka 会按延时设置主动提交

enable-auto-commit: true # 是否主动提交 offset
auto-commit-interval: 100  # 提交 offset 延时(接管到音讯后多久提交 offset)

2)手动提交

有些时候,咱们须要手动管制偏移量的提交机会,比方确保音讯严格生产后再提交,以避免失落或反复。

上面咱们本人定义配置,笼罩下面的参数

代码参考:MyOffsetConfig.java

通过在生产端的 Consumer 来提交偏移量,有如下几种形式:

代码参考:MyOffsetConsumer.java

同步提交、异步提交:manualCommit(),同步异步的差异,上面会具体讲到。

指定偏移量提交:offset()

3)反复生产问题

如果手动提交模式被关上,肯定不要遗记提交偏移量。否则会造成反复生产!

代码参考和比照:manualCommit() , noCommit()

验证过程:

用 km 将 test 主题删除,新建一个 test 空主题。不便察看音讯偏移
正文掉其余 Consumer 的 Component 注解,只保留以后 MyOffsetConsumer.java
启动我的项目,应用 swagger 的 KafkaProducer 发送间断几条音讯
留心控制台,都能生产,没问题:

然而!重启试试:

无论重启多少次,不提交偏移量的生产组,会反复生产一遍!!!

再通过命令行查问偏移量试试:

4)教训与总结

commitSync()办法,即同步提交,会提交最初一个偏移量。在胜利提交或碰到无怯复原的谬误之前,commitSync()会始终重试,然而 commitAsync()不会。这就造成一个陷阱:如果异步提交,针对偶然呈现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为长期问题导致的,那么后续的提交总会有胜利的。只有胜利一次,偏移量就会提交下来。然而!如果这是产生在敞开消费者时的最初一次提交,就要确保可能提交胜利,如果还没提交完就停掉了过程。就会造成反复生产!因而,在消费者敞开前个别会组合应用 commitAsync()和 commitSync()。具体代码参考:MyOffsetConsumer.manualOffset()

本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源

正文完
 0