关于java:Kafka事务入门使用示例

33次阅读

共计 2230 个字符,预计需要花费 6 分钟才能阅读完成。

kafka 从 0.2.11 版本开始反对事务,本文档对 kafka 事务作一个简略的阐明,同时给出 java 代码示例,并对代码做一些简略的阐明,同时阐明相干的注意事项。心愿能对须要应用 kafka 事务的敌人有帮忙。

2017 年 6 月 28 日,Kafka 官网公布了 0.11.0.0 的版本,从这个版本开始,kafka 反对了事务。那么,什么是 kafka 中的事务呢?

kafka 事务反对生产者可能将一组音讯作为单个事务发送,该事务要么原子地胜利要么失败。举个例子,用户领取了某个订单,订单领取后,须要告诉库存模块去缩小库存,同时须要告诉优惠券模型去扣减优惠券,这两个音讯你须要让它们要么都胜利,要么都失败。这个时候就能够应用 kafka 事务。

咱们先看代码。
第一步:引入依赖,在 pom.xml 中减少 kafka client 的依赖

 <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

第二步:发送音讯相干代码

Properties props = new Properties();
props.put("bootstrap.servers", "10.20.200.166:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions();

int c =0;
while (true) {if(c++>=3){break;}
    try {producer.beginTransaction();

        ProducerRecord<String, String> record = new ProducerRecord<>("TEST1", "message a" + c);
        producer.send(record);
        System.out.println("send TEST1" + c);

        ProducerRecord<String, String> record2 = new ProducerRecord<>("TEST2", "message b" + c);
        producer.send(record2);
        System.out.println("send TEST2" + c);

        producer.commitTransaction();}catch (RuntimeException e){System.out.println(e.getMessage());
        producer.abortTransaction();}
}
  1. 后面 6 行,是为了构建一个 KafkaProducer 实例,在 spring boot 的工程里,这个总分通常会封装成一个办法,而后通过 @Bean 实现对象的注入治理。其中 bootstrap.servers 对应你的 kafka 服务器的地址,须要按你本地的理论状况批改,这一类地址通常也应该放到配置文件里。
  2. 第 8 行 producer.initTransactions(); 示意对 kafka 事务进行初始化,这个办法对于 每个 producer 调用一次就能够了。
  3. 接下来是一个循环管制,这里是想表白 能够重复使用 producer 对象,发送屡次事务音讯。对于每一次的 事务 以 producer.beginTransaction() 示意一次事务的开始,producer.send(record) 示意这一次事务里,须要发送的音讯,每个事务里,send 办法能够屡次调用,具休取决于业务需要,而后通过 producer.commitTransaction() 提交事务,如果产生了异样,则通过 producer.abortTransaction() 来勾销事务。

注意事项:

  1. transactional.id 的取值是不能反复的,如果你的环境里,只有繁多节点,那这个值间接用一个固定的字符串就能够了。然而如果你的程序须要反对横向扩大,比方:同时有两个或者更多服务器同时运行你的代码,这个时候就会出问题。对于同样的 transactional.id, 在一个新的 KafkaProducer 调用 initTransactions 后,原来的过程就会报错。只有最新的过程能失常工作。所以这个时候,你须要保障不同的节点运行时,取到的 transactional.id 的值是不一样的。你能够应用 UUID.randomUUID().toString() 来生成一个保障不反复的随机 ID,或者 间接在不同的实例的服务器里配置不同的 transactional.id。而对于同一个节点的运行,屡次事务,是能够应用同一个 KafkaProducer 的,也就能够应用一样的 transactional.id。

参考文档:
https://www.confluent.io/blog…

正文完
 0