服务幂等性架构设计
- 作者: 博学谷狂野架构师
-
GitHub:GitHub 地址(有我精心筹备的 130 本电子书 PDF)
只分享干货、不吹水,让咱们一起加油!😄
防重表实现幂等
对于避免数据反复提交,还有一种解决方案就是通过防重表实现。
防重表的实现思路也非常简单,首先创立一张表作为防重表,同时在该表中建设一个或多个字段的惟一索引作为防重字段,用于保障并发状况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则示意是反复数据。
为什么不必乐观锁
对于防重表的解决方案,可能有人会说为什么不应用乐观锁,乐观锁在应用的过程中也是会产生死锁的。
乐观锁是通过锁表的形式实现的,假如当初一个用户 A 拜访表 A(锁住了表 A),而后试图拜访表 B;
另一个用户 B 拜访表 B(锁住了表 B),而后试图拜访表 A。这时对于用户 A 来说,因为表 B 曾经被用户 B 锁住了,所以用户 A 必须等到用户 B 开释表 B 能力拜访。
同时对于用户 B 来说,因为表 A 曾经被用户 A 锁住了,所以用户 B 必须等到用户 A 开释表 A 能力拜访。此时死锁就曾经产生了。
浏览业余类书籍是 Java 程序员必备的学习形式之一。通过一直学习和积攒,能够一直进步本人的技术能力和职业程度,实现职业倒退的指标。
十分倡议大家重视浏览,并抉择一些有深度、有价值的书籍一直晋升本人的技术水平和能力。这些书籍包含 Java 编程语言、数据结构和算法、面向对象设计、设计模式、框架原理与利用等等。
对于一位 2 - 3 年的 Java 程序员来说,浏览业余类书籍是更加的重要,因为它们能够帮忙你扩大技术广度和深度,进步你的技术能力和职业程度。以下是我给这些程序员的一些倡议:
学会寻找优良的书籍:在抉择书籍时,要抉择那些被宽泛认可和举荐的经典书籍。能够通过搜寻网上的书籍举荐列表,向其余经验丰富的程序员求教,或者参考公司外部的学习打算,来找到好的书籍。
浏览无关设计模式和架构的书籍:对于 Java 程序员来说,把握设计模式和架构准则是十分重要的。能够抉择浏览《设计模式:可复用面向对象软件的根底》、《大话设计模式》、《Java 程序员修炼之道》等书籍来深刻学习。
学习新的技术和框架:Java 技术一直倒退,新的技术和框架也不断涌现。因而,Java 程序员应该定期浏览无关新技术和框架的书籍,比方 Spring、Spring Boot、MyBatis、Netty 等。
学习算法和数据结构:算法和数据结构是编程根底,把握这些常识能够进步代码的品质和效率。能够抉择浏览《算法》、《算法导论》等书籍来学习算法和数据结构。
参考开源我的项目和源代码:浏览开源我的项目和源代码是十分无益的,能够学习到其余程序员的编码技巧和设计思路。能够抉择一些出名的开源我的项目,如 Spring、MyBatis 等来进行学习。
当然,我也晓得,光是倡议是不足以激发大家学习的能源的,所以,书也我也帮大家整顿好了,把饭喂到嘴里了,我只能帮你到这里了,剩下的就靠你本人了。
以下这份蕴含 46 本 Java 程序员必备经典的书单(豆瓣评分 8 分 +),是我破费一个月工夫整顿的:GitHub:GitHub 地址
惟一主键实现幂等
数据库 惟一主键 的实现次要是利用数据库中主键惟一束缚的个性,一般来说惟一主键比拟实用于“插入”时的幂等性,其能保障一张表中只能存在一条带该惟一主键的记录。
应用数据库惟一主键实现幂等性时须要留神的是,该主键一般来说并不是应用数据库中自增主键,而是应用分布式 ID 充当主键,这样能力能保障在分布式环境下 ID 的全局唯一性。
对于一些后盾零碎,并发量并不高的状况下,对于幂等的实现非常简单,通过这种思维即可实现幂等管制。
实用场景
- 插入操作
- 删除操作
应用限度
- 须要生成全局惟一主键 ID;
次要流程
次要流程如下:
- 客户端执行创立申请,调用服务端接口。
- 服务端执行业务逻辑,生成一个分布式
ID
,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的SQL
语句。 -
服务端将该条数据插入数据库中,如果插入胜利则示意没有反复调用接口。如果抛出主键反复异样,则示意数据库中曾经存在该条记录,返回错误信息到客户端。
在业务执行前,先判断是否曾经操作过,如果没有则执行,否则判断为反复操作。
成果演示
在并发下拜访时,因为是基于 id 进行判断,那 id 值就必须要保障在屡次提交时,须要惟一。拜访流程如下:
@Override
@Transactional(rollbackFor = Exception.class)
public String addOrder(Order order) {order.setCreateTime(new Date());
order.setUpdateTime(new Date());
// 查问
Order orderResult = orderMapper.selectByPrimaryKey(order.getId());
Optional<Order> orderOptional = Optional.ofNullable(orderResult);
if (orderOptional.isPresent()){return "repeat request";}
int result = orderMapper.insert(order);
if (result != 1){return "fail";}
return "success";
}
对于上述性能实现,在并发下,并不能实现幂等性管制。通过 jemeter 测试,模仿 50 个并发,能够发现,插入了反复数据。产生了脏数据。
要解决这个问题,非常简单,在数据库层面增加 惟一索引 即可,将 id 设置为惟一索引,也是最容易想到的形式,一旦 id 呈现反复,就会出现异常,防止了脏数据的产生也能够解决永久性幂等。但该计划无奈用于分库分表状况,其只实用于单表状况。
乐观锁实现幂等性
数据库乐观锁计划个别只能实用于执行 更新操作 的过程,咱们能够提前在对应的数据表中多增加一个字段,充当以后数据的版本标识。
这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。
实用操作
- 更新操作
应用限度
- 须要数据库对应业务表中增加额定字段
问题抛出
扣减库存数据谬误
通过 jemeter 进行测试,能够发现。当模仿一万并发时,最终的库存数量是谬误的。这次要是因为当多线程拜访时,一个线程读取到了另外线程未提交的数据造成。
synchronized 生效问题
对于当初的问题,暂不思考秒杀设计、队列申请串行化等,只思考如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized。
依据 synchronized 定义,当多线程并发拜访时,会对以后加锁的办法产生阻塞,从而保障线程平安,防止脏数据。然而,真的能如预期的一样吗?
@Service
public class StockServiceImpl implements StockService {
@Autowired
private StockMapper stockMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public synchronized int lessInventory(String goodsId, int num) {return stockMapper.lessInventory(goodsId, num);
}
}
以后曾经在在办法上增加了 synchronized,对以后办法对象进行了锁定。通过 Jemeter,模仿一万并发对其进行拜访。能够发现,依然呈现了脏数据。
事务导致锁生效
该问题的产生起因,就在于在办法上 synchronized 搭配应用了@Transactional。
首先 synchronized 锁定的是以后办法对象,而 @Transactional 会对以后办法进行 AOP 加强,动静代理出一个代理对象,在办法执行前开启事务,执行后提交事务。
所以 synchronized 和@Transactional其实操作的是两个不同的对象,换句话说就是 @Transactional 的事务操作并不在 synchronized 锁定范畴之内。
假如 A 线程执行完扣减库存办法,会开释锁并提交事务。但 A 线程开释锁但还没提交事务前,B 线程执行扣减库存办法,B 线程执行后,和 A 线程一起提交事务,就呈现了线程平安问题,造成脏数据的呈现。
乐观锁保障幂等
基于版本号实现
MySQL 乐观锁是基于数据库实现分布式锁的一种实现,实现的形式有两种:
- 基于版本号
- 基于条件
然而实现思维都是基于 MySQL 的行锁思维来实现的。
- 批改数据表,增加 version 字段,默认值为 0
- 批改 StockMapper 增加基于版本批改数据办法
@Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
- 测试模仿一万并发进行数据批改,此时能够发现以后版本号从 0 变为 1,且库存量正确。
基于条件实现
通过版本号管制是一种十分常见的形式,适宜于大多数场景。
但当初库存扣减的场景来说,通过版本号管制就是多人并发拜访购买时,查问时显示能够购买,但最终只有一个人能胜利,这也是不能够的。其实最终只有商品库存不产生超卖就能够。那此时就能够通过条件来进行管制。
- 批改StockMapper:
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
- 批改StockController:
@PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){int result = stockService.lessInventoryByVersionOut(goodsId, num);
if (result == 1){System.out.println("购买胜利");
return "success";
}
System.out.println("购买失败");
return "fail";
}
- 通过 jemeter 进行测试,能够发现当多人并发扣减库存时,管制住了商品超卖的问题。
乐观锁实现幂等性
在零碎中,不光要保障客户端拜访的幂等性,同时还要保障服务间幂等。
比拟常见的状况,当服务间进行调用时,因为网络抖动等起因呈现超时,则很有可能呈现数据谬误。此时在分布式环境下,就须要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中 MySQL 乐观锁就是其中一种实现。
feign 超时重试成果演示
以上图为例,当客户端要生成订单时,能够基于 token 机制保障生成订单的幂等性,接着订单生成胜利后,还会基于 feign 调用库存服务进行库存扣减,此时则很有可能呈现,库存服务执行扣减库存胜利,然而当后果返回时,呈现网络抖动超时了,那么上游的订单服务则很有可能会发动重试,此时如果不进行扣减库存的幂等性保障的话,则呈现扣减库存执行屡次。
那能够先来演示当上游服务呈现提早,上游服务基于 feign 进行重试的成果。
- 以后是 order 调用 feign,所以在 order 中会存在 feignConfigure 配置类,用于配置超时工夫与重试次数。
/**
* 自定义 feign 超时工夫、重试次数
* 默认超时为 10 秒,不会进行重试。*/
@Configuration
public class FeignConfigure {
// 超时工夫,工夫单位毫秒
public static int connectTimeOutMillis = 5000;
public static int readTimeOutMillis = 5000;
@Bean
public Request.Options options() {return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
}
// 自定义重试次数
@Bean
public Retryer feignRetryer(){Retryer retryer = new Retryer.Default(100, 1000, 4);
return retryer;
}
}
-
stock 服务的 StockController 中 demo 办法会提早六秒。
通过这种形式模仿超时成果。此时在 order 中调用 stock 服务,能够发现,order 服务会对 stock 服务调用四次。
这里就演示了服务间调用超时的成果,当上游服务超时,上游服务会进行重试。
服务调用超时库存屡次扣减
根据上述演示,当上游服务超时,上游服务就会进行重试。
那么联合以后的业务场景,当用户下单胜利去调用库存服务扣减库存时,如果库存服务执行扣减库存胜利但返回后果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会呈现同一订单商品库存被屡次扣减。
- 在订单服务中生成订单,并调用库存服务扣减库存
@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){String orderId = String.valueOf(idWorker.nextId());
order.setId(orderId);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
int result = orderService.addOrder(order);
if (result != 1){System.out.println("fail");
return "fail";
}
// 生成订单详情信息
List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);
goodsIdArray.stream().forEach(goodsId->{
// 插入订单详情
OrderDetail orderDetail = new OrderDetail();
orderDetail.setId(String.valueOf(idWorker.nextId()));
orderDetail.setGoodsId(goodsId);
orderDetail.setOrderId(orderId);
orderDetail.setGoodsName("heima");
orderDetail.setGoodsNum(1);
orderDetail.setGoodsPrice(1);
orderDetailService.addOrderDetail(orderDetail);
// 扣减库存(不思考锁)
stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());
});
return "success";
}
- 库存服务间接基于商品信息进行库存扣减
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
@PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num) throws InterruptedException {System.out.println("reduce stock");
int result = stockService.reduceStockNoLock(goodsId, num);
if (result != 1){return "reduce stock fail";}
// 提早
TimeUnit.SECONDS.sleep(6000);
return "reduce stock success";
}
- 执行生成订单扣减库存,此时能够发现扣减库存办法被执行屡次,并且库存数量也被扣减了屡次
{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}
乐观锁解决服务间重试保障幂等
- 批改 StockMapper,增加乐观锁管制管制库存
@Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
- 批改 StockController,增加乐观锁扣减库存办法
/**
* 乐观锁扣减库存
* @param goodsId
* @param num
* @param version
* @return
*/
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num,
@PathVariable("version") Integer version) throws InterruptedException {System.out.println("exec reduce stock");
int result = stockService.reduceStock(goodsId, num, version);
if (result != 1){
// 扣减失败
return result;
}
// 提早
TimeUnit.SECONDS.sleep(6000);
return result;
}
- 测试,能够发现尽管产生多次重试,然而库存只会被扣减胜利一次。保障了服务间的幂等性。
ps:order 服务出现异常,是因为 order 服务会超时重试四次,但 stock 服务的提早每一次都是超过超时工夫的,所以最终在 order 服务才会呈现 read timeout 异样提醒。
音讯幂等
在零碎中当应用音讯队列时,无论做哪种技术选型,有很多问题是无论如何也不能漠视的,如:音讯必达、音讯幂等等。本章节以典型的 RabbitMQ 为例,解说如何保障音讯幂等的可施行解决方案,其余 MQ 选型均可参考。
音讯重试演示
音讯队列的音讯幂等性,次要是由 MQ 重试机制引起的。
因为音讯生产者将音讯发送到 MQ-Server 后,MQ-Server 会将音讯推送到具体的音讯消费者。假如因为网络抖动或出现异常时,MQ-Server 依据重试机制就会将音讯从新向音讯消费者推送,造成音讯消费者屡次收到雷同音讯,造成数据不统一。
在 RabbitMQ 中,音讯重试机制是默认开启的,但只会在 consumer 出现异常时,才会反复推送。在应用中,异样的呈现有可能是因为生产方又去调用第三方接口,因为网络抖动而造成异样,然而这个异样有可能是临时的。所以当消费者出现异常,能够让其重试几次,如果重试几次后,依然有异样,则须要进行数据弥补。
数据弥补计划:当重试屡次后依然出现异常,则让此条音讯进入死信队列,最终进入到数据库中,接着设置定时 job 查问这些数据,进行手动弥补。
本节中以 consumer 生产异样为演示主体,因而须要批改 RabbitMQ 配置文件。
批改配置文件
批改 consumer 一方的配置文件
# 消费者监听相干配置
listener:
simple:
retry:
# 开启消费者 (程序出现异常) 重试机制,默认开启并始终重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
设置生产异样
当 consumer 音讯监听类中增加异样,最终承受音讯时,能够发现,音讯在接管五次后,最终出现异常。
音讯幂等解决
要保障音讯幂等性的话,其实最终要解决的就是保障屡次操作,造成的影响是雷同的。那么其解决方案的思路与服务间幂等的思路其实根本都是统一的。
- 音讯防重表,解决思路与服务间幂等的防重表统一。
- redis:利用 redis 防重。
这两种计划是最常见的解决方案。其实现思路其实都是统一的。
代码实现
批改 OrderController
/**
* 此处为了不便演示,不做根底增加数据库操作
* @return
*/
@PostMapping("/addOrder")
public String addOrder(){String uniqueKey = String.valueOf(idWorker.nextId());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueKey);
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("1271700536000909313".getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);
return "success";
}
批改 stockApplication
@Bean
public JedisPool jedisPool(){return new JedisPool("192.168.200.150",6379);
}
新增音讯监听类
@Component
public class ReduceStockListener {
@Autowired
private StockService stockService;
@Autowired
private JedisPool jedisPool;
@Autowired
private StockFlowService stockFlowService;
@RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
@Transactional
public void receiveMessage(Message message){
// 获取音讯 id
String messageId = message.getMessageProperties().getMessageId();
Jedis jedis = jedisPool.getResource();
System.out.println(messageId);
try {
//redis 锁去重校验
if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){System.out.println("反复申请");
return;
}
//mysql 状态校验
if (!(stockFlowService.findByFlag(messageId).size() == 0)){System.out.println("数据已解决");
return;
}
String goodsId = null;
try {
// 获取音讯体中 goodsId
goodsId = new String(message.getBody(),"utf-8");
stockService.reduceStock(goodsId,messageId);
} catch (UnsupportedEncodingException e) {e.printStackTrace();
}
int nextInt = new Random().nextInt(100);
System.out.println("随机数:"+nextInt);
if (nextInt%2 ==0){int i= 1/0;}
} catch (RuntimeException e) {
// 解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
System.out.println("出现异常了");
System.out.println(messageId+":开释锁");
throw e;
}
}
}
音讯缓冲区
对于 RabbitMQ 的应用,默认状况下,每条音讯都会进行别离的 ack 告诉,生产完一条后,再来生产下一条。然而这样就会造成大量音讯的阻塞状况。所以为了晋升消费者对于音讯的生产速度,能够减少 consumer 数据或者对音讯进行批量生产。MQ 接管到 producer 发送的音讯后,不会间接推送给 consumer。而是积攒到肯定数量后,再进行音讯的发送。这种形式的实现,能够了解为是一种缓冲区的实现,晋升了音讯的生产速度,然而会在肯定水平上舍弃后果返回的实时性。
对于批量生产来说,也是须要思考幂等的。对于幂等性的解决方案,沿用方才的思路即可解决。
本文由
传智教育博学谷狂野架构师
教研团队公布。如果本文对您有帮忙,欢送
关注
和点赞
;如果您有任何倡议也可留言评论
或私信
,您的反对是我保持创作的能源。转载请注明出处!