乐趣区

关于java:一次说透4大服务性幂等场景架构设计方案

服务幂等性架构设计

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub 地址(有我精心筹备的 130 本电子书 PDF)

    只分享干货、不吹水,让咱们一起加油!😄

防重表实现幂等

对于避免数据反复提交,还有一种解决方案就是通过防重表实现。

防重表的实现思路也非常简单,首先创立一张表作为防重表,同时在该表中建设一个或多个字段的惟一索引作为防重字段,用于保障并发状况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则示意是反复数据。

为什么不必乐观锁

对于防重表的解决方案,可能有人会说为什么不应用乐观锁,乐观锁在应用的过程中也是会产生死锁的。

乐观锁是通过锁表的形式实现的,假如当初一个用户 A 拜访表 A(锁住了表 A),而后试图拜访表 B;

另一个用户 B 拜访表 B(锁住了表 B),而后试图拜访表 A。这时对于用户 A 来说,因为表 B 曾经被用户 B 锁住了,所以用户 A 必须等到用户 B 开释表 B 能力拜访。

同时对于用户 B 来说,因为表 A 曾经被用户 A 锁住了,所以用户 B 必须等到用户 A 开释表 A 能力拜访。此时死锁就曾经产生了。

浏览业余类书籍是 Java 程序员必备的学习形式之一。通过一直学习和积攒,能够一直进步本人的技术能力和职业程度,实现职业倒退的指标。

十分倡议大家重视浏览,并抉择一些有深度、有价值的书籍一直晋升本人的技术水平和能力。这些书籍包含 Java 编程语言、数据结构和算法、面向对象设计、设计模式、框架原理与利用等等。

对于一位 2 - 3 年的 Java 程序员来说,浏览业余类书籍是更加的重要,因为它们能够帮忙你扩大技术广度和深度,进步你的技术能力和职业程度。以下是我给这些程序员的一些倡议:

学会寻找优良的书籍:在抉择书籍时,要抉择那些被宽泛认可和举荐的经典书籍。能够通过搜寻网上的书籍举荐列表,向其余经验丰富的程序员求教,或者参考公司外部的学习打算,来找到好的书籍。
浏览无关设计模式和架构的书籍:对于 Java 程序员来说,把握设计模式和架构准则是十分重要的。能够抉择浏览《设计模式:可复用面向对象软件的根底》、《大话设计模式》、《Java 程序员修炼之道》等书籍来深刻学习。
学习新的技术和框架:Java 技术一直倒退,新的技术和框架也不断涌现。因而,Java 程序员应该定期浏览无关新技术和框架的书籍,比方 Spring、Spring Boot、MyBatis、Netty 等。
学习算法和数据结构:算法和数据结构是编程根底,把握这些常识能够进步代码的品质和效率。能够抉择浏览《算法》、《算法导论》等书籍来学习算法和数据结构。
参考开源我的项目和源代码:浏览开源我的项目和源代码是十分无益的,能够学习到其余程序员的编码技巧和设计思路。能够抉择一些出名的开源我的项目,如 Spring、MyBatis 等来进行学习。
当然,我也晓得,光是倡议是不足以激发大家学习的能源的,所以,书也我也帮大家整顿好了,把饭喂到嘴里了,我只能帮你到这里了,剩下的就靠你本人了。

以下这份蕴含 46 本 Java 程序员必备经典的书单(豆瓣评分 8 分 +),是我破费一个月工夫整顿的:GitHub:GitHub 地址

惟一主键实现幂等

数据库 惟一主键 的实现次要是利用数据库中主键惟一束缚的个性,一般来说惟一主键比拟实用于“插入”时的幂等性,其能保障一张表中只能存在一条带该惟一主键的记录。

应用数据库惟一主键实现幂等性时须要留神的是,该主键一般来说并不是应用数据库中自增主键,而是应用分布式 ID 充当主键,这样能力能保障在分布式环境下 ID 的全局唯一性。

对于一些后盾零碎,并发量并不高的状况下,对于幂等的实现非常简单,通过这种思维即可实现幂等管制。

实用场景

  • 插入操作
  • 删除操作

应用限度

  • 须要生成全局惟一主键 ID;

次要流程

次要流程如下:

  1. 客户端执行创立申请,调用服务端接口。
  2. 服务端执行业务逻辑,生成一个分布式 ID,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的 SQL 语句。
  3. 服务端将该条数据插入数据库中,如果插入胜利则示意没有反复调用接口。如果抛出主键反复异样,则示意数据库中曾经存在该条记录,返回错误信息到客户端。

    在业务执行前,先判断是否曾经操作过,如果没有则执行,否则判断为反复操作。

成果演示

在并发下拜访时,因为是基于 id 进行判断,那 id 值就必须要保障在屡次提交时,须要惟一。拜访流程如下:

 @Override
@Transactional(rollbackFor = Exception.class)
public String addOrder(Order order) {order.setCreateTime(new Date());
    order.setUpdateTime(new Date());

    // 查问
    Order orderResult = orderMapper.selectByPrimaryKey(order.getId());

    Optional<Order> orderOptional = Optional.ofNullable(orderResult);
    if (orderOptional.isPresent()){return "repeat request";}

    int result = orderMapper.insert(order);
    if (result != 1){return "fail";}

    return "success";
}

对于上述性能实现,在并发下,并不能实现幂等性管制。通过 jemeter 测试,模仿 50 个并发,能够发现,插入了反复数据。产生了脏数据。

要解决这个问题,非常简单,在数据库层面增加 惟一索引 即可,将 id 设置为惟一索引,也是最容易想到的形式,一旦 id 呈现反复,就会出现异常,防止了脏数据的产生也能够解决永久性幂等。但该计划无奈用于分库分表状况,其只实用于单表状况。

乐观锁实现幂等性

数据库乐观锁计划个别只能实用于执行 更新操作 的过程,咱们能够提前在对应的数据表中多增加一个字段,充当以后数据的版本标识。

这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。

实用操作

  • 更新操作

应用限度

  • 须要数据库对应业务表中增加额定字段

问题抛出

扣减库存数据谬误

通过 jemeter 进行测试,能够发现。当模仿一万并发时,最终的库存数量是谬误的。这次要是因为当多线程拜访时,一个线程读取到了另外线程未提交的数据造成。

synchronized 生效问题

对于当初的问题,暂不思考秒杀设计、队列申请串行化等,只思考如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized

依据 synchronized 定义,当多线程并发拜访时,会对以后加锁的办法产生阻塞,从而保障线程平安,防止脏数据。然而,真的能如预期的一样吗?

 @Service
public class StockServiceImpl implements StockService {

    @Autowired
    private StockMapper stockMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public synchronized int lessInventory(String goodsId, int num) {return stockMapper.lessInventory(goodsId, num);
    }
}

以后曾经在在办法上增加了 synchronized,对以后办法对象进行了锁定。通过 Jemeter,模仿一万并发对其进行拜访。能够发现,依然呈现了脏数据。

事务导致锁生效

该问题的产生起因,就在于在办法上 synchronized 搭配应用了@Transactional

首先 synchronized 锁定的是以后办法对象,而 @Transactional 会对以后办法进行 AOP 加强,动静代理出一个代理对象,在办法执行前开启事务,执行后提交事务。

所以 synchronized@Transactional其实操作的是两个不同的对象,换句话说就是 @Transactional 的事务操作并不在 synchronized 锁定范畴之内。

假如 A 线程执行完扣减库存办法,会开释锁并提交事务。但 A 线程开释锁但还没提交事务前,B 线程执行扣减库存办法,B 线程执行后,和 A 线程一起提交事务,就呈现了线程平安问题,造成脏数据的呈现。

乐观锁保障幂等

基于版本号实现

MySQL 乐观锁是基于数据库实现分布式锁的一种实现,实现的形式有两种:

  • 基于版本号
  • 基于条件

然而实现思维都是基于 MySQL 的行锁思维来实现的。

  1. 批改数据表,增加 version 字段,默认值为 0
  2. 批改 StockMapper 增加基于版本批改数据办法
 @Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
  1. 测试模仿一万并发进行数据批改,此时能够发现以后版本号从 0 变为 1,且库存量正确。
基于条件实现

通过版本号管制是一种十分常见的形式,适宜于大多数场景。

但当初库存扣减的场景来说,通过版本号管制就是多人并发拜访购买时,查问时显示能够购买,但最终只有一个人能胜利,这也是不能够的。其实最终只有商品库存不产生超卖就能够。那此时就能够通过条件来进行管制。

  1. 批改StockMapper
 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
  1. 批改StockController
 @PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){int result = stockService.lessInventoryByVersionOut(goodsId, num);
    if (result == 1){System.out.println("购买胜利");
        return "success";
    }

    System.out.println("购买失败");
    return "fail";
}
  1. 通过 jemeter 进行测试,能够发现当多人并发扣减库存时,管制住了商品超卖的问题。

乐观锁实现幂等性

在零碎中,不光要保障客户端拜访的幂等性,同时还要保障服务间幂等。

比拟常见的状况,当服务间进行调用时,因为网络抖动等起因呈现超时,则很有可能呈现数据谬误。此时在分布式环境下,就须要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中 MySQL 乐观锁就是其中一种实现。

feign 超时重试成果演示

以上图为例,当客户端要生成订单时,能够基于 token 机制保障生成订单的幂等性,接着订单生成胜利后,还会基于 feign 调用库存服务进行库存扣减,此时则很有可能呈现,库存服务执行扣减库存胜利,然而当后果返回时,呈现网络抖动超时了,那么上游的订单服务则很有可能会发动重试,此时如果不进行扣减库存的幂等性保障的话,则呈现扣减库存执行屡次。

那能够先来演示当上游服务呈现提早,上游服务基于 feign 进行重试的成果。

  1. 以后是 order 调用 feign,所以在 order 中会存在 feignConfigure 配置类,用于配置超时工夫与重试次数。
 /**
 * 自定义 feign 超时工夫、重试次数
 * 默认超时为 10 秒,不会进行重试。*/
@Configuration
public class FeignConfigure {

    // 超时工夫,工夫单位毫秒
    public static int connectTimeOutMillis = 5000;
    public static int readTimeOutMillis = 5000;

    @Bean
    public Request.Options options() {return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
    }

    // 自定义重试次数
    @Bean
    public Retryer feignRetryer(){Retryer retryer = new Retryer.Default(100, 1000, 4);
        return retryer;
    }
}
  1. stock 服务的 StockController 中 demo 办法会提早六秒。

    通过这种形式模仿超时成果。此时在 order 中调用 stock 服务,能够发现,order 服务会对 stock 服务调用四次。

这里就演示了服务间调用超时的成果,当上游服务超时,上游服务会进行重试。

服务调用超时库存屡次扣减

根据上述演示,当上游服务超时,上游服务就会进行重试。

那么联合以后的业务场景,当用户下单胜利去调用库存服务扣减库存时,如果库存服务执行扣减库存胜利但返回后果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会呈现同一订单商品库存被屡次扣减。

  1. 在订单服务中生成订单,并调用库存服务扣减库存
 @Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){String orderId = String.valueOf(idWorker.nextId());
    order.setId(orderId);
    order.setCreateTime(new Date());
    order.setUpdateTime(new Date());
    int result = orderService.addOrder(order);

    if (result != 1){System.out.println("fail");
        return "fail";
    }

    // 生成订单详情信息
    List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);

    goodsIdArray.stream().forEach(goodsId->{
        // 插入订单详情
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setId(String.valueOf(idWorker.nextId()));
        orderDetail.setGoodsId(goodsId);
        orderDetail.setOrderId(orderId);
        orderDetail.setGoodsName("heima");
        orderDetail.setGoodsNum(1);
        orderDetail.setGoodsPrice(1);
        orderDetailService.addOrderDetail(orderDetail);

        // 扣减库存(不思考锁)
        stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());

    });


    return "success";
}
  1. 库存服务间接基于商品信息进行库存扣减
 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
 @PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
                                    @PathVariable("num") Integer num) throws InterruptedException {System.out.println("reduce stock");
        int result = stockService.reduceStockNoLock(goodsId, num);

        if (result != 1){return "reduce stock fail";}

        // 提早
        TimeUnit.SECONDS.sleep(6000);
        return "reduce stock success";
    }
  1. 执行生成订单扣减库存,此时能够发现扣减库存办法被执行屡次,并且库存数量也被扣减了屡次
 {"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}
乐观锁解决服务间重试保障幂等
  1. 批改 StockMapper,增加乐观锁管制管制库存
 @Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
  1. 批改 StockController,增加乐观锁扣减库存办法
 /**
     * 乐观锁扣减库存
     * @param goodsId
     * @param num
     * @param version
     * @return
     */
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
                       @PathVariable("num") Integer num,
                       @PathVariable("version") Integer version) throws InterruptedException {System.out.println("exec reduce stock");
    int result = stockService.reduceStock(goodsId, num, version);
    if (result != 1){
        // 扣减失败
        return result;
    }
    // 提早
    TimeUnit.SECONDS.sleep(6000);
    return result;
}
  1. 测试,能够发现尽管产生多次重试,然而库存只会被扣减胜利一次。保障了服务间的幂等性。
ps:order 服务出现异常,是因为 order 服务会超时重试四次,但 stock 服务的提早每一次都是超过超时工夫的,所以最终在 order 服务才会呈现 read timeout 异样提醒。

音讯幂等

在零碎中当应用音讯队列时,无论做哪种技术选型,有很多问题是无论如何也不能漠视的,如:音讯必达、音讯幂等等。本章节以典型的 RabbitMQ 为例,解说如何保障音讯幂等的可施行解决方案,其余 MQ 选型均可参考。

音讯重试演示

音讯队列的音讯幂等性,次要是由 MQ 重试机制引起的。

因为音讯生产者将音讯发送到 MQ-Server 后,MQ-Server 会将音讯推送到具体的音讯消费者。假如因为网络抖动或出现异常时,MQ-Server 依据重试机制就会将音讯从新向音讯消费者推送,造成音讯消费者屡次收到雷同音讯,造成数据不统一。

在 RabbitMQ 中,音讯重试机制是默认开启的,但只会在 consumer 出现异常时,才会反复推送。在应用中,异样的呈现有可能是因为生产方又去调用第三方接口,因为网络抖动而造成异样,然而这个异样有可能是临时的。所以当消费者出现异常,能够让其重试几次,如果重试几次后,依然有异样,则须要进行数据弥补。

数据弥补计划:当重试屡次后依然出现异常,则让此条音讯进入死信队列,最终进入到数据库中,接着设置定时 job 查问这些数据,进行手动弥补。

本节中以 consumer 生产异样为演示主体,因而须要批改 RabbitMQ 配置文件。

批改配置文件

批改 consumer 一方的配置文件

 # 消费者监听相干配置
    listener:
      simple:
        retry:
          # 开启消费者 (程序出现异常) 重试机制,默认开启并始终重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
设置生产异样

当 consumer 音讯监听类中增加异样,最终承受音讯时,能够发现,音讯在接管五次后,最终出现异常。

音讯幂等解决

要保障音讯幂等性的话,其实最终要解决的就是保障屡次操作,造成的影响是雷同的。那么其解决方案的思路与服务间幂等的思路其实根本都是统一的。

  1. 音讯防重表,解决思路与服务间幂等的防重表统一。
  2. redis:利用 redis 防重。

这两种计划是最常见的解决方案。其实现思路其实都是统一的。

代码实现

批改 OrderController
 /**
     * 此处为了不便演示,不做根底增加数据库操作
     * @return
     */
@PostMapping("/addOrder")
public String addOrder(){String uniqueKey = String.valueOf(idWorker.nextId());

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(uniqueKey);
    messageProperties.setContentType("text/plain");
    messageProperties.setContentEncoding("utf-8");
    Message message = new Message("1271700536000909313".getBytes(),messageProperties);
    rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);

    return "success";
}
批改 stockApplication
 @Bean
public JedisPool jedisPool(){return new JedisPool("192.168.200.150",6379);
}
新增音讯监听类
 @Component
public class ReduceStockListener {

    @Autowired
    private StockService stockService;

    @Autowired
    private JedisPool jedisPool;

    @Autowired
    private StockFlowService stockFlowService;

    @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
    @Transactional
    public void receiveMessage(Message message){

        // 获取音讯 id
        String messageId = message.getMessageProperties().getMessageId();

        Jedis jedis = jedisPool.getResource();

        System.out.println(messageId);
        try {

            //redis 锁去重校验
            if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){System.out.println("反复申请");
                return;
            }

            //mysql 状态校验
            if (!(stockFlowService.findByFlag(messageId).size() == 0)){System.out.println("数据已解决");
                return;
            }

            String goodsId = null;
            try {
                // 获取音讯体中 goodsId
                goodsId = new String(message.getBody(),"utf-8");
                stockService.reduceStock(goodsId,messageId);
            } catch (UnsupportedEncodingException e) {e.printStackTrace();
            }

            int nextInt = new Random().nextInt(100);
            System.out.println("随机数:"+nextInt);
            if (nextInt%2 ==0){int i= 1/0;}


        } catch (RuntimeException e) {
            // 解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
            System.out.println("出现异常了");
            System.out.println(messageId+":开释锁");
            throw e;
        }
    }
}

音讯缓冲区

对于 RabbitMQ 的应用,默认状况下,每条音讯都会进行别离的 ack 告诉,生产完一条后,再来生产下一条。然而这样就会造成大量音讯的阻塞状况。所以为了晋升消费者对于音讯的生产速度,能够减少 consumer 数据或者对音讯进行批量生产。MQ 接管到 producer 发送的音讯后,不会间接推送给 consumer。而是积攒到肯定数量后,再进行音讯的发送。这种形式的实现,能够了解为是一种缓冲区的实现,晋升了音讯的生产速度,然而会在肯定水平上舍弃后果返回的实时性。

对于批量生产来说,也是须要思考幂等的。对于幂等性的解决方案,沿用方才的思路即可解决。

本文由 传智教育博学谷狂野架构师 教研团队公布。

如果本文对您有帮忙,欢送 关注 点赞 ;如果您有任何倡议也可 留言评论 私信,您的反对是我保持创作的能源。

转载请注明出处!

退出移动版