深刻利用
4.1 springboot-kafka
1)配置文件
kafka: bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904 producer: # producer 生产者 retries: 0 # 重试次数 acks: 1 # 应答级别:多少个分区正本备份实现时向生产者发送ack确认(可选0、1、all/-1) batch-size: 16384 # 一次最多发送数据量 buffer-memory: 33554432 # 生产端缓冲区大小 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # consumer消费者 group-id: javagroup # 默认的生产组ID enable-auto-commit: true # 是否主动提交offset auto-commit-interval: 100 # 提交offset延时(接管到音讯后多久提交offset) auto-offset-reset: latest #earliest,latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2)启动信息
4.2 音讯发送
4.2.1 发送类型
KafkaTemplate调用send时默认采纳异步发送,如果须要同步获取发送后果,调用get办法
具体代码参考:AsyncProducer.java
消费者应用:KafkaConsumer.java
1)同步发送
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message)); //留神,能够设置等待时间,超出后,不再等待后果 SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS); logger.info("send result:{}",result.getProducerRecord().value());
通过swagger发送,控制台能够失常打印send result
2)阻断
在服务器上,将kafka暂停服务
docker-compose pause kafka-1 kafka-2
在swagger发送音讯
调同步发送:申请被阻断,始终期待,超时后返回谬误
而调异步发送的(默认发送接口),申请立即返回。
那么,异步发送的音讯怎么确认发送状况呢???往下看!
3)注册监听
代码参考: KafkaListener.java
能够给kafkaTemplate设置Listener来监听音讯发送状况,实现外部的对应办法
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {});
查看控制台,期待一段时间后,异步发送失败的音讯会被回调给注册过的listener
com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}
启动kafka
docker-compose unpause kafka-1 kafka-2
再次发送音讯时,同步异步均能够失常收发,并且监听进入success回调
com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]
能够看到,在内部类 KafkaListener$1 中,即注册的Listener的音讯。
4.2.2 序列化
消费者应用:KafkaConsumer.java
1)序列化详解
- 后面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
- 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
- 这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)
- 基本上,能够满足绝大多数场景
2)自定义序列化
本人实现,实现对应的接口即可,有以下办法:
public interface Serializer<T> extends Closeable { default void configure(Map<String, ?> configs, boolean isKey) { } //实践上,只实现这个即可失常运行 byte[] serialize(String var1, T var2); //默认调下面的办法 default byte[] serialize(String topic, Headers headers, T data) { return this.serialize(topic, data); } default void close() { }}
案例,参考: MySerializer.java
在yaml中配置本人的编码器
value-serializer: com.itheima.demo.config.MySerializer
从新发送,发现:音讯发送端编码回调一切正常。然而生产端音讯内容不对!
com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"
怎么办?
3)解码
发送端有编码并且咱们本人定义了编码,那么接收端天然要装备对应的解码策略
代码参考:MyDeserializer.java,实现形式与编码器简直一样!
在yaml中配置本人的解码器
value-deserializer: com.itheima.demo.config.MyDeserializer
再次收发,音讯失常
com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}
4.2.3 分区策略
分区策略决定了音讯依据key投放到哪个分区,也是程序生产保障的基石。
- 给定了分区号,间接将数据发送到指定的分区外面去
- 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
- 既没有给定分区号,也没有给定key值,间接轮循进行分区
- 自定义分区,你想怎么做就怎么做
1)验证默认分区规定
发送者代码参考:PartitionProducer.java
消费者代码应用:PartitionConsumer.java
通过swagger拜访setKey:
看控制台:
再拜访setPartition来设置分区号0来发送
看控制台:
2)自定义分区
你想本人定义规定,依据我的要求,把音讯投放到对应的分区去? 能够!
参考代码:MyPartitioner.java , MyPartitionTemplate.java ,
发送应用:MyPartitionProducer.java
应用swagger,发送0结尾和非0结尾两种key试一试!
备注:
本人定义config参数,比拟麻烦,须要突破默认的KafkaTemplate设置
能够将KafkaConfiguration.java中的getTemplate加上@Bean注解来笼罩零碎默认bean
这里为了防止混同,采纳@Autowire注入
4.3 音讯生产
4.3.1 音讯组别
发送者应用:KafkaProducer.java
1)代码参考:GroupConsumer.java,Listener拷贝3份,别离赋予两组group,验证分组生产:
2)启动
3)通过swagger发送2条音讯
- 同一group下的两个消费者,在group1均分音讯
- group2下只有一个消费者,失去全副音讯
4)生产端闲置
留神分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会呈现消费者闲置,浪费资源!
验证形式:
停掉我的项目,删掉test主题,从新建一个 ,这次只给它调配一个分区。
从新发送两条音讯,试一试
解析:
group2能够生产到1、2两条音讯
group1下有两个消费者,然而只调配给了 -1 , -2这个过程被闲置
4.3.2 位移提交
1)主动提交
后面的案例中,咱们设置了以下两个选项,则kafka会按延时设置主动提交
enable-auto-commit: true # 是否主动提交offsetauto-commit-interval: 100 # 提交offset延时(接管到音讯后多久提交offset)
2)手动提交
有些时候,咱们须要手动管制偏移量的提交机会,比方确保音讯严格生产后再提交,以避免失落或反复。
上面咱们本人定义配置,笼罩下面的参数
代码参考:MyOffsetConfig.java
通过在生产端的Consumer来提交偏移量,有如下几种形式:
代码参考:MyOffsetConsumer.java
同步提交、异步提交:manualCommit() ,同步异步的差异,上面会具体讲到。
指定偏移量提交:offset()
3)反复生产问题
如果手动提交模式被关上,肯定不要遗记提交偏移量。否则会造成反复生产!
代码参考和比照:manualCommit() , noCommit()
验证过程:
用km将test主题删除,新建一个test空主题。不便察看音讯偏移
正文掉其余Consumer的Component注解,只保留以后MyOffsetConsumer.java
启动我的项目,应用swagger的KafkaProducer发送间断几条音讯
留心控制台,都能生产,没问题:
然而!重启试试:
无论重启多少次,不提交偏移量的生产组,会反复生产一遍!!!
再通过命令行查问偏移量试试:
4)教训与总结
commitSync()办法,即同步提交,会提交最初一个偏移量。在胜利提交或碰到无怯复原的谬误之前,commitSync()会始终重试,然而commitAsync()不会。这就造成一个陷阱:如果异步提交,针对偶然呈现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为长期问题导致的,那么后续的提交总会有胜利的。只有胜利一次,偏移量就会提交下来。然而!如果这是产生在敞开消费者时的最初一次提交,就要确保可能提交胜利,如果还没提交完就停掉了过程。就会造成反复生产!因而,在消费者敞开前个别会组合应用commitAsync()和commitSync()。具体代码参考:MyOffsetConsumer.manualOffset()
本文由传智教育博学谷 - 狂野架构师教研团队公布,转载请注明出处!
如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源