服务幂等性架构设计
- 作者: 博学谷狂野架构师
GitHub:GitHub地址 (有我精心筹备的130本电子书PDF)
只分享干货、不吹水,让咱们一起加油!
防重表实现幂等
对于避免数据反复提交,还有一种解决方案就是通过防重表实现。
防重表的实现思路也非常简单,首先创立一张表作为防重表,同时在该表中建设一个或多个字段的惟一索引作为防重字段,用于保障并发状况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则示意是反复数据。
为什么不必乐观锁
对于防重表的解决方案,可能有人会说为什么不应用乐观锁,乐观锁在应用的过程中也是会产生死锁的。
乐观锁是通过锁表的形式实现的,假如当初一个用户A拜访表A(锁住了表A),而后试图拜访表B;
另一个用户B拜访表B(锁住了表B),而后试图拜访表A。 这时对于用户A来说,因为表B曾经被用户B锁住了,所以用户A必须等到用户B开释表B能力拜访。
同时对于用户B来说,因为表A曾经被用户A锁住了,所以用户B必须等到用户A开释表A能力拜访。此时死锁就曾经产生了。
浏览业余类书籍是Java程序员必备的学习形式之一。通过一直学习和积攒,能够一直进步本人的技术能力和职业程度,实现职业倒退的指标。
十分倡议大家重视浏览,并抉择一些有深度、有价值的书籍一直晋升本人的技术水平和能力。这些书籍包含Java编程语言、数据结构和算法、面向对象设计、设计模式、框架原理与利用等等。
对于一位2-3年的Java程序员来说,浏览业余类书籍是更加的重要,因为它们能够帮忙你扩大技术广度和深度,进步你的技术能力和职业程度。以下是我给这些程序员的一些倡议:
学会寻找优良的书籍:在抉择书籍时,要抉择那些被宽泛认可和举荐的经典书籍。能够通过搜寻网上的书籍举荐列表,向其余经验丰富的程序员求教,或者参考公司外部的学习打算,来找到好的书籍。
浏览无关设计模式和架构的书籍:对于Java程序员来说,把握设计模式和架构准则是十分重要的。能够抉择浏览《设计模式:可复用面向对象软件的根底》、《大话设计模式》、《Java程序员修炼之道》等书籍来深刻学习。
学习新的技术和框架:Java技术一直倒退,新的技术和框架也不断涌现。因而,Java程序员应该定期浏览无关新技术和框架的书籍,比方Spring、Spring Boot、MyBatis、Netty等。
学习算法和数据结构:算法和数据结构是编程根底,把握这些常识能够进步代码的品质和效率。能够抉择浏览《算法》、《算法导论》等书籍来学习算法和数据结构。
参考开源我的项目和源代码:浏览开源我的项目和源代码是十分无益的,能够学习到其余程序员的编码技巧和设计思路。能够抉择一些出名的开源我的项目,如Spring、MyBatis等来进行学习。
当然,我也晓得,光是倡议是不足以激发大家学习的能源的,所以,书也我也帮大家整顿好了,把饭喂到嘴里了,我只能帮你到这里了,剩下的就靠你本人了。
以下这份蕴含46本Java程序员必备经典的书单(豆瓣评分8分+),是我破费一个月工夫整顿的:GitHub:GitHub地址
惟一主键实现幂等
数据库惟一主键的实现次要是利用数据库中主键惟一束缚的个性,一般来说惟一主键比拟实用于“插入”时的幂等性,其能保障一张表中只能存在一条带该惟一主键的记录。
应用数据库惟一主键实现幂等性时须要留神的是,该主键一般来说并不是应用数据库中自增主键,而是应用分布式 ID 充当主键,这样能力能保障在分布式环境下 ID 的全局唯一性。
对于一些后盾零碎,并发量并不高的状况下,对于幂等的实现非常简单,通过这种思维即可实现幂等管制。
实用场景
- 插入操作
- 删除操作
应用限度
- 须要生成全局惟一主键 ID;
次要流程
次要流程如下:
- 客户端执行创立申请,调用服务端接口。
- 服务端执行业务逻辑,生成一个分布式
ID
,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的SQL
语句。 服务端将该条数据插入数据库中,如果插入胜利则示意没有反复调用接口。如果抛出主键反复异样,则示意数据库中曾经存在该条记录,返回错误信息到客户端。
在业务执行前,先判断是否曾经操作过,如果没有则执行,否则判断为反复操作。
成果演示
在并发下拜访时,因为是基于id进行判断,那id值就必须要保障在屡次提交时,须要惟一。拜访流程如下:
@Override@Transactional(rollbackFor = Exception.class)public String addOrder(Order order) { order.setCreateTime(new Date()); order.setUpdateTime(new Date()); //查问 Order orderResult = orderMapper.selectByPrimaryKey(order.getId()); Optional<Order> orderOptional = Optional.ofNullable(orderResult); if (orderOptional.isPresent()){ return "repeat request"; } int result = orderMapper.insert(order); if (result != 1){ return "fail"; } return "success";}
对于上述性能实现,在并发下,并不能实现幂等性管制。通过jemeter测试,模仿50个并发,能够发现,插入了反复数据。产生了脏数据。
要解决这个问题,非常简单,在数据库层面增加惟一索引即可,将id设置为惟一索引,也是最容易想到的形式,一旦id呈现反复,就会出现异常,防止了脏数据的产生也能够解决永久性幂等。但该计划无奈用于分库分表状况,其只实用于单表状况。
乐观锁实现幂等性
数据库乐观锁计划个别只能实用于执行更新操作的过程,咱们能够提前在对应的数据表中多增加一个字段,充当以后数据的版本标识。
这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。
实用操作
- 更新操作
应用限度
- 须要数据库对应业务表中增加额定字段
问题抛出
扣减库存数据谬误
通过jemeter进行测试,能够发现。当模仿一万并发时,最终的库存数量是谬误的。这次要是因为当多线程拜访时,一个线程读取到了另外线程未提交的数据造成。
synchronized生效问题
对于当初的问题,暂不思考秒杀设计、队列申请串行化等,只思考如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized。
依据synchronized定义,当多线程并发拜访时,会对以后加锁的办法产生阻塞,从而保障线程平安,防止脏数据。然而,真的能如预期的一样吗?
@Servicepublic class StockServiceImpl implements StockService { @Autowired private StockMapper stockMapper; @Override @Transactional(rollbackFor = Exception.class) public synchronized int lessInventory(String goodsId, int num) { return stockMapper.lessInventory(goodsId, num); }}
以后曾经在在办法上增加了synchronized,对以后办法对象进行了锁定。 通过Jemeter,模仿一万并发对其进行拜访。能够发现,依然呈现了脏数据。
事务导致锁生效
该问题的产生起因,就在于在办法上synchronized搭配应用了@Transactional。
首先synchronized锁定的是以后办法对象,而@Transactional会对以后办法进行AOP加强,动静代理出一个代理对象,在办法执行前开启事务,执行后提交事务。
所以synchronized和@Transactional其实操作的是两个不同的对象,换句话说就是@Transactional的事务操作并不在synchronized锁定范畴之内。
假如A线程执行完扣减库存办法,会开释锁并提交事务。但A线程开释锁但还没提交事务前,B线程执行扣减库存办法,B线程执行后,和A线程一起提交事务,就呈现了线程平安问题,造成脏数据的呈现。
乐观锁保障幂等
基于版本号实现
MySQL乐观锁是基于数据库实现分布式锁的一种实现,实现的形式有两种:
- 基于版本号
- 基于条件
然而实现思维都是基于MySQL的行锁思维来实现的。
- 批改数据表,增加version字段,默认值为0
- 批改StockMapper增加基于版本批改数据办法
@Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
- 测试模仿一万并发进行数据批改,此时能够发现以后版本号从0变为1,且库存量正确。
基于条件实现
通过版本号管制是一种十分常见的形式,适宜于大多数场景。
但当初库存扣减的场景来说,通过版本号管制就是多人并发拜访购买时,查问时显示能够购买,但最终只有一个人能胜利,这也是不能够的。其实最终只有商品库存不产生超卖就能够。那此时就能够通过条件来进行管制。
- 批改StockMapper:
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
- 批改StockController:
@PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){ int result = stockService.lessInventoryByVersionOut(goodsId, num); if (result == 1){ System.out.println("购买胜利"); return "success"; } System.out.println("购买失败"); return "fail";}
- 通过jemeter进行测试,能够发现当多人并发扣减库存时,管制住了商品超卖的问题。
乐观锁实现幂等性
在零碎中,不光要保障客户端拜访的幂等性,同时还要保障服务间幂等。
比拟常见的状况,当服务间进行调用时,因为网络抖动等起因呈现超时,则很有可能呈现数据谬误。此时在分布式环境下,就须要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。
feign超时重试成果演示
以上图为例,当客户端要生成订单时,能够基于token机制保障生成订单的幂等性,接着订单生成胜利后,还会基于feign调用库存服务进行库存扣减,此时则很有可能呈现,库存服务执行扣减库存胜利,然而当后果返回时,呈现网络抖动超时了,那么上游的订单服务则很有可能会发动重试,此时如果不进行扣减库存的幂等性保障的话,则呈现扣减库存执行屡次。
那能够先来演示当上游服务呈现提早,上游服务基于feign进行重试的成果。
- 以后是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时工夫与重试次数。
/** * 自定义feign超时工夫、重试次数 * 默认超时为10秒,不会进行重试。 */@Configurationpublic class FeignConfigure { //超时工夫,工夫单位毫秒 public static int connectTimeOutMillis = 5000; public static int readTimeOutMillis = 5000; @Bean public Request.Options options() { return new Request.Options(connectTimeOutMillis, readTimeOutMillis); } //自定义重试次数 @Bean public Retryer feignRetryer(){ Retryer retryer = new Retryer.Default(100, 1000, 4); return retryer; }}
stock服务的StockController中demo办法会提早六秒。
通过这种形式模仿超时成果。此时在order中调用stock服务,能够发现,order服务会对stock服务调用四次。
这里就演示了服务间调用超时的成果,当上游服务超时,上游服务会进行重试。
服务调用超时库存屡次扣减
根据上述演示,当上游服务超时,上游服务就会进行重试。
那么联合以后的业务场景,当用户下单胜利去调用库存服务扣减库存时, 如果库存服务执行扣减库存胜利但返回后果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会呈现同一订单商品库存被屡次扣减。
- 在订单服务中生成订单,并调用库存服务扣减库存
@Idemptent@PostMapping("/genOrder")public String genOrder(@RequestBody Order order){ String orderId = String.valueOf(idWorker.nextId()); order.setId(orderId); order.setCreateTime(new Date()); order.setUpdateTime(new Date()); int result = orderService.addOrder(order); if (result != 1){ System.out.println("fail"); return "fail"; } //生成订单详情信息 List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class); goodsIdArray.stream().forEach(goodsId->{ //插入订单详情 OrderDetail orderDetail = new OrderDetail(); orderDetail.setId(String.valueOf(idWorker.nextId())); orderDetail.setGoodsId(goodsId); orderDetail.setOrderId(orderId); orderDetail.setGoodsName("heima"); orderDetail.setGoodsNum(1); orderDetail.setGoodsPrice(1); orderDetailService.addOrderDetail(orderDetail); //扣减库存(不思考锁) stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum()); }); return "success";}
- 库存服务间接基于商品信息进行库存扣减
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num); @PutMapping("/reduceStockNoLock/{goodsId}/{num}")public String reduceStockNoLock(@PathVariable("goodsId") String goodsId, @PathVariable("num") Integer num) throws InterruptedException { System.out.println("reduce stock"); int result = stockService.reduceStockNoLock(goodsId, num); if (result != 1){ return "reduce stock fail"; } //提早 TimeUnit.SECONDS.sleep(6000); return "reduce stock success"; }
- 执行生成订单扣减库存,此时能够发现扣减库存办法被执行屡次,并且库存数量也被扣减了屡次
{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}
乐观锁解决服务间重试保障幂等
- 批改StockMapper,增加乐观锁管制管制库存
@Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
- 批改StockController,增加乐观锁扣减库存办法
/** * 乐观锁扣减库存 * @param goodsId * @param num * @param version * @return */@PutMapping("/reduceStock/{goodsId}/{num}/{version}")public int reduceStock(@PathVariable("goodsId") String goodsId, @PathVariable("num") Integer num, @PathVariable("version") Integer version) throws InterruptedException { System.out.println("exec reduce stock"); int result = stockService.reduceStock(goodsId, num, version); if (result != 1){ //扣减失败 return result; } //提早 TimeUnit.SECONDS.sleep(6000); return result;}
- 测试,能够发现尽管产生多次重试,然而库存只会被扣减胜利一次。保障了服务间的幂等性。
ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的提早每一次都是超过超时工夫的,所以最终在order服务才会呈现read timeout异样提醒。
音讯幂等
在零碎中当应用音讯队列时,无论做哪种技术选型,有很多问题是无论如何也不能漠视的,如:音讯必达、音讯幂等等。本章节以典型的RabbitMQ为例,解说如何保障音讯幂等的可施行解决方案,其余MQ选型均可参考。
音讯重试演示
音讯队列的音讯幂等性,次要是由MQ重试机制引起的。
因为音讯生产者将音讯发送到MQ-Server后,MQ-Server会将音讯推送到具体的音讯消费者。假如因为网络抖动或出现异常时,MQ-Server依据重试机制就会将音讯从新向音讯消费者推送,造成音讯消费者屡次收到雷同音讯,造成数据不统一。
在RabbitMQ中,音讯重试机制是默认开启的,但只会在consumer出现异常时,才会反复推送。在应用中,异样的呈现有可能是因为生产方又去调用第三方接口,因为网络抖动而造成异样,然而这个异样有可能是临时的。所以当消费者出现异常,能够让其重试几次,如果重试几次后,依然有异样,则须要进行数据弥补。
数据弥补计划:当重试屡次后依然出现异常,则让此条音讯进入死信队列,最终进入到数据库中,接着设置定时job查问这些数据,进行手动弥补。
本节中以consumer生产异样为演示主体,因而须要批改RabbitMQ配置文件。
批改配置文件
批改consumer一方的配置文件
# 消费者监听相干配置 listener: simple: retry: # 开启消费者(程序出现异常)重试机制,默认开启并始终重试 enabled: true # 最大重试次数 max-attempts: 5 # 重试间隔时间(毫秒) initial-interval: 3000
设置生产异样
当consumer音讯监听类中增加异样,最终承受音讯时,能够发现,音讯在接管五次后,最终出现异常。
音讯幂等解决
要保障音讯幂等性的话,其实最终要解决的就是保障屡次操作,造成的影响是雷同的。那么其解决方案的思路与服务间幂等的思路其实根本都是统一的。
- 音讯防重表,解决思路与服务间幂等的防重表统一。
- redis:利用redis防重。
这两种计划是最常见的解决方案。其实现思路其实都是统一的。
代码实现
批改OrderController
/** * 此处为了不便演示,不做根底增加数据库操作 * @return */@PostMapping("/addOrder")public String addOrder(){ String uniqueKey = String.valueOf(idWorker.nextId()); MessageProperties messageProperties = new MessageProperties(); messageProperties.setMessageId(uniqueKey); messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding("utf-8"); Message message = new Message("1271700536000909313".getBytes(),messageProperties); rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message); return "success";}
批改stockApplication
@Beanpublic JedisPool jedisPool(){ return new JedisPool("192.168.200.150",6379);}
新增音讯监听类
@Componentpublic class ReduceStockListener { @Autowired private StockService stockService; @Autowired private JedisPool jedisPool; @Autowired private StockFlowService stockFlowService; @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE) @Transactional public void receiveMessage(Message message){ //获取音讯id String messageId = message.getMessageProperties().getMessageId(); Jedis jedis = jedisPool.getResource(); System.out.println(messageId); try { //redis锁去重校验 if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){ System.out.println("反复申请"); return; } //mysql状态校验 if (!(stockFlowService.findByFlag(messageId).size() == 0)){ System.out.println("数据已解决"); return; } String goodsId = null; try { //获取音讯体中goodsId goodsId = new String(message.getBody(),"utf-8"); stockService.reduceStock(goodsId,messageId); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } int nextInt = new Random().nextInt(100); System.out.println("随机数:"+nextInt); if (nextInt%2 ==0){ int i= 1/0; } } catch (RuntimeException e) { //解锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId)); System.out.println("出现异常了"); System.out.println(messageId+":开释锁"); throw e; } }}
音讯缓冲区
对于RabbitMQ的应用,默认状况下,每条音讯都会进行别离的ack告诉,生产完一条后,再来生产下一条。然而这样就会造成大量音讯的阻塞状况。所以为了晋升消费者对于音讯的生产速度,能够减少consumer数据或者对音讯进行批量生产。MQ接管到producer发送的音讯后,不会间接推送给consumer。而是积攒到肯定数量后,再进行音讯的发送。 这种形式的实现,能够了解为是一种缓冲区的实现,晋升了音讯的生产速度,然而会在肯定水平上舍弃后果返回的实时性。
对于批量生产来说,也是须要思考幂等的。对于幂等性的解决方案,沿用方才的思路即可解决。
本文由
传智教育博学谷狂野架构师
教研团队公布。如果本文对您有帮忙,欢送
关注
和点赞
;如果您有任何倡议也可留言评论
或私信
,您的反对是我保持创作的能源。转载请注明出处!