共计 3715 个字符,预计需要花费 10 分钟才能阅读完成。
pom 引入
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
yml 配置阐明
spring:
kafka:
# 以逗号分隔的地址列表,用于建设与 Kafka 集群的初始连贯 (kafka 默认的端口号为 9092)
bootstrap-servers: 192.168.1.200:9092
producer:
# 产生谬误后,音讯重发的次数。retries: 0
#当有多个音讯须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次能够应用的内存大小,依照字节数计算。batch-size: 16384
# 设置生产者内存缓冲区的大小。buffer-memory: 33554432
# 键的序列化形式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化形式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0:生产者在胜利写入音讯之前不会期待任何来自服务器的响应。# acks=1:只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。# acks=all:只有当所有参加复制的节点全副收到音讯时,生产者才会收到一个来自服务器的胜利响应。acks: 1
consumer:
# 主动提交的工夫距离 在 spring boot 2.X 版本中这里采纳的是值的类型为 Duration 须要合乎特定的格局,如 1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下该作何解决:# latest(默认值)在偏移量有效的状况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest:在偏移量有效的状况下,消费者将从起始地位读取分区的记录
auto-offset-reset: earliest
# 是否主动提交偏移量,默认值是 true, 为了避免出现反复数据和数据失落,能够把它设置为 false, 而后手动提交偏移量
enable-auto-commit: false
# 键的反序列化形式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化形式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。concurrency: 5
#listner 负责 ack,每调用一次,就立刻 commit
ack-mode: manual_immediate
生产者实例:
import com.xy.kafka.constant.Topic;
import com.xy.kafka.dao.XyKafkaInMsgMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Date;
import java.util.UUID;
@Slf4j
@Component
@EnableScheduling
public class MsgProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private XyKafkaInMsgMapper inMsgMapper;
/**
* 定时工作生成音讯
*/
@Scheduled(cron = "0/10 * * * * ?")
public void send() {
// 要推送的音讯内容
String message = UUID.randomUUID().toString()+"生产的音讯";
/**
* 发送音讯且接管返回值
*ListenableFuture 类是 spring 对 java 原生 Future 的扩大加强, 是一个泛型接口, 用于监听异步办法的回调而对于 kafka send 办法返回值而言,这里的泛型所代表的理论类型就是 SendResult<K, V>, 而这里 K,V 的泛型实际上被用于 ProducerRecord<K, V> producerRecord, 即生产者发送音讯的 key,value 类型
*/
ListenableFuture listenableFuture = kafkaTemplate.send(Topic.SIMPLE,message);
listenableFuture.addCallback(o -> log.info("音讯发送胜利,{}", o.toString()),
throwable -> log.info("音讯发送失败,{}" + throwable.getMessage())
);
XyKafkaInMsg build = new XyKafkaInMsg();
build.setFwBh(System.currentTimeMillis());
build.setGmtCreate(new Date());
// 依据信息惟一批次号查问音讯是否已存在
XyKafkaInMsg inMsg = inMsgMapper.selectByFuBh(build.getFwBh());
if (inMsg == null){
// 发送的音讯入库
int saveMsgResult = inMsgMapper.insertSelective(build);
log.info("音讯插入后果:{}",saveMsgResult == 1 ? "胜利" : "失败");
}
}
}
消费者实例:
import com.xy.kafka.constant.Topic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KafkaSimpleConsumer {
// 简略消费者
@KafkaListener(groupId = "simpleGroup", topics = Topic.SIMPLE)
public void consumer1_1(ConsumerRecord<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer, Acknowledgment ack) {log.info("独自消费者生产音讯,topic= {} ,content = {}",topic,record.value());
log.info("consumer content = {}",consumer);
// 音讯接管确认
ack.acknowledge();
/*
* 如果须要手工提交异步 consumer.commitSync();
* 手工同步提交 consumer.commitAsync()
*/
}
}
正文完