Kafka保证消息传输

Kafka保证消息传输

生产2000W条数据

在本地机器生产2000W条数据,单条数据中消息部分在40~50个字节左右. 大概耗时在2分钟多发送完场.代码如下

private static Properties initConfig() {
    final Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    return properties;
}
public static void main(String[] args) {
    long start = System.currentTimeMillis();
    final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(initConfig());
    for (int i = 0; i < 20000000; i++) {
        // 用utf8编码生成45个字节. 一个字符占一个字节.c
        final ProducerRecord<String, String> record = new ProducerRecord<>("TwentyMillion", String.valueOf(i), UUID.randomUUID() + " " + i);
        kafkaProducer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }
        });
    }
    System.out.println(System.currentTimeMillis() - start + " ms");
}

在上面的例子中,一共发送了2000W条数据,但是最后只收到了19,458,873条数据,很明显缺少了几万条数据,这是由于没有加 ACKS_CONFIG, 只需要加上一行props.put(ProducerConfig.ACKS_CONFIG, "1");,就能保证partition leader肯定会接收到数据. 但是不能100%保证消息不会丢失. 如果写入的消息还没同步到partition replica,此时partition leader挂了,重新选举partition leader之后,则会造成消息丢失.

消费2000W条数据

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理