一.springboot 主动配置形式整合 kafka:
springboot 提供主动配置整合 kafka 的形式,须要做一下步骤:
1. 引入 kafka 依赖包:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
2. 在 springboot 配置中退出 kafka 相干配置,springboot 启动时候会主动加载这些配置,实现链接 kafka, 创立 producer,consumer 等。
spring:
kafka:
# kafka 服务地址
bootstrap-servers: 127.0.0.1:9092
# 消费者配置
consumer:
bootstrap-servers: 127.0.0.1:9092
group-id: myGroup
enable-auto-commit: true
auto-offset-reset: earliest
auto-commit-interval: 1000
max-poll-records: 10
# 生产者配置
producer:
retries: 5
batch-size: 16384
buffer-memory: 33554432
acks: 1
3. 音讯发送端:
@Component
public class MqProviderImpl{
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Override
public void sendSkMessage(String message, Properties properties) {
// 发送音讯,注册一个回调事件
ListenableFuture<SendResult<String, String>> futureMessage = KafkaConfig.kafkaTemplateStatic.send("test_topic",
message);
futureMessage.addCallback(new ListenableFutureCallback<SendResult<String, String>>(){
@Override
public void onSuccess(SendResult<String, String> sendResult) {log.info("rev"+sendResult.getProducerRecord().value());
}
@Override
public void onFailure(Throwable ex) {log.error("error"+ex.getMessage());
}
});
}}
4. 音讯生产端:
@KafkaListener(topics = {"test_topic"})
public void receiveSkMessageInfo(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment ack) {log.info(record.value());
}
以上实现是最简略的形式,但应用 springboot 主动配置的形式,所有配置项必须当时写好在在 applicantion.yml 的 spring.kafka 上面,试想在分布式的场景中,如果某一项产生变动,每个利用上面的配置都须要批改,这就须要将这些配置应用服务治理对立治理起来,这里就须要一种自定义配置的形式来解决。
springboot 主动配置 kafka 是在 KafkaAutoConfiguration 这个类中实现的,它有一个成员 KafkaProperties,这个 properties 中保留所有对于 kafka 的配置。
// 主动配置是在 KafkaAutoConfiguration 类实现的
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {private final KafkaProperties properties;
KafkaProperties 类的注解能够看出,配置都是从 yml 里的 spring.kafka 配置读出来的
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
二.springboot 手动配置形式整合 kafka,应用 zk 做配置核心:
在分布式的环境下,须要应用服务治理把 yml 里的配置对立治理起来,这里应用 zookeeper 来对立治理 kafka 的配置。如果将原有的配置放到 zk 中,来实现从 zk 上读取配置,让 springboot 接管到,这里就须要从新定义 kafka 的配置类,不能应用原有的 KafkaAutoConfiguration 了。
1. 从 zk 上拉取配置,这里应用当当开源的 Config Toolkit,还自带一个操作 zk 的治理界面,引入 pom:
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>config-toolkit</artifactId>
<version>3.3.2-RELEASE</version>
</dependency>
2. 在 yml 中增加链接 zk 的配置,有这些配置能力保障利用能链接 zk:`
configs:
# zk 地址
address: 192.168.1.30:2181
# 保留利用配置的节点名
env: /projectx/modulex
version: 1
# zk 数据组
groupdefault: groupdefault
3. 下载当当的 config-toolkit, 拜访 http://localhost:8080/,退出相干配置,github 上有具体阐明。4. 新建一个 ZKConfiguration 类,实现 EnvironmentAware 接口,实现 EnvironmentAware 接口的 setEnvironment 能够在我的项目启动时设置我的项目的环境变量,能够在这个类中联合 config-toolkit, 把 zk 的配置加载到我的项目环境变量当中:
@Component
public class ZKConfiguration implements EnvironmentAware {
@Autowired
private Environment env;
private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>();
public ZKConfiguration() {}
// 加载 zk 的根本配置
@Bean
public ZookeeperConfigProfile zookeeperConfigProfile() {
ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile(Objects.requireNonNull(this.env.getProperty("configs.address")),
Objects.requireNonNull(this.env.getProperty("configs.env")),
this.env.getProperty("configs.version"));
return configProfile;
}
// 失去具体组里的配置
@Bean({"groupPropDefault"})
public GeneralConfigGroup generalConfigGroupDefault() {ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();
GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault"));
return group;
}
/**
* 获取配置组
* @return
*/
public GeneralConfigGroup getConfigGroup(String group) {return GROUPMAP.get(group);
}
/**
*
* 我的项目启动时会调用这个办法,把 zk 里的配置组存在长期变量 GROUPMAP 里,当前会用到
* 所以 数据源初始化,就设置在这个办法里
* @param environment
*/
@Override
public void setEnvironment(Environment environment) {
this.env = environment;
ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();
GROUPMAP.put("groupdefault", new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault")));
}}
5. 取得所有配置项后,就是让 springboot 去建设 kafka 链接了,这里相当于要从新实现 KafkaAutoConfiguration
的配置。建设一个 KafkaConfig 配置类,这里次要是配置所有 kafka 须要的 bean:
@Configuration
@ConditionalOnClass({KafkaTemplate.class})
@EnableKafka
public class KafkaConfig {
// 把刚刚加载 zk 配置的类注入进来
@Autowired
private ZKConfiguration zkConfiguration;
// 创立 消费者工厂
@Bean("consumerFactory")
@ConditionalOnMissingBean({ConsumerFactory.class})
public ConsumerFactory<String, String> consumerFactory() {
// 创立工厂须要三个参数:// 1. 消费者配置的 map
// 2. key 的反序列化实现类
// 3. value 的反序列化实现类
return new DefaultKafkaConsumerFactory<String, String>(makeKafkaConfig(), new StringDeserializer(), new StringDeserializer());
}
// 创立生产者工厂
@Bean("producerFactory")
@ConditionalOnMissingBean({ProducerFactory.class})
public ProducerFactory<String, String> kafkaProducerFactory() {
// 生产者工厂的参数如消费者工厂
return new DefaultKafkaProducerFactory(makeKafkaConfig(), new StringSerializer(), new StringSerializer());
}
// 创立 kafkaTemplate 这个 bean,有了这个 bean 能力在理论业务中应用 kafka
@Bean("kafkaTemplate")
@ConditionalOnMissingBean({com.seckill.boot.common.util.KafkaTemplate.class})
public KafkaTemplate<String, Protobufable> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, String> kafkaProducerFactory,
@Qualifier("producerListener") ProducerListener<String, Protobufable> producerListener) {KafkaTemplate<String, Protobufable> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaTemplate.setProducerListener(producerListener);
kafkaTemplate.setDefaultTopic("groupdefault");
return kafkaTemplate;
}
@Bean("producerListener")
@ConditionalOnMissingBean({ProducerListener.class})
public ProducerListener<String, Protobufable> kafkaProducerListener() {return new LoggingProducerListener();
}
@Bean
@ConditionalOnProperty(name = {"spring.kafka.producer.transaction-id-prefix"}
)
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager(producerFactory);
}
// zk 里拿到的配置取出来
private Map<String, Object> makeKafkaConfig() {
// 取得配置的 group
GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup("groupdefault");
Map<String, Object> kafkaConfig = new HashMap<>();
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get("spring.kafka.bootstrap-servers"));
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get("spring.kafka.consumer.group-id"));
kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get("spring.kafka.consumer.auto-offset-reset"));
kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get("spring.kafka.consumer.enable-auto-commit"));
kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.auto-commit-interval"));
kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.key-serializer"));
kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.value-serializer"));
kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-records"));
kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-interval-ms"));
kafkaConfig.put("ack-mode", configGroup.get("spring.kafka.listener.ack-mode"));
kafkaConfig.put("concurrency", configGroup.get("spring.kafka.listener.concurrency"));
kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get("spring.kafka.producer.acks"));
kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get("spring.kafka.producer.batch-size"));
kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get("spring.kafka.producer.buffer-memory"));
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.key-serializer"));
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.value-serializer"));
kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get("spring.kafka.producer.retries"));
return kafkaConfig;
}
}
6. 将 kafka 须要的 bean 配置好后,就能在理论业务中应用 KafkaTemplate 操作音讯了
@Component
public class MqProviderImpl{
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;