Kafka保证消息传输

生产2000W条数据

在本地机器生产2000W条数据,单条数据中消息部分在40~50个字节左右. 大概耗时在2分钟多发送完场.代码如下

private static Properties initConfig() {    final Properties properties = new Properties();    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");    return properties;}public static void main(String[] args) {    long start = System.currentTimeMillis();    final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(initConfig());    for (int i = 0; i < 20000000; i++) {        // 用utf8编码生成45个字节. 一个字符占一个字节.c        final ProducerRecord<String, String> record = new ProducerRecord<>("TwentyMillion", String.valueOf(i), UUID.randomUUID() + " " + i);        kafkaProducer.send(record, new Callback() {            @Override            public void onCompletion(RecordMetadata metadata, Exception exception) {                if (exception == null) {                    System.out.println(metadata.toString());                } else {                    exception.printStackTrace();                }            }        });    }    System.out.println(System.currentTimeMillis() - start + " ms");}

在上面的例子中,一共发送了2000W条数据,但是最后只收到了19,458,873条数据,很明显缺少了几万条数据,这是由于没有加 ACKS_CONFIG, 只需要加上一行props.put(ProducerConfig.ACKS_CONFIG, "1");,就能保证partition leader肯定会接收到数据. 但是不能100%保证消息不会丢失. 如果写入的消息还没同步到partition replica,此时partition leader挂了,重新选举partition leader之后,则会造成消息丢失.

消费2000W条数据