乐趣区

Kafka保证消息传输

Kafka 保证消息传输

生产 2000W 条数据

在本地机器生产 2000W 条数据, 单条数据中消息部分在 40~50 个字节左右. 大概耗时在 2 分钟多发送完场. 代码如下

private static Properties initConfig() {final Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    return properties;
}
public static void main(String[] args) {long start = System.currentTimeMillis();
    final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(initConfig());
    for (int i = 0; i < 20000000; i++) {
        // 用 utf8 编码生成 45 个字节. 一个字符占一个字节.c
        final ProducerRecord<String, String> record = new ProducerRecord<>("TwentyMillion", String.valueOf(i), UUID.randomUUID() + " " + i);
        kafkaProducer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println(metadata.toString());
                } else {exception.printStackTrace();
                }
            }
        });
    }
    System.out.println(System.currentTimeMillis() - start + "ms");
}

在上面的例子中, 一共发送了 2000W 条数据, 但是最后只收到了 19,458,873 条数据, 很明显缺少了几万条数据, 这是由于没有加 ACKS_CONFIG, 只需要加上一行 props.put(ProducerConfig.ACKS_CONFIG, "1");, 就能保证 partition leader 肯定会接收到数据. 但是不能 100% 保证消息不会丢失. 如果写入的消息还没同步到 partition replica, 此时 partition leader 挂了, 重新选举 partition leader 之后, 则会造成消息丢失.

消费 2000W 条数据

退出移动版