服务幂等性架构设计

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub地址 (有我精心筹备的130本电子书PDF)

    只分享干货、不吹水,让咱们一起加油!

防重表实现幂等

对于避免数据反复提交,还有一种解决方案就是通过防重表实现。

防重表的实现思路也非常简单,首先创立一张表作为防重表,同时在该表中建设一个或多个字段的惟一索引作为防重字段,用于保障并发状况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则示意是反复数据。

为什么不必乐观锁

对于防重表的解决方案,可能有人会说为什么不应用乐观锁,乐观锁在应用的过程中也是会产生死锁的。

乐观锁是通过锁表的形式实现的,假如当初一个用户A拜访表A(锁住了表A),而后试图拜访表B;

另一个用户B拜访表B(锁住了表B),而后试图拜访表A。 这时对于用户A来说,因为表B曾经被用户B锁住了,所以用户A必须等到用户B开释表B能力拜访。

同时对于用户B来说,因为表A曾经被用户A锁住了,所以用户B必须等到用户A开释表A能力拜访。此时死锁就曾经产生了。

浏览业余类书籍是Java程序员必备的学习形式之一。通过一直学习和积攒,能够一直进步本人的技术能力和职业程度,实现职业倒退的指标。

十分倡议大家重视浏览,并抉择一些有深度、有价值的书籍一直晋升本人的技术水平和能力。这些书籍包含Java编程语言、数据结构和算法、面向对象设计、设计模式、框架原理与利用等等。

对于一位2-3年的Java程序员来说,浏览业余类书籍是更加的重要,因为它们能够帮忙你扩大技术广度和深度,进步你的技术能力和职业程度。以下是我给这些程序员的一些倡议:

学会寻找优良的书籍:在抉择书籍时,要抉择那些被宽泛认可和举荐的经典书籍。能够通过搜寻网上的书籍举荐列表,向其余经验丰富的程序员求教,或者参考公司外部的学习打算,来找到好的书籍。
浏览无关设计模式和架构的书籍:对于Java程序员来说,把握设计模式和架构准则是十分重要的。能够抉择浏览《设计模式:可复用面向对象软件的根底》、《大话设计模式》、《Java程序员修炼之道》等书籍来深刻学习。
学习新的技术和框架:Java技术一直倒退,新的技术和框架也不断涌现。因而,Java程序员应该定期浏览无关新技术和框架的书籍,比方Spring、Spring Boot、MyBatis、Netty等。
学习算法和数据结构:算法和数据结构是编程根底,把握这些常识能够进步代码的品质和效率。能够抉择浏览《算法》、《算法导论》等书籍来学习算法和数据结构。
参考开源我的项目和源代码:浏览开源我的项目和源代码是十分无益的,能够学习到其余程序员的编码技巧和设计思路。能够抉择一些出名的开源我的项目,如Spring、MyBatis等来进行学习。
当然,我也晓得,光是倡议是不足以激发大家学习的能源的,所以,书也我也帮大家整顿好了,把饭喂到嘴里了,我只能帮你到这里了,剩下的就靠你本人了。

以下这份蕴含46本Java程序员必备经典的书单(豆瓣评分8分+),是我破费一个月工夫整顿的:GitHub:GitHub地址

惟一主键实现幂等

数据库惟一主键的实现次要是利用数据库中主键惟一束缚的个性,一般来说惟一主键比拟实用于“插入”时的幂等性,其能保障一张表中只能存在一条带该惟一主键的记录。

应用数据库惟一主键实现幂等性时须要留神的是,该主键一般来说并不是应用数据库中自增主键,而是应用分布式 ID 充当主键,这样能力能保障在分布式环境下 ID 的全局唯一性。

对于一些后盾零碎,并发量并不高的状况下,对于幂等的实现非常简单,通过这种思维即可实现幂等管制。

实用场景

  • 插入操作
  • 删除操作

应用限度

  • 须要生成全局惟一主键 ID;

次要流程

次要流程如下:
  1. 客户端执行创立申请,调用服务端接口。
  2. 服务端执行业务逻辑,生成一个分布式 ID,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的 SQL 语句。
  3. 服务端将该条数据插入数据库中,如果插入胜利则示意没有反复调用接口。如果抛出主键反复异样,则示意数据库中曾经存在该条记录,返回错误信息到客户端。

    在业务执行前,先判断是否曾经操作过,如果没有则执行,否则判断为反复操作。

成果演示

在并发下拜访时,因为是基于id进行判断,那id值就必须要保障在屡次提交时,须要惟一。拜访流程如下:

 @Override@Transactional(rollbackFor = Exception.class)public String addOrder(Order order) {    order.setCreateTime(new Date());    order.setUpdateTime(new Date());    //查问    Order orderResult = orderMapper.selectByPrimaryKey(order.getId());    Optional<Order> orderOptional = Optional.ofNullable(orderResult);    if (orderOptional.isPresent()){        return "repeat request";    }    int result = orderMapper.insert(order);    if (result != 1){        return "fail";    }    return "success";}

对于上述性能实现,在并发下,并不能实现幂等性管制。通过jemeter测试,模仿50个并发,能够发现,插入了反复数据。产生了脏数据。

要解决这个问题,非常简单,在数据库层面增加惟一索引即可,将id设置为惟一索引,也是最容易想到的形式,一旦id呈现反复,就会出现异常,防止了脏数据的产生也能够解决永久性幂等。但该计划无奈用于分库分表状况,其只实用于单表状况。

乐观锁实现幂等性

数据库乐观锁计划个别只能实用于执行更新操作的过程,咱们能够提前在对应的数据表中多增加一个字段,充当以后数据的版本标识。

这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。

实用操作

  • 更新操作

应用限度

  • 须要数据库对应业务表中增加额定字段

问题抛出

扣减库存数据谬误
通过jemeter进行测试,能够发现。当模仿一万并发时,最终的库存数量是谬误的。这次要是因为当多线程拜访时,一个线程读取到了另外线程未提交的数据造成。

synchronized生效问题
对于当初的问题,暂不思考秒杀设计、队列申请串行化等,只思考如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized

依据synchronized定义,当多线程并发拜访时,会对以后加锁的办法产生阻塞,从而保障线程平安,防止脏数据。然而,真的能如预期的一样吗?

 @Servicepublic class StockServiceImpl implements StockService {    @Autowired    private StockMapper stockMapper;    @Override    @Transactional(rollbackFor = Exception.class)    public synchronized int lessInventory(String goodsId, int num) {        return stockMapper.lessInventory(goodsId, num);    }}

以后曾经在在办法上增加了synchronized,对以后办法对象进行了锁定。 通过Jemeter,模仿一万并发对其进行拜访。能够发现,依然呈现了脏数据。

事务导致锁生效
该问题的产生起因,就在于在办法上synchronized搭配应用了@Transactional

首先synchronized锁定的是以后办法对象,而@Transactional会对以后办法进行AOP加强,动静代理出一个代理对象,在办法执行前开启事务,执行后提交事务。

所以synchronized@Transactional其实操作的是两个不同的对象,换句话说就是@Transactional的事务操作并不在synchronized锁定范畴之内。

假如A线程执行完扣减库存办法,会开释锁并提交事务。但A线程开释锁但还没提交事务前,B线程执行扣减库存办法,B线程执行后,和A线程一起提交事务,就呈现了线程平安问题,造成脏数据的呈现。

乐观锁保障幂等

基于版本号实现

MySQL乐观锁是基于数据库实现分布式锁的一种实现,实现的形式有两种:

  • 基于版本号
  • 基于条件

然而实现思维都是基于MySQL的行锁思维来实现的。

  1. 批改数据表,增加version字段,默认值为0
  2. 批改StockMapper增加基于版本批改数据办法
 @Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
  1. 测试模仿一万并发进行数据批改,此时能够发现以后版本号从0变为1,且库存量正确。

基于条件实现
通过版本号管制是一种十分常见的形式,适宜于大多数场景。

但当初库存扣减的场景来说,通过版本号管制就是多人并发拜访购买时,查问时显示能够购买,但最终只有一个人能胜利,这也是不能够的。其实最终只有商品库存不产生超卖就能够。那此时就能够通过条件来进行管制。

  1. 批改StockMapper
 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
  1. 批改StockController
 @PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){    int result = stockService.lessInventoryByVersionOut(goodsId, num);    if (result == 1){        System.out.println("购买胜利");        return "success";    }    System.out.println("购买失败");    return "fail";}
  1. 通过jemeter进行测试,能够发现当多人并发扣减库存时,管制住了商品超卖的问题。

乐观锁实现幂等性

在零碎中,不光要保障客户端拜访的幂等性,同时还要保障服务间幂等。

比拟常见的状况,当服务间进行调用时,因为网络抖动等起因呈现超时,则很有可能呈现数据谬误。此时在分布式环境下,就须要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。

feign超时重试成果演示

以上图为例,当客户端要生成订单时,能够基于token机制保障生成订单的幂等性,接着订单生成胜利后,还会基于feign调用库存服务进行库存扣减,此时则很有可能呈现,库存服务执行扣减库存胜利,然而当后果返回时,呈现网络抖动超时了,那么上游的订单服务则很有可能会发动重试,此时如果不进行扣减库存的幂等性保障的话,则呈现扣减库存执行屡次。

那能够先来演示当上游服务呈现提早,上游服务基于feign进行重试的成果。
  1. 以后是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时工夫与重试次数。
 /** * 自定义feign超时工夫、重试次数 * 默认超时为10秒,不会进行重试。 */@Configurationpublic class FeignConfigure {    //超时工夫,工夫单位毫秒    public static int connectTimeOutMillis = 5000;    public static int readTimeOutMillis = 5000;    @Bean    public Request.Options options() {        return new Request.Options(connectTimeOutMillis, readTimeOutMillis);    }    //自定义重试次数    @Bean    public Retryer feignRetryer(){        Retryer retryer = new Retryer.Default(100, 1000, 4);        return retryer;    }}
  1. stock服务的StockController中demo办法会提早六秒。

    通过这种形式模仿超时成果。此时在order中调用stock服务,能够发现,order服务会对stock服务调用四次。

这里就演示了服务间调用超时的成果,当上游服务超时,上游服务会进行重试。

服务调用超时库存屡次扣减
根据上述演示,当上游服务超时,上游服务就会进行重试。

那么联合以后的业务场景,当用户下单胜利去调用库存服务扣减库存时, 如果库存服务执行扣减库存胜利但返回后果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会呈现同一订单商品库存被屡次扣减。

  1. 在订单服务中生成订单,并调用库存服务扣减库存
 @Idemptent@PostMapping("/genOrder")public String genOrder(@RequestBody Order order){    String orderId = String.valueOf(idWorker.nextId());    order.setId(orderId);    order.setCreateTime(new Date());    order.setUpdateTime(new Date());    int result = orderService.addOrder(order);    if (result != 1){        System.out.println("fail");        return "fail";    }    //生成订单详情信息    List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);    goodsIdArray.stream().forEach(goodsId->{        //插入订单详情        OrderDetail orderDetail = new OrderDetail();        orderDetail.setId(String.valueOf(idWorker.nextId()));        orderDetail.setGoodsId(goodsId);        orderDetail.setOrderId(orderId);        orderDetail.setGoodsName("heima");        orderDetail.setGoodsNum(1);        orderDetail.setGoodsPrice(1);        orderDetailService.addOrderDetail(orderDetail);        //扣减库存(不思考锁)        stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());    });    return "success";}
  1. 库存服务间接基于商品信息进行库存扣减
 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num); @PutMapping("/reduceStockNoLock/{goodsId}/{num}")public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,                                    @PathVariable("num") Integer num) throws InterruptedException {        System.out.println("reduce stock");        int result = stockService.reduceStockNoLock(goodsId, num);        if (result != 1){            return "reduce stock fail";        }        //提早        TimeUnit.SECONDS.sleep(6000);        return "reduce stock success";    }
  1. 执行生成订单扣减库存,此时能够发现扣减库存办法被执行屡次,并且库存数量也被扣减了屡次
 {"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}

乐观锁解决服务间重试保障幂等
  1. 批改StockMapper,增加乐观锁管制管制库存
 @Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
  1. 批改StockController,增加乐观锁扣减库存办法
 /**     * 乐观锁扣减库存     * @param goodsId     * @param num     * @param version     * @return     */@PutMapping("/reduceStock/{goodsId}/{num}/{version}")public int reduceStock(@PathVariable("goodsId") String goodsId,                       @PathVariable("num") Integer num,                       @PathVariable("version") Integer version) throws InterruptedException {    System.out.println("exec reduce stock");    int result = stockService.reduceStock(goodsId, num, version);    if (result != 1){        //扣减失败        return result;    }    //提早    TimeUnit.SECONDS.sleep(6000);    return result;}
  1. 测试,能够发现尽管产生多次重试,然而库存只会被扣减胜利一次。保障了服务间的幂等性。
ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的提早每一次都是超过超时工夫的,所以最终在order服务才会呈现read timeout异样提醒。

音讯幂等

在零碎中当应用音讯队列时,无论做哪种技术选型,有很多问题是无论如何也不能漠视的,如:音讯必达、音讯幂等等。本章节以典型的RabbitMQ为例,解说如何保障音讯幂等的可施行解决方案,其余MQ选型均可参考。

音讯重试演示

音讯队列的音讯幂等性,次要是由MQ重试机制引起的。

因为音讯生产者将音讯发送到MQ-Server后,MQ-Server会将音讯推送到具体的音讯消费者。假如因为网络抖动或出现异常时,MQ-Server依据重试机制就会将音讯从新向音讯消费者推送,造成音讯消费者屡次收到雷同音讯,造成数据不统一。

在RabbitMQ中,音讯重试机制是默认开启的,但只会在consumer出现异常时,才会反复推送。在应用中,异样的呈现有可能是因为生产方又去调用第三方接口,因为网络抖动而造成异样,然而这个异样有可能是临时的。所以当消费者出现异常,能够让其重试几次,如果重试几次后,依然有异样,则须要进行数据弥补。

数据弥补计划:当重试屡次后依然出现异常,则让此条音讯进入死信队列,最终进入到数据库中,接着设置定时job查问这些数据,进行手动弥补。

本节中以consumer生产异样为演示主体,因而须要批改RabbitMQ配置文件。

批改配置文件
批改consumer一方的配置文件
 # 消费者监听相干配置    listener:      simple:        retry:          # 开启消费者(程序出现异常)重试机制,默认开启并始终重试          enabled: true          # 最大重试次数          max-attempts: 5          # 重试间隔时间(毫秒)          initial-interval: 3000
设置生产异样

当consumer音讯监听类中增加异样,最终承受音讯时,能够发现,音讯在接管五次后,最终出现异常。

音讯幂等解决

要保障音讯幂等性的话,其实最终要解决的就是保障屡次操作,造成的影响是雷同的。那么其解决方案的思路与服务间幂等的思路其实根本都是统一的。
  1. 音讯防重表,解决思路与服务间幂等的防重表统一。
  2. redis:利用redis防重。
这两种计划是最常见的解决方案。其实现思路其实都是统一的。

代码实现

批改OrderController
 /**     * 此处为了不便演示,不做根底增加数据库操作     * @return     */@PostMapping("/addOrder")public String addOrder(){    String uniqueKey = String.valueOf(idWorker.nextId());    MessageProperties messageProperties = new MessageProperties();    messageProperties.setMessageId(uniqueKey);    messageProperties.setContentType("text/plain");    messageProperties.setContentEncoding("utf-8");    Message message = new Message("1271700536000909313".getBytes(),messageProperties);    rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);    return "success";}
批改stockApplication
 @Beanpublic JedisPool jedisPool(){    return new JedisPool("192.168.200.150",6379);}
新增音讯监听类
 @Componentpublic class ReduceStockListener {    @Autowired    private StockService stockService;    @Autowired    private JedisPool jedisPool;    @Autowired    private StockFlowService stockFlowService;    @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)    @Transactional    public void receiveMessage(Message message){        //获取音讯id        String messageId = message.getMessageProperties().getMessageId();        Jedis jedis = jedisPool.getResource();        System.out.println(messageId);        try {            //redis锁去重校验            if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){                System.out.println("反复申请");                return;            }            //mysql状态校验            if (!(stockFlowService.findByFlag(messageId).size() == 0)){                System.out.println("数据已解决");                return;            }            String goodsId = null;            try {                //获取音讯体中goodsId                goodsId = new String(message.getBody(),"utf-8");                stockService.reduceStock(goodsId,messageId);            } catch (UnsupportedEncodingException e) {                e.printStackTrace();            }            int nextInt = new Random().nextInt(100);            System.out.println("随机数:"+nextInt);            if (nextInt%2 ==0){                int i= 1/0;            }        } catch (RuntimeException e) {            //解锁            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";            jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));            System.out.println("出现异常了");            System.out.println(messageId+":开释锁");            throw e;        }    }}

音讯缓冲区

对于RabbitMQ的应用,默认状况下,每条音讯都会进行别离的ack告诉,生产完一条后,再来生产下一条。然而这样就会造成大量音讯的阻塞状况。所以为了晋升消费者对于音讯的生产速度,能够减少consumer数据或者对音讯进行批量生产。MQ接管到producer发送的音讯后,不会间接推送给consumer。而是积攒到肯定数量后,再进行音讯的发送。 这种形式的实现,能够了解为是一种缓冲区的实现,晋升了音讯的生产速度,然而会在肯定水平上舍弃后果返回的实时性。

对于批量生产来说,也是须要思考幂等的。对于幂等性的解决方案,沿用方才的思路即可解决。

本文由传智教育博学谷狂野架构师教研团队公布。

如果本文对您有帮忙,欢送关注点赞;如果您有任何倡议也可留言评论私信,您的反对是我保持创作的能源。

转载请注明出处!