语言:java (spring boot),单台kafka
场景:同一个组,两个消费者同时生产一个topic
实现过程:
首先批改这个topic的partitions./kafka-topics.sh --bootstrap-server localhost:9092 --alter --partitions 2 --topic topic-name而后批改我的项目配置文件spring: kafka: listener: concurrency: 2
生产者代码
@SpringBootTestclass ProducerApplicationTests { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test void contextLoads() { for (int i = 1; i <= 1000; i++) { kafkaTemplate.send("test_0105_jcy", String.valueOf(i)); System.out.println(i); } }}
消费者代码(X2)
@Slf4j@SpringBootApplicationpublic class Customer1Application { @KafkaListener(topics = "test_0105_jcy") public void consumerListener(String msg) throws InterruptedException { log.info(msg); Thread.sleep(1 * 1000); } public static void main(String[] args) { SpringApplication.run(Customer1Application.class, args); }}
我的项目构造
生产者发送1000条音讯,两个消费者生产状况
依据默认策略将音讯发送到两个partitions中,如果有特殊要求的话,能够通过重载的send办法指定partitions,send(String topic, Integer partition, K key, V data),也能够通过自定义分区器来实现。