深刻利用
4.1 springboot-kafka
1)配置文件
kafka:
bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
producer: # producer 生产者
retries: 0 # 重试次数
acks: 1 # 应答级别: 多少个分区正本备份实现时向生产者发送 ack 确认(可选 0、1、all/-1)
batch-size: 16384 # 一次最多发送数据量
buffer-memory: 33554432 # 生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer: # consumer 消费者
group-id: javagroup # 默认的生产组 ID
enable-auto-commit: true # 是否主动提交 offset
auto-commit-interval: 100 # 提交 offset 延时(接管到音讯后多久提交 offset)
auto-offset-reset: latest #earliest,latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2)启动信息
4.2 音讯发送
4.2.1 发送类型
KafkaTemplate 调用 send 时默认采纳异步发送,如果须要同步获取发送后果,调用 get 办法
具体代码参考:AsyncProducer.java
消费者应用:KafkaConsumer.java
1)同步发送
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
// 留神,能够设置等待时间,超出后,不再等待后果
SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
logger.info("send result:{}",result.getProducerRecord().value());
通过 swagger 发送,控制台能够失常打印 send result
2)阻断
在服务器上,将 kafka 暂停服务
docker-compose pause kafka-1 kafka-2
在 swagger 发送音讯
调同步发送:申请被阻断,始终期待,超时后返回谬误
而调异步发送的(默认发送接口),申请立即返回。
那么,异步发送的音讯怎么确认发送状况呢???往下看!
3)注册监听
代码参考:KafkaListener.java
能够给 kafkaTemplate 设置 Listener 来监听音讯发送状况,实现外部的对应办法
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {});
查看控制台,期待一段时间后,异步发送失败的音讯会被回调给注册过的 listener
com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}
启动 kafka
docker-compose unpause kafka-1 kafka-2
再次发送音讯时,同步异步均能够失常收发,并且监听进入 success 回调
com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}
com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]
能够看到,在内部类 KafkaListener$1 中,即注册的 Listener 的音讯。
4.2.2 序列化
消费者应用:KafkaConsumer.java
1)序列化详解
- 后面用到的是 Kafka 自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
- 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
- 这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)
- 基本上,能够满足绝大多数场景
2)自定义序列化
本人实现,实现对应的接口即可,有以下办法:
public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) { }
// 实践上,只实现这个即可失常运行
byte[] serialize(String var1, T var2);
// 默认调下面的办法
default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);
}
default void close() {}
}
案例,参考: MySerializer.java
在 yaml 中配置本人的编码器
value-serializer: com.itheima.demo.config.MySerializer
从新发送,发现:音讯发送端编码回调一切正常。然而生产端音讯内容不对!
com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}
com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"
怎么办?
3)解码
发送端有编码并且咱们本人定义了编码,那么接收端天然要装备对应的解码策略
代码参考:MyDeserializer.java,实现形式与编码器简直一样!
在 yaml 中配置本人的解码器
value-deserializer: com.itheima.demo.config.MyDeserializer
再次收发,音讯失常
com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}
com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}
4.2.3 分区策略
分区策略决定了音讯依据 key 投放到哪个分区,也是程序生产保障的基石。
- 给定了分区号,间接将数据发送到指定的分区外面去
- 没有给定分区号,给定数据的 key 值,通过 key 取上 hashCode 进行分区
- 既没有给定分区号,也没有给定 key 值,间接轮循进行分区
- 自定义分区,你想怎么做就怎么做
1)验证默认分区规定
发送者代码参考:PartitionProducer.java
消费者代码应用:PartitionConsumer.java
通过 swagger 拜访 setKey:
看控制台:
再拜访 setPartition 来设置分区号 0 来发送
看控制台:
2)自定义分区
你想本人定义规定,依据我的要求,把音讯投放到对应的分区去?能够!
参考代码:MyPartitioner.java , MyPartitionTemplate.java ,
发送应用:MyPartitionProducer.java
应用 swagger,发送 0 结尾和非 0 结尾两种 key 试一试!
备注:
本人定义 config 参数,比拟麻烦,须要突破默认的 KafkaTemplate 设置
能够将 KafkaConfiguration.java 中的 getTemplate 加上 @Bean 注解来笼罩零碎默认 bean
这里为了防止混同,采纳 @Autowire 注入
4.3 音讯生产
4.3.1 音讯组别
发送者应用:KafkaProducer.java
1)代码参考:GroupConsumer.java,Listener 拷贝 3 份,别离赋予两组 group,验证分组生产:
2)启动
3)通过 swagger 发送 2 条音讯
- 同一 group 下的两个消费者,在 group1 均分音讯
- group2 下只有一个消费者,失去全副音讯
4)生产端闲置
留神分区数与消费者数的搭配,如果(消费者数 > 分区数量),将会呈现消费者闲置,浪费资源!
验证形式:
停掉我的项目,删掉 test 主题,从新建一个,这次只给它调配一个分区。
从新发送两条音讯,试一试
解析:
group2 能够生产到 1、2 两条音讯
group1 下有两个消费者,然而只调配给了 -1,- 2 这个过程被闲置
4.3.2 位移提交
1)主动提交
后面的案例中,咱们设置了以下两个选项,则 kafka 会按延时设置主动提交
enable-auto-commit: true # 是否主动提交 offset
auto-commit-interval: 100 # 提交 offset 延时(接管到音讯后多久提交 offset)
2)手动提交
有些时候,咱们须要手动管制偏移量的提交机会,比方确保音讯严格生产后再提交,以避免失落或反复。
上面咱们本人定义配置,笼罩下面的参数
代码参考:MyOffsetConfig.java
通过在生产端的 Consumer 来提交偏移量,有如下几种形式:
代码参考:MyOffsetConsumer.java
同步提交、异步提交:manualCommit(),同步异步的差异,上面会具体讲到。
指定偏移量提交:offset()
3)反复生产问题
如果手动提交模式被关上,肯定不要遗记提交偏移量。否则会造成反复生产!
代码参考和比照:manualCommit() , noCommit()
验证过程:
用 km 将 test 主题删除,新建一个 test 空主题。不便察看音讯偏移
正文掉其余 Consumer 的 Component 注解,只保留以后 MyOffsetConsumer.java
启动我的项目,应用 swagger 的 KafkaProducer 发送间断几条音讯
留心控制台,都能生产,没问题:
然而!重启试试:
无论重启多少次,不提交偏移量的生产组,会反复生产一遍!!!
再通过命令行查问偏移量试试:
4)教训与总结
commitSync()办法,即同步提交,会提交最初一个偏移量。在胜利提交或碰到无怯复原的谬误之前,commitSync()会始终重试,然而 commitAsync()不会。这就造成一个陷阱:如果异步提交,针对偶然呈现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为长期问题导致的,那么后续的提交总会有胜利的。只有胜利一次,偏移量就会提交下来。然而!如果这是产生在敞开消费者时的最初一次提交,就要确保可能提交胜利,如果还没提交完就停掉了过程。就会造成反复生产!因而,在消费者敞开前个别会组合应用 commitAsync()和 commitSync()。具体代码参考:MyOffsetConsumer.manualOffset()
本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!
如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源