共计 1291 个字符,预计需要花费 4 分钟才能阅读完成。
Kafka 保证消息传输
生产 2000W 条数据
在本地机器生产 2000W 条数据, 单条数据中消息部分在 40~50 个字节左右. 大概耗时在 2 分钟多发送完场. 代码如下
private static Properties initConfig() {final Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
return properties;
}
public static void main(String[] args) {long start = System.currentTimeMillis();
final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(initConfig());
for (int i = 0; i < 20000000; i++) {
// 用 utf8 编码生成 45 个字节. 一个字符占一个字节.c
final ProducerRecord<String, String> record = new ProducerRecord<>("TwentyMillion", String.valueOf(i), UUID.randomUUID() + " " + i);
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println(metadata.toString());
} else {exception.printStackTrace();
}
}
});
}
System.out.println(System.currentTimeMillis() - start + "ms");
}
在上面的例子中, 一共发送了 2000W 条数据, 但是最后只收到了 19,458,873 条数据, 很明显缺少了几万条数据, 这是由于没有加 ACKS_CONFIG, 只需要加上一行 props.put(ProducerConfig.ACKS_CONFIG, "1");
, 就能保证 partition leader 肯定会接收到数据. 但是不能 100% 保证消息不会丢失. 如果写入的消息还没同步到 partition replica, 此时 partition leader 挂了, 重新选举 partition leader 之后, 则会造成消息丢失.
消费 2000W 条数据
正文完
发表至: Kafka
2019-05-14