一.springboot自动配置方式整合kafka:

springboot提供自动配置整合kafka的方式,需要做以下步骤:

1. 引入kafka依赖包: 
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <version>2.2.7.RELEASE</version>
   </dependency>

2.在springboot配置中加入kafka相关配置,springboot启动时候会自动加载这些配置,完成连接kafka,创建producer,consumer等。

spring:
  kafka:
    # Kafka broker address, shared by producer and consumer.
    bootstrap-servers: 127.0.0.1:9092
    # Consumer settings
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      group-id: myGroup
      enable-auto-commit: true
      auto-offset-reset: earliest
      auto-commit-interval: 1000
      max-poll-records: 10
    # Producer settings
    producer:
      retries: 5
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1

3.消息发送端:

@Componentpublic class MqProviderImpl{    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;        @Override    public void sendSkMessage(String message, Properties properties) {        // 发送音讯,注册一个回调事件        ListenableFuture<SendResult<String, String>> futureMessage = KafkaConfig.kafkaTemplateStatic.send("test_topic",                message);        futureMessage.addCallback(new ListenableFutureCallback<SendResult<String, String>>(){            @Override            public void onSuccess(SendResult<String, String> sendResult) {                log.info(" rev "+sendResult.getProducerRecord().value());            }            @Override            public void onFailure(Throwable ex) {                log.error(" error "+ex.getMessage());            }        });    }}

4.消息消费端:

    /**
     * Consumes messages from "test_topic" and logs each payload.
     */
    @KafkaListener(topics = {"test_topic"})
    public void receiveSkMessageInfo(ConsumerRecord<String, String> record,
                                     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                     Acknowledgment ack) {
        // NOTE(review): 'ack' is never acknowledged here; with enable-auto-commit=true an
        // Acknowledgment parameter is only valid under a MANUAL ack-mode — confirm intent.
        log.info(record.value());
    }

以上实现是最简单的方式,但使用springboot自动配置的方式,所有配置项必须事先写好在application.yml的spring.kafka下面,试想在分布式的场景中,如果某一项发生变动,每个应用下面的配置都需要修改,这就需要将这些配置使用服务治理统一管理起来,这里就需要一种自定义配置的方式来解决。

springboot自动配置kafka是在KafkaAutoConfiguration这个类中实现的,它有一个成员KafkaProperties,这个properties中保存所有关于kafka的配置。

// 主动配置是在KafkaAutoConfiguration类实现的@Configuration@ConditionalOnClass(KafkaTemplate.class)@EnableConfigurationProperties(KafkaProperties.class)@Import({ KafkaAnnotationDrivenConfiguration.class,        KafkaStreamsAnnotationDrivenConfiguration.class })public class KafkaAutoConfiguration {    private final KafkaProperties properties;

从KafkaProperties类的注解可以看出,配置都是从yml里的spring.kafka配置读出来的

@ConfigurationProperties(prefix = "spring.kafka")public class KafkaProperties {

二.springboot手动配置方式整合kafka,使用zk做配置中心:

在分布式的环境下,需要使用服务治理把yml里的配置统一管理起来,这里使用zookeeper来统一管理kafka的配置。如果将原有的配置放到zk中,来实现从zk上读取配置,让springboot接收到,这里就需要重新定义kafka的配置类,不能使用原有的KafkaAutoConfiguration了。

1.从zk上拉取配置,这里使用当当开源的Config Toolkit,还自带一个操作zk的管理界面,引入pom:

    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>config-toolkit</artifactId>
        <version>3.3.2-RELEASE</version>
    </dependency>

2.在yml中添加连接zk的配置,有这些配置才能保障应用能连接zk:
configs:
  # ZooKeeper address
  address: 192.168.1.30:2181
  # ZNode that stores this application's configuration
  env: /projectx/modulex
  version: 1
  # ZooKeeper data group name
  groupdefault: groupdefault
3.下载当当的config-toolkit,访问http://localhost:8080/,加入相关配置,github上有具体说明。

4. 新建一个ZKConfiguration类,实现EnvironmentAware接口,实现EnvironmentAware接口的setEnvironment可以在项目启动时设置项目的环境变量,可以在这个类中结合config-toolkit,把zk的配置加载到项目环境变量当中:
@Componentpublic class ZKConfiguration implements EnvironmentAware {    @Autowired    private Environment env;        private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>();        public ZKConfiguration() {    }        // 加载zk的根本配置    @Bean    public ZookeeperConfigProfile zookeeperConfigProfile() {        ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile(                Objects.requireNonNull(this.env.getProperty("configs.address")),                Objects.requireNonNull(this.env.getProperty("configs.env")),                this.env.getProperty("configs.version"));        return configProfile;    }    //失去具体组里的配置    @Bean({"groupPropDefault"})    public GeneralConfigGroup generalConfigGroupDefault() {        ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();        GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault"));        return group;    }    /**    * 获取配置组    * @return    */    public GeneralConfigGroup getConfigGroup(String group) {        return GROUPMAP.get(group);    }        /**    *     * 我的项目启动时会调用这个办法,把zk里的配置组存在长期变量GROUPMAP里,当前会用到    * 所以 数据源初始化,就设置在这个办法里    * @param environment    */    @Override    public void setEnvironment(Environment environment) {        this.env = environment;        ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();        GROUPMAP.put("groupdefault", new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault")));    }}
5.获得所有配置项后,就是让springboot去建立kafka连接了,这里相当于要重新实现KafkaAutoConfiguration的配置。建立一个KafkaConfig配置类,这里主要是配置所有kafka需要的bean:

@Configuration
@ConditionalOnClass({KafkaTemplate.class})
@EnableKafka
public class KafkaConfig {

// 把刚刚加载zk配置的类注入进来@Autowiredprivate ZKConfiguration zkConfiguration;// 创立 消费者工厂@Bean("consumerFactory")@ConditionalOnMissingBean({ConsumerFactory.class})public ConsumerFactory<String, String> consumerFactory() {    // 创立工厂须要三个参数:    // 1. 消费者配置的map    // 2. key的反序列化实现类    // 3. value的反序列化实现类    return new DefaultKafkaConsumerFactory<String, String>(makeKafkaConfig(), new StringDeserializer(), new StringDeserializer());}// 创立生产者工厂@Bean("producerFactory")@ConditionalOnMissingBean({ProducerFactory.class})public ProducerFactory<String, String> kafkaProducerFactory() {    // 生产者工厂的参数如消费者工厂    return new DefaultKafkaProducerFactory(makeKafkaConfig(), new StringSerializer(), new StringSerializer());}// 创立 kafkaTemplate 这个bean,有了这个bean能力在理论业务中应用kafka@Bean("kafkaTemplate")@ConditionalOnMissingBean({com.seckill.boot.common.util.KafkaTemplate.class})public KafkaTemplate<String, Protobufable> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, String> kafkaProducerFactory,                                                      @Qualifier("producerListener") ProducerListener<String, Protobufable> producerListener) {    KafkaTemplate<String, Protobufable> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);    kafkaTemplate.setProducerListener(producerListener);    kafkaTemplate.setDefaultTopic("groupdefault");    return kafkaTemplate;}@Bean("producerListener")@ConditionalOnMissingBean({ProducerListener.class})public ProducerListener<String, Protobufable> kafkaProducerListener() {    return new LoggingProducerListener();}@Bean@ConditionalOnProperty(        name = {"spring.kafka.producer.transaction-id-prefix"})@ConditionalOnMissingBeanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {    return new KafkaTransactionManager(producerFactory);}// zk里拿到的配置取出来private Map<String, Object> makeKafkaConfig() {    // 取得配置的group     GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup("groupdefault");    
Map<String, Object> kafkaConfig = new HashMap<>();    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get("spring.kafka.bootstrap-servers"));    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get("spring.kafka.consumer.group-id"));    kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get("spring.kafka.consumer.auto-offset-reset"));    kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get("spring.kafka.consumer.enable-auto-commit"));    kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.auto-commit-interval"));    kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.key-serializer"));    kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.value-serializer"));    kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-records"));    kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-interval-ms"));    kafkaConfig.put("ack-mode", configGroup.get("spring.kafka.listener.ack-mode"));    kafkaConfig.put("concurrency", configGroup.get("spring.kafka.listener.concurrency"));    kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get("spring.kafka.producer.acks"));    kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get("spring.kafka.producer.batch-size"));    kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get("spring.kafka.producer.buffer-memory"));    kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.key-serializer"));    kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.value-serializer"));    kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get("spring.kafka.producer.retries"));    return kafkaConfig;}

}

6. 将kafka需要的bean配置好后,就能在实际业务中使用KafkaTemplate操作消息了
@Componentpublic class MqProviderImpl{    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;