关于kafka:kafka-消费者负载均衡实现

语言:java (spring boot),单台kafka
场景:同一个组,两个消费者同时生产一个topic

实现过程:

首先批改这个topic的partitions

./kafka-topics.sh --bootstrap-server localhost:9092 --alter --partitions 2 --topic topic-name


而后批改我的项目配置文件

spring:
  kafka:
    listener:
      concurrency: 2

生产者代码

@SpringBootTest
class ProducerApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void contextLoads()  {
        for (int i = 1; i <= 1000; i++) {
            kafkaTemplate.send("test_0105_jcy", String.valueOf(i));
            System.out.println(i);
        }
    }

}

消费者代码(X2)

@Slf4j
@SpringBootApplication
public class Customer1Application {

    @KafkaListener(topics = "test_0105_jcy")
    public void consumerListener(String msg) throws InterruptedException {
        log.info(msg);
        Thread.sleep(1 * 1000);
    }


    public static void main(String[] args) {
        SpringApplication.run(Customer1Application.class, args);
    }

}

我的项目构造

生产者发送1000条音讯,两个消费者生产状况

依据默认策略将音讯发送到两个partitions中,如果有特殊要求的话,能够通过重载的send办法指定partitions,send(String topic, Integer partition, K key, V data),也能够通过自定义分区器来实现。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理