关于kafka:消息队列的消费幂等性如何保证

35次阅读

共计 5740 个字符,预计需要花费 15 分钟才能阅读完成。

什么是幂等?

任意屡次执行所产生的影响均与一次执行的影响雷同就能够称为幂等

什么是音讯幂等?

当呈现消费者对某条音讯反复生产的状况时,反复生产的后果与生产一次的后果是雷同的,并且屡次生产并未对业务零碎产生任何负面影响

为什么咱们要保障幂等性,不保障幂等性,会不会有问题?

这个问题其实没法精确答复。答复这个问题的本源得从 业务场景 上进行剖析。比方失常业务状况下,咱们是不容许同个订单反复领取, 这种业务场景咱们就须要确保幂等性。再比方日志记录,这种业务场景,咱们可能就不须要做幂等判断。

因而是否要保障幂等性,得基于 业务 进行考量

音讯队列的生产幂等性如何保障?

没法保障。后面说了要保障幂等性,得基于 业务场景 进行考量。音讯队列他自身就不是给你用来做业务幂等性用的。如果你要实现业务幂等性,靠音讯队列是没法帮你实现的,你本人得依据本身业务场景,来实现幂等。

罕用的业务幂等性保障办法

1、利用数据库的惟一束缚实现幂等

比方将订单表中的订单编号设置为惟一索引,创立订单时,依据订单编号就能够保障幂等

2、去重表

这个计划实质也是依据数据库的唯一性束缚来实现。其实现大体思路是:首先在去重表上建惟一索引,其次操作时把业务表和去重表放在同个本地事务中,如果呈现重现反复生产,数据库会抛惟一束缚异样,操作就会回滚

3、利用 redis 的原子性

每次操作都间接 set 到 redis 外面,而后将 redis 数据定时同步到数据库中

4、多版本(乐观锁)管制

此计划多用于更新的场景下。其实现的大体思路是:给业务数据减少一个版本号属性,每次更新数据前,比拟以后数据的版本号是否和音讯中的版本统一,如果不统一则回绝更新数据,更新数据的同时将版本号 +1

5、状态机机制

此计划多用于更新且业务场景存在多种状态流转的场景

6、token 机制

生产者发送每条数据的时候,减少一个全局惟一的 id,这个 id 通常是 业务的惟一标识,比方订单编号。在生产端生产时,则验证该 id 是否被生产过,如果还没生产过,则进行业务解决。解决完结后,在把该 id 存入 redis,同时设置状态为已生产。如果曾经生产过了,则不进行解决。

演示

例子应用 springboot2 加 kafka 来演示一下应用 token 机制如何实现生产端幂等

1、application.yml

spring:
  redis:
    host: localhost
    port: 6379
    # 连贯超时工夫(毫秒)timeout: 10000
    jedis:
      pool:
        # 连接池中的最大闲暇连贯
        max-idle: 8
        # 连接池中的最小闲暇连贯
        min-idle: 10
        # 连接池最大连接数(应用负值示意没有限度)max-active: 100
        # 连接池最大阻塞等待时间(应用负值示意没有限度)max-wait: -1
    password:
  kafka:
    # 以逗号分隔的地址列表,用于建设与 Kafka 集群的初始连贯(kafka 默认的端口号为 9092)
    bootstrap-servers: localhost:9092
    producer:
      # 产生谬误后,音讯重发的次数。retries: 0
      #当有多个音讯须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次能够应用的内存大小,依照字节数计算。batch-size: 16384
      # 设置生产者内存缓冲区的大小。buffer-memory: 33554432
      # 键的序列化形式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化形式
      value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer
      # acks=0:生产者在胜利写入音讯之前不会期待任何来自服务器的响应。# acks=1:只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。# acks=all:只有当所有参加复制的节点全副收到音讯时,生产者才会收到一个来自服务器的胜利响应。acks: 1
    consumer:
      # 主动提交的工夫距离 在 spring boot 2.X 版本中这里采纳的是值的类型为 Duration 须要合乎特定的格局,如 1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下该作何解决:# latest(默认值)在偏移量有效的状况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest:在偏移量有效的状况下,消费者将从起始地位读取分区的记录
      auto-offset-reset: earliest
      # 是否主动提交偏移量,默认值是 true, 为了避免出现反复数据和数据失落,能够把它设置为 false, 而后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化形式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化形式
      value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer
    listener:
      # 在侦听器容器中运行的线程数。concurrency: 1
      #listner 负责 ack,每调用一次,就立刻 commit
      ack-mode: manual_immediate

2、实现 kafka 的自定义序列和反序列

:kakfa 默认的序列化和反序列形式是 StringSerializer 和 StringDeserializer。咱们要革新成反对对象的序列化和反序列化

a、序列化

public class ObjectSerializer implements Serializer<Object> {


    @Override
    public byte[] serialize(String topic, Object object) {return BeanUtils.serialize(object);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map, boolean b) {}

b、反序列化

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] bytes) {return BeanUtils.deserialize(bytes);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map, boolean b) {}}

3、音讯对象

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDTO<T> implements Serializable {

    private String messageId;


    private T data;
}

4、生产者

:本例子简略模仿生产者屡次生产同个音讯,进而达到屡次生产的成果

@Slf4j
@Component
public class KafkaProducer implements CommandLineRunner {


    @Autowired
    private KafkaTemplate kafkaTemplate;

    private int threadNum = 2;

    private ExecutorService executorService = Executors.newFixedThreadPool(threadNum);

    private CountDownLatch countDownLatch = new CountDownLatch(threadNum);


    @Override
    public void run(String... args) throws Exception {send();
    }


    private void send(){for(int i = 0; i < threadNum; i++){executorService.submit(()->{
                try {countDownLatch.await();
                } catch (InterruptedException e) {log.error(e.getMessage(),e);
                }
                String messageId = "b14701b8-4b08-4bbd-8a4e-70f76a432e99";

                MessageDTO messageDTO = MessageDTO.builder().messageId(messageId).data("hello").build();
                kafkaTemplate.send(Constant.TOPIC,messageDTO);
            });

            countDownLatch.countDown();}

    }
}

5、消费者

@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private RedisUtils redisUtils;

    @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    public void receive(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){boolean isRepeateConsume = checkRepeateConsume(record.value().getMessageId());
        if(isRepeateConsume){log.error("反复生产。。。。");
            // 手工确认
            ack.acknowledge();
            return;
        }


       doBiz(record,ack);
    }

    private boolean checkRepeateConsume(String messageId){Object consumeStatus = redisUtils.get(messageId);
        /**
         * 1、如果 redis 存在音讯 ID,且生产状态为已生产,则阐明是反复生产,此时生产端抛弃该音讯
         */
        if(Objects.nonNull(consumeStatus) && "已生产".equals(consumeStatus.toString())){// log.error("反复生产。。。。");
            return true;
        }

        /**
         * 2、如果 redis 不存在音讯 id,或者状态不是已生产,则从业务方面进行判重
         *
         *  业务判重的能够思考如下办法:
         *  如果该业务是存在状态流转,则采纳状态机策略进行判重。*  如果该业务不是状态流转类型,则在新增时,依据业务设置一个惟一的属性,比方依据订单编号的唯一性;*  更新时,能够采纳多版本策略,在须要更新的业务表上加上版本号
         */
        return checkRepeateByBiz(messageId);
    }



    /**
     * 模仿业务生产
     * @param messageDTO
     * @param ack
     */
    private void doBiz(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){System.out.println("------ 模仿业务解决 -----------");
        System.out.println("-------- 执行业务解决:"+record.value()+"------------");
        System.out.println("--------------1、业务处理完毕 -----------");
        try {redisUtils.setEx(record.value().getMessageId(), "已生产",12, TimeUnit.HOURS);
            System.out.println("-------------2、业务处理完毕后,把全局 ID 存入 redis,并设置值为已生产");
        } catch (Exception e) {e.printStackTrace();
        }
        System.out.println("----------3、业务处理完毕后,生产端手工确认");
        // 手工确认
        ack.acknowledge();}

}

6、成果

2020-08-09 16:25:32.701  INFO 9552 --- [msgId-0-C-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
------ 模仿业务解决 -----------
-------- 执行业务解决:MessageDTO(messageId=b14701b8-4b08-4bbd-8a4e-70f76a432e99, data=hello)------------
--------------1、业务处理完毕 -----------
-------------2、业务处理完毕后,把全局 ID 存入 redis,并设置值为已生产
----------3、业务处理完毕后,生产端手工确认
2020-08-09 16:25:36.021 ERROR 9552 --- [msgId-0-C-1] c.g.l.kafka.consumer.KafkaConsumer       : 反复生产。。。。

总结

音讯队列没法帮你做到生产端的幂等性,生产端的幂等性得基于 业务场景 进行实现。不过音讯队列必须得 保障音讯不能丢,至多保障被生产一次 ,不然音讯都丢了,没数据搞啥业务幂等。在实现生产端解决业务时,要确保生产端是采纳 手工确认应答机制,而不是自动应答机制。这样可能确保生产端一旦业务解决失败,生产者还能再次发送同个音讯给生产端

demo 链接

https://github.com/lyb-geek/s…

正文完
 0