关于java:一起讨论下消息幂等去重通用解决方案

24次阅读

共计 6864 个字符,预计需要花费 18 分钟才能阅读完成。

消息中间件是分布式系统罕用的组件,无论是异步化、解耦、削峰等都有宽泛的利用价值。咱们通常会认为,消息中间件是一个牢靠的组件——这里所谓的牢靠是指,只有我把音讯胜利投递到了消息中间件,音讯就不会失落,即音讯必定会至多保障音讯能被消费者胜利生产一次,这是消息中间件最根本的个性之一,也就是咱们常说的“AT LEAST ONCE”,即音讯至多会被“胜利生产一遍”。

举个例子,一个音讯 M 发送到了消息中间件,音讯投递到了生产程序 A,A 承受到了音讯,而后进行生产,但在生产到一半的时候程序重启了,这时候这个音讯并没有标记为生产胜利,这个音讯还会持续投递给这个消费者,直到其生产胜利了,消息中间件才会进行投递。

然而这种牢靠的个性导致,音讯可能被屡次地投递。举个例子,还是刚刚这个例子,程序 A 承受到这个音讯 M 并实现生产逻辑之后,正想告诉消息中间件“我曾经生产胜利了”的时候,程序就重启了,那么对于消息中间件来说,这个音讯并没有胜利生产过,所以他还会持续投递。这时候对于应用程序 A 来说,看起来就是这个音讯明明生产胜利了,然而消息中间件还在反复投递。

这在 RockectMQ 的场景来看,就是同一个 messageId 的音讯反复投递下来了。

基于音讯的投递牢靠(音讯不丢)是优先级更高的,所以音讯不重的工作就会转移到应用程序自我实现,这也是为什么 RocketMQ 的文档里强调的,生产逻辑须要自我实现幂等。背地的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但音讯反复是有解决方案的,而音讯失落是很麻烦的。

简略的音讯去重解决方案

例如:假如咱们业务的音讯生产逻辑是:插入某张订单表的数据,而后更新库存:

insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现音讯的幂等,咱们可能会采取这样的计划:

select * from t_order where order_no = 'order123'

if(order  != null) {return ;// 音讯反复,间接返回}

这对于很多状况下,确实能起到不错的成果,然而在并发场景下,还是会有问题。

并发反复音讯

假如这个生产的所有代码加起来须要 1 秒,有反复的音讯在这 1 秒内(假如 100 毫秒)内达到(例如生产者疾速重发,Broker 重启等),那么很可能,下面去重代码外面会发现,数据仍然是空的(因为上一条音讯还没生产完,还没胜利更新订单状态),

那么就会穿透掉查看的挡板,最初导致反复的音讯生产逻辑进入到非幂等平安的业务代码中,从而引发反复生产的问题(如主键抵触抛出异样、库存被反复扣减而没开释等)

并发去重的解决方案之一

要解决下面并发场景下的音讯幂等问题,一个可取的计划是开启事务把 select 改成 select for update 语句,把记录进行锁定。

select * from t_order where order_no = 'THIS_ORDER_NO' for update  // 开启事务
if(order.status != null) {return ;// 音讯反复,间接返回}

但这样生产的逻辑会因为引入了事务包裹而导致整个音讯生产可能变长,并发度降落。

当然还有其余更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则音讯从新生产之类的。但这须要针对具体业务场景做更简单和粗疏的代码开发、库表设计,不在本文探讨的范畴。

但无论是 select for update,还是乐观锁这种解决方案,实际上都是基于业务表自身做去重,这无疑减少了业务开发的复杂度,一个业务零碎外面很大部分的申请解决都是依赖 MQ 的,如果每个生产逻辑自身都须要基于业务自身而做去重 / 幂等的开发的话,这是繁琐的工作量。本文心愿摸索出一个通用的音讯幂等解决的办法,从而形象出肯定的工具类用以实用各个业务场景。

Exactly Once

在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫”Exactly Once”,即音讯必定会被胜利生产,并且只会被生产一次。以下是阿里云里对 Exactly Once 的解释:

Exactly-Once 是指发送到音讯零碎的音讯只能被生产端解决且仅解决一次,即便生产端重试音讯发送导致某音讯反复投递,该音讯在生产端也只被生产一次。

在咱们业务音讯幂等解决的畛域内,能够认为业务音讯的代码必定会被执行,并且只被执行一次,那么咱们能够认为是 Exactly Once。

但这在分布式的场景下想找一个通用的计划简直是不可能的。不过如果是针对基于数据库事务的生产逻辑,实际上是可行的。

基于关系数据库事务插入音讯表

假如咱们业务的音讯生产逻辑是:更新 MySQL 数据库的某张订单表的状态:

update t_order set status = 'SUCCESS' where order_no= 'order123';

要实现 Exaclty Once 即这个音讯只被生产一次(并且必定要保障能生产一次),咱们能够这样做:在这个数据库中减少一个音讯生产记录表,把音讯插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保障音讯只会被生产一遍了。

1、开启事务
2、插入音讯表(解决好主键抵触的问题)
3、更新订单表(原生产逻辑)
4、提交事务

阐明:

1、这时候如果音讯生产胜利并且事务提交了,那么音讯表就插入胜利了,这时候就算 RocketMQ 还没有收到生产位点的更新再次投递,也会插入音讯失败而视为曾经生产过,后续就间接更新生产位点了。这保障咱们生产代码只会执行一次。2、如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,音讯表也没插入胜利;而对于 RocketMQ 服务端来说,生产位点也没更新,所以音讯还会持续投递下来,投递下来发现这个音讯插入音讯表也是胜利的,所以能够持续生产。这保障了音讯不失落。

事实上,阿里云 ONS 的 EXACTLY-ONCE 语义的实现上,就是相似这个计划基于数据库的事务个性实现的。更多详情可参考:https://help.aliyun.com/document\_detail/102777.html

基于这种形式,确实这是有能力拓展到不同的利用场景,因为他的实现计划与具体业务自身无关——而是依赖一个音讯表。

然而这里有它的局限性

1、音讯的生产逻辑必须是依赖于关系型数据库事务。如果生产的生产过程中还波及其余数据的批改,例如 Redis 这种不反对事务个性的数据源,则这些数据是不可回滚的。
2、数据库的数据必须是在一个库,跨库无奈解决

注:业务上,音讯表的设计不应该以音讯 ID 作为标识,而应该以业务的业务主键作为标识更为正当,以应答生产者的重发。阿里云上的音讯去重只是 RocketMQ 的 messageId,在生产者因为某些起因手动重发(例如上游针对一个交易反复申请了)的场景下起不到去重 / 幂等的成果(因音讯 id 不同)。

更简单的业务场景

如上所述,这种形式 Exactly Once 语义的实现,实际上有很多局限性,这种局限性使得这个计划根本不具备广泛应用的价值。并且因为基于事务,可能导致锁表工夫过长等性能问题。

例如咱们以一个比拟常见的一个订单申请的音讯来举例,可能有以下几步(以下统称为步骤 X):

1、查看库存(RPC)
2、锁库存(RPC)
3、开启事务,插入订单表(MySQL)
4、调用某些其余上游服务(RPC)
5、更新订单状态
6、commit 事务(MySQL)

这种状况下,咱们如果采取音讯表 + 本地事务的实现形式,音讯生产过程中很多子过程是不反对回滚的,也就是说就算咱们加了事务,实际上这背地的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经验了第二步锁库存的时候,服务重启了,这时候实际上库存是曾经在另外的服务里被锁定了,这并不能被回滚。当然音讯还会再次投递下来,要保障音讯能至多生产一遍,换句话说,锁库存的这个 RPC 接口自身仍旧要反对“幂等”。

再者,如果在这个比拟耗时的长链条场景下退出事务的包裹,将大大的升高零碎的并发。所以通常状况下,咱们解决这种场景的音讯去重的办法还是会应用一开始说的业务本人实现去重逻辑的形式,如后面加 select for update,或者应用乐观锁。

那咱们有没有办法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?

拆解音讯执行过程

其中一个思路是把下面的几步,拆解成几个不同的子音讯,例如:

1、库存零碎生产 A:查看库存并做锁库存,发送音讯 B 给订单服务
2、订单零碎生产音讯 B:插入订单表(MySQL),发送音讯 C 给本人(上游零碎)生产
3、上游零碎生产音讯 C:解决局部逻辑,发送音讯 D 给订单零碎
4、订单零碎生产音讯 D:更新订单状态

注:上述步骤须要保障本地事务和音讯是一个事务的(至多是最终一致性的),这其中波及到分布式事务音讯相干的话题,不在本文阐述。

能够看到这样的解决办法会使得每一步的操作都比拟原子,而原子则意味着是小事务,小事务则意味着应用音讯表 + 事务的计划显得可行。

然而,这太简单了!这把一个原本间断的代码逻辑割裂成多个零碎屡次音讯交互!那还不如业务代码层面上加锁实现呢。

更通用的解决方案

下面音讯表 + 本地事务的计划之所以有其局限性和并发的短板,究其基本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个音讯生产的环节。

如果咱们能不依赖事务而实现音讯的去重,那么计划就能推广到更简单的场景例如:RPC、跨库等。

例如,咱们仍旧应用音讯表,然而不依赖事务,而是针对音讯表减少生产状态,是否能够解决问题呢?

基于音讯幂等表的非事务计划

67\_1.png

以上是去事务化后的音讯幂等计划的流程,能够看到,此计划是无事务的,而是针对音讯表自身做了状态的辨别:生产中、生产实现。只有生产实现的音讯才会被幂等解决掉。而对于已有生产中的音讯,前面反复的音讯会触发提早生产(在 RocketMQ 的场景下即发送到 RETRY TOPIC),之所以触发提早生产是为了管制并发场景下,第二条音讯在第一条音讯没实现的过程中,去管制音讯不丢(如果间接幂等,那么会失落音讯(同一个音讯 id 的话),因为上一条音讯如果没有生产实现的时候,第二条音讯你曾经通知 broker 胜利了,那么第一条音讯这时候失败 broker 也不会从新投递了)

下面的流程不再细说,后文有 github 源码的地址,读者能够参考源码的实现,这里咱们回头看看咱们一开始想解决的问题是否解决了:

1、音讯曾经生产胜利了,第二条音讯将被间接幂等解决掉(生产胜利)。
2、并发场景下的音讯,仍旧能满足不会呈现音讯反复,即穿透幂等挡板的问题。
3、反对上游业务生产者重发的业务反复的音讯幂等问题。

对于第一个问题曾经很显著曾经解决了,在此就不探讨了。

对于第二个问题是如何解决的?次要是依附插入音讯表的这个动作做管制的,假如咱们用 MySQL 作为音讯表的存储媒介(设置音讯的惟一 ID 为主键),那么插入的动作只有一条音讯会胜利,前面的音讯插入会因为主键抵触而失败,走向提早生产的分支,而后前面提早生产的时候就会变成下面第一个场景的问题。

对于第三个问题,只有咱们设计去重的音讯键让其反对业务的主键(例如订单号、申请流水号等),而不仅仅是 messageId 即可。所以也不是问题。

此计划是否有音讯失落的危险?

如果仔细的读者可能会发现这里实际上是有逻辑破绽的,问题出在下面聊到的个三问题中的第 2 个问题(并发场景),在并发场景下咱们依赖于音讯状态是做并发管制使得第 2 条音讯反复的音讯会一直提早生产(重试)。但如果这时候第 1 条音讯也因为一些异样起因(例如机器重启了、内部异样导致生产失败)没有胜利生产胜利呢?也就是说这时候提早生产实际上每次下来看到的都是_生产中_的状态,最初生产就会被视为生产失败而被投递到死信 Topic 中(RocketMQ 默认能够反复生产 16 次)。

有这种顾虑是正确的!对于此,咱们解决的办法是,插入的音讯表必须要带一个最长生产过期工夫,例如 10 分钟,意思是如果一个音讯处于_生产中_超过 10 分钟,就须要从音讯表中删除(须要程序自行实现)。所以最初这个音讯的流程会是这样的:

67\_2.png

更灵便的音讯表存储媒介

咱们这个计划实际上没有事务的,只须要一个存储的核心媒介,那么天然咱们能够抉择更灵便的存储媒介,例如 Redis。应用 Redis 有两个益处:

1、性能上损耗更低
2、下面咱们讲到的超时工夫能够间接利用 Redis 自身的 ttl 实现

当然 Redis 存储的数据可靠性、一致性等方面是不如 MySQL 的,须要用户本人取舍。

源码:RocketMQDedupListener

以上计划针对 RocketMQ 的 Java 实现曾经开源放到 Github 中,具体的应用文档能够参考 https://github.com/Jaskey/Roc…,

以下仅贴一个 Readme 中利用 Redis 去重的应用样例,用以意业务中如果应用此工具退出音讯去重幂等的是如许简略:

// 利用 Redis 做幂等表
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");

String appName = consumer.getConsumerGroup();// 大部分状况下可间接应用 consumer group 名
StringRedisTemplate stringRedisTemplate = null;// 这里省略获取 StringRedisTemplate 的过程
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

consumer.registerMessageListener(messageListener);
consumer.start();

以上代码大部分是原始 RocketMQ 的必须代码,惟一须要批改的仅仅是创立一个 DedupConcurrentListener 示例,在这个示例中指明你的生产逻辑和去重的业务键(默认是 messageId)。

更多应用详情请参考 Github 上的阐明。

这种实现是否一劳永逸?

实现到这里,仿佛计划挺完满的,所有的音讯都能疾速的接入去重,且与具体业务实现也齐全解耦。那么这样是否就完满的实现去重的所有工作呢?

很惋惜,其实不是的。起因很简略:因为要保障音讯至多被胜利生产一遍,那么音讯就有机会生产到一半的时候失败触发音讯重试的可能。还是以下面的订单流程 X:

1、查看库存(RPC)
2、锁库存(RPC)
3、开启事务,插入订单表(MySQL)
4、调用某些其余上游服务(RPC)
5、更新订单状态
6、commit 事务(MySQL)

当音讯生产到步骤 3 的时候,咱们假如 MySQL 异样导致失败了,触发音讯重试。因为在重试前咱们会删除幂等表的记录,所以音讯重试的时候就会从新进入生产代码,那么步骤 1 和步骤 2 就会从新再执行一遍。如果步骤 2 自身不是幂等的,那么这个业务音讯生产仍旧没有做好残缺的幂等解决。

本实现形式的价值?

那么既然这个并不能残缺的实现音讯幂等,还有什么价值呢?价值可就大了!尽管这不是解决音讯幂等的银弹(事实上,软件工程畛域里根本没有银弹),然而他能以便捷的伎俩解决:

1、各种因为 Broker、负载平衡等起因导致的音讯重投递的反复问题

2、各种上游生产者导致的业务级别音讯反复问题

3、反复音讯并发生产的管制窗口问题,就算反复,反复也不可能同一时间进入生产逻辑

一些其余的音讯去重的倡议

也就是说,应用这个办法能保障失常的生产逻辑场景下(无异样,无异样退出),音讯的幂等工作全副都能解决,无论是业务反复,还是 rocketmq 个性带来的反复。

事实上,这曾经能解决 99% 的音讯反复问题了,毕竟异样的场景必定是多数的。那么如果心愿异样场景下也能解决好幂等的问题,能够做以下工作升高问题率:

1、音讯生产失败做好回滚解决。如果音讯生产失败自身是带回滚机制的,那么音讯重试天然就没有副作用了。
2、消费者做好优雅退出解决。这是为了尽可能防止音讯生产到一半程序退出导致的音讯重试。
3、一些无奈做到幂等的操作,至多要做到终止生产并告警。例如锁库存的操作,如果对立的业务流水锁胜利了一次库存,再触发锁库存,如果做不到幂等的解决,至多要做到音讯生产触发异样(例如主键抵触导致生产异样等)
4、在 #3 做好的前提下,做好音讯的生产监控,发现音讯重试一直失败的时候,手动做好#1 的回滚,使得下次重试生产胜利。

正文完
 0