关于java:60spring-消费kafka-推送消费到kafka

46次阅读

共计 4091 个字符,预计需要花费 11 分钟才能阅读完成。

spring 生产 kafka

消费者配置:

@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaConsumerConfig {@Value("${kafka.zdry.consumer.autoStartup}")
    private Boolean autoStartup;
    @Value("${kafka.zdry.consumer.servers}")
    private String servers;
    @Value("${kafka.zdry.consumer.topic}")
    private String topic;
    @Value("${kafka.zdry.consumer.group.id}")
    private String groupId;
    @Value("${kafka.zdry.consumer.enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${kafka.zdry.consumer.auto.commit.interval.ms}")
    private String autoCommitIntervalMs;
    @Value("${kafka.zdry.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;
    @Value("${kafka.zdry.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.zdry.consumer.max.poll.records}")
    private String maxPollRecords;
    @Value("${kafka.zdry.consumer.concurrency}")
    private Integer concurrency;


    /**
     * 消费者批量工厂 人员轨迹
     */
    @Bean("zdry_person_track")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true);
        factory.setAutoStartup(autoStartup);
        return factory;
    }

    /**
     * 消费者工厂
     */
    public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 消费者配置
     */
    public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }

}

生产监听:

@KafkaListener(topics = "${kafka.zdry.consumer.topic}", containerFactory = "zdry_person_track")
    public void consumeToJson(List<ConsumerRecord<String, String>> records){int count = records.size();
        log.info("count={}",count);
        JSONArray arr = new JSONArray();
        for (ConsumerRecord<String, String> record : records) {JSONObject obj = JSON.parseObject(record.value());
            arr.add(obj);
        }

    }

推送生产到 kafka

生产者 kafka 配置:

@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaProducerConfig {@Value("${kafka.zdry.consumer.servers}")
    private String servers;

    @Bean("zdryProducerTemplate")
    public KafkaTemplate<String, List<Object>> createTemplate(){Map<String, Object> pros = producerProps();
        ProducerFactory<String, List<Object>> pf = new DefaultKafkaProducerFactory<String, List<Object>>(pros);
        KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);
        return template;
    }

    public Map<String, Object> producerProps() {Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;

    }


}

产生生产并推送到 kafka

    @Autowired
    private KafkaTemplate zdryProducerTemplate;

    @Value("${kafka.zdry.consumer.push.topic}")
    private String pushTopic;

办法:

try{zdryProducerTemplate.send(pushTopic,JSON.toJSONString(warn));
}catch(Exception e){e.printStackTrace();
}
zdryProducerTemplate.flush();

yml 配置

kafka:
  ## 重点人员抓拍轨迹
  zdry:
    consumer:
      autoStartup: true
      servers: x.x.x.x:6667
      topic: person_track
      group.id: aa_1
      enable.auto.commit: true
      auto.commit.interval.ms: 100
      session.timeout.ms: 10000
      auto.offset.reset: earliest
      max.poll.records: 100
      concurrency: 1
      push.topic: ai_warn

正文完
 0