spring 生产kafka

消费者配置:

@Configuration@EnableKafka@ConditionalOnResource(resources = "/special-run.txt")public class ZdryKafkaConsumerConfig {    @Value("${kafka.zdry.consumer.autoStartup}")    private Boolean autoStartup;    @Value("${kafka.zdry.consumer.servers}")    private String servers;    @Value("${kafka.zdry.consumer.topic}")    private String topic;    @Value("${kafka.zdry.consumer.group.id}")    private String groupId;    @Value("${kafka.zdry.consumer.enable.auto.commit}")    private String enableAutoCommit;    @Value("${kafka.zdry.consumer.auto.commit.interval.ms}")    private String autoCommitIntervalMs;    @Value("${kafka.zdry.consumer.session.timeout.ms}")    private String sessionTimeoutMs;    @Value("${kafka.zdry.consumer.auto.offset.reset}")    private String autoOffsetReset;    @Value("${kafka.zdry.consumer.max.poll.records}")    private String maxPollRecords;    @Value("${kafka.zdry.consumer.concurrency}")    private Integer concurrency;    /**     * 消费者批量工厂 人员轨迹     */    @Bean("zdry_person_track")    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(concurrency);        factory.getContainerProperties().setPollTimeout(1500);        factory.setBatchListener(true);        factory.setAutoStartup(autoStartup);        return factory;    }    /**     * 消费者工厂     */    public ConsumerFactory<String, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }    /**     * 消费者配置     */    public Map<String, Object> consumerConfigs() {        Map<String, Object> propsMap = new HashMap<>();        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return propsMap;    }}

生产监听:

@KafkaListener(topics = "${kafka.zdry.consumer.topic}", containerFactory = "zdry_person_track")    public void consumeToJson(List<ConsumerRecord<String, String>> records){        int count = records.size();        log.info("count={}",count);        JSONArray arr = new JSONArray();        for (ConsumerRecord<String, String> record : records) {            JSONObject obj = JSON.parseObject(record.value());            arr.add(obj);        }    }

推送生产到kafka

生产者kafka配置:

@Configuration@EnableKafka@ConditionalOnResource(resources = "/special-run.txt")public class ZdryKafkaProducerConfig {    @Value("${kafka.zdry.consumer.servers}")    private String servers;    @Bean("zdryProducerTemplate")    public KafkaTemplate<String, List<Object>> createTemplate(){        Map<String, Object> pros = producerProps();        ProducerFactory<String, List<Object>> pf = new DefaultKafkaProducerFactory<String, List<Object>>(pros);        KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);        return template;    }    public Map<String, Object> producerProps() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);        props.put(ProducerConfig.RETRIES_CONFIG, 0);        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }}

产生生产并推送到kafka

    @Autowired    private KafkaTemplate zdryProducerTemplate;    @Value("${kafka.zdry.consumer.push.topic}")    private String pushTopic;

办法:

try{    zdryProducerTemplate.send(pushTopic,JSON.toJSONString(warn));}catch(Exception e){    e.printStackTrace();}zdryProducerTemplate.flush();

yml 配置

kafka:  ## 重点人员抓拍轨迹  zdry:    consumer:      autoStartup: true      servers: x.x.x.x:6667      topic: person_track      group.id: aa_1      enable.auto.commit: true      auto.commit.interval.ms: 100      session.timeout.ms: 10000      auto.offset.reset: earliest      max.poll.records: 100      concurrency: 1      push.topic: ai_warn