Kafka保证消息传输
生产2000W条数据
在本地机器生产2000W条数据,单条数据中消息部分在40~50个字节左右. 大概耗时在2分钟多发送完场.代码如下
private static Properties initConfig() { final Properties properties = new Properties(); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); return properties;}public static void main(String[] args) { long start = System.currentTimeMillis(); final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(initConfig()); for (int i = 0; i < 20000000; i++) { // 用utf8编码生成45个字节. 一个字符占一个字节.c final ProducerRecord<String, String> record = new ProducerRecord<>("TwentyMillion", String.valueOf(i), UUID.randomUUID() + " " + i); kafkaProducer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println(metadata.toString()); } else { exception.printStackTrace(); } } }); } System.out.println(System.currentTimeMillis() - start + " ms");}
在上面的例子中,一共发送了2000W条数据,但是最后只收到了19,458,873条数据,很明显缺少了几万条数据,这是由于没有加 ACKS_CONFIG, 只需要加上一行props.put(ProducerConfig.ACKS_CONFIG, "1");
,就能保证partition leader肯定会接收到数据. 但是不能100%保证消息不会丢失. 如果写入的消息还没同步到partition replica,此时partition leader挂了,重新选举partition leader之后,则会造成消息丢失.