RedisRocketMQ实现并发条件下库存的扣减增加秒杀库存控制

34次阅读

共计 13655 个字符,预计需要花费 35 分钟才能阅读完成。

前言

前面我的博客介绍了有关分布式锁,分布式事务相关的问题以及解决方案,但是还是不能解决并发下单,扣减的问题,并发的时候由于数据库的隔离级别 / 乐观锁 / 悲观锁 … 总是会出现一些问题。最近集成了一套方案解决此类型问题,并可以适用于一般情况的秒杀方案。欢迎拍砖 …

情景分析

前提条件:
商品 P 库存数量为 100
用户 A, 用户 B 同时 分别想买 P, 并且 A,B 分别买一个
数据库有版本号控制乐观锁
期望结果:
用户 A,B 都买到商品 P,并且商品 P 的库存变为 98

分析:1. 以前碰到这类问题的时候,我们说,可以添加分布式锁(也就是悲观锁),将商品 P 的 id 锁住,然后 A,B 提交订
单的时候分别预扣商品 P 的库存就好了(如方案一)。2. 是的,1 分析的有道理,我们的核心思路确实要将并行控制为串行,但是事与愿违。我们仔细想一想:如果,A,B 想买的不仅仅 P 一个商品,而是 P,Q 两个商品呢?我们该怎么锁?一次性将 P,Q 都锁住?显然不是
  很现实,并且实际上提交订单的时候都是从购物车下单,一个购物车里包括多个商品去下单也是很常见的,并且还有如下逻辑大家仔细思考:用户 A 下单 -> 用户 A 读到库存为 100
  用户 A 预扣库存,判断剩余库存是否 >=0,100-1=99>0, 则 A 预扣
  用户 A 的下单流程....
  此时 A 的事务没有提交,B 下单了:
  用户 B 下单 -> 用户 B 读到库存为 100(这里读到 100,是因为 A 还有其他逻辑在执行, 还未提交事务,B 读未提交了!)
  用户 B 预扣库存,判断剩余库存是否 >=0,100-1=99>0, 则 B 预扣
  用户 B 的下单流程....   
  
  最后不论 A / B 谁先提交事务,后面提交事务的就不会扣减库存成功。因为版本号不一致(就算没有乐观锁,修改的结果也会错,而且错的更离谱)。最终的结局就是库存是 99
  
3. 解决方案
  目前控制库存的方案有很多种,我这边介绍通过 redis 预减库存,通过 mq 发送消息同步的扣减数据库库存的
  方案。

方案一


@Transactional
public void createOrder(...){
    //1. 校验,扣减库存
    check(Item item);
    
    //2. 创建订单
    
    //3. 创建支付单

}

@RedisLock("pId")
public void check(Item item){
}

解决方案伪代码


  // 当然,我们管理平台新建商品时需要初始化到 redis 库存里,// 这里暂时就不介绍了
  
  
  // 下单部分
  @Transactional
    public void createOrder(...){
        //1. 校验,扣减库存
        check(Item item);
        
        //2. 创建订单
        
        //3. 创建支付单
        
        //4.redis 扣减库存
    
    }
    
    // 支付回调部分
    @Transactional
    public void wxCall(...){
        //1. 校验订单状态
        
        //2. 修改订单,支付单状态
        
        //3.mq 发送全局顺序消息 扣减库存

    }
    
    
    // 取消支付部分
    @Transactional
    public void cancelOrder(...){
        //1. 校验订单状态
        
        //2. 修改订单,支付单状态
        
        //3.redis 回退库存

    }
    
    
    // 退货 / 退款部分
       @Transactional
    public void returnOrder(...){
        //1. 校验订单状态
        
        //2. 修改订单,支付单状态
        
        //3.redis 回退库存
        
        //4.mq 发送全局顺序消息

    }  

代码部分

  • 实现思路

    • 我们使用 redis 的 lua 脚本来实现扣减库存
    • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
    • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存
  • 初始化库存回调函数(IStockCallback)

/**
 * 获取库存回调
 * create by liuliang
 * on 2019-11-13  10:45
 */
public interface IStockCallback {
    /**
     * 获取库存
     * @return
     */
    String getStock();}
  • 分布式锁控制初始化库存

/**
 *
 * Redis 分布式锁
 * 使用 SET resource-name anystring NX EX max-lock-time 实现
 * <p>
 * 该方案在 Redis 官方 SET 命令页有详细介绍。* http://doc.redisfans.com/string/set.html
 * <p>
 * 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性,* 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中:* <p>
 * EX seconds — 以秒为单位设置 key 的过期时间;* PX milliseconds — 以毫秒为单位设置 key 的过期时间;* NX — 将 key 的值设为 value,当且仅当 key 不存在,等效于 SETNX。* XX — 将 key 的值设为 value,当且仅当 key 存在,等效于 SETEX。* <p>
 * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。* <p>
 * 客户端执行以上的命令:* <p>
 * 如果服务器返回 OK,那么这个客户端获得锁。* 如果服务器返回 NIL,那么客户端获取锁失败,可以在稍后再重试。*
 *
 * create by liuliang
 * on 2019-11-13  10:49
 */

public class RedisStockLock {private static Logger logger = LoggerFactory.getLogger(RedisStockLock.class);

    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 将 key 的值设为 value,当且仅当 key 不存在,等效于 SETNX。*/
    public static final String NX = "NX";

    /**
     * seconds — 以秒为单位设置 key 的过期时间,等效于 EXPIRE key seconds
     */
    public static final String EX = "EX";

    /**
     * 调用 set 后的返回值
     */
    public static final String OK = "OK";

    /**
     * 默认请求锁的超时时间(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默认锁的有效时间(s)
     */
    public static final int EXPIRE = 60;

    /**
     * 解锁的 lua 脚本
     */
    public static final String UNLOCK_LUA;

    static {StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1]");
        sb.append("then");
        sb.append("return redis.call(\"del\",KEYS[1])");
        sb.append("else");
        sb.append("return 0");
        sb.append("end");
        UNLOCK_LUA = sb.toString();}

    /**
     * 锁标志对应的 key
     */
    private String lockKey;

    /**
     * 记录到日志的锁标志对应的 key
     */
    private String lockKeyLog = "";

    /**
     * 锁对应的值
     */
    private String lockValue;

    /**
     * 锁的有效时间(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 请求锁的超时时间(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 锁标记
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用默认的锁过期时间和请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的 key(Redis 的 Key)*/
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用默认的请求锁的超时时间,指定锁的过期时间
     *
     * @param redisTemplate
     * @param lockKey       锁的 key(Redis 的 Key)* @param expireTime    锁的过期时间(单位:秒)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) {this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用默认的锁的过期时间,指定请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的 key(Redis 的 Key)* @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) {this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 锁的过期时间和请求锁的超时时间都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       锁的 key(Redis 的 Key)* @param expireTime    锁的过期时间(单位:秒)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) {this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * 尝试获取锁 超时返回
     *
     * @return
     */
    public boolean tryLock() {
        // 生成随机 key
        lockValue = UUID.randomUUID().toString();
        // 请求锁超时时间,纳秒
        long timeout = timeOut * 1000000;
        // 系统当前时间,纳秒
        long nowTime = System.nanoTime();
        while ((System.nanoTime() - nowTime) < timeout) {if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
                locked = true;
                // 上锁成功结束请求
                return locked;
            }

            // 每次请求等待一段时间
            seleep(10, 50000);
        }
        return locked;
    }

    /**
     * 尝试获取锁 立即返回
     *
     * @return 是否成功获得锁
     */
    public boolean lock() {lockValue = UUID.randomUUID().toString();
        // 不存在则添加 且设置过期时间(单位 ms)String result = set(lockKey, lockValue, expireTime);
        locked = OK.equalsIgnoreCase(result);
        return locked;
    }

    /**
     * 以阻塞方式的获取锁
     *
     * @return 是否成功获得锁
     */
    public boolean lockBlock() {lockValue = UUID.randomUUID().toString();
        while (true) {
            // 不存在则添加 且设置过期时间(单位 ms)String result = set(lockKey, lockValue, expireTime);
            if (OK.equalsIgnoreCase(result)) {
                locked = true;
                return locked;
            }

            // 每次请求等待一段时间
            seleep(10, 50000);
        }
    }

    /**
     * 解锁
     * <p>
     * 可以通过以下修改,让这个锁实现更健壮:* <p>
     * 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。* 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。* 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。*/
    public Boolean unlock() {
        // 只有加锁成功并且锁还有效才去释放锁
        // 只有加锁成功并且锁还有效才去释放锁
        if (locked) {return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {Object nativeConnection = connection.getNativeConnection();
                    Long result = 0L;

                    List<String> keys = new ArrayList<>();
                    keys.add(lockKey);
                    List<String> values = new ArrayList<>();
                    values.add(lockValue);

                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }

                    // 单机模式
                    if (nativeConnection instanceof Jedis) {result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    }

                    if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {logger.info("Redis 分布式锁,解锁 {} 失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis());
                    }

                    locked = result == 0;
                    return result == 1;
                }
            });
        }

        return true;
    }

    /**
     * 获取锁状态
     * @Title: isLock
     * @Description: TODO
     * @return
     * @author yuhao.wang
     */
    public boolean isLock() {return locked;}

    /**
     * 重写 redisTemplate 的 set 方法
     * <p>
     * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。* <p>
     * 客户端执行以上的命令:* <p>
     * 如果服务器返回 OK,那么这个客户端获得锁。* 如果服务器返回 NIL,那么客户端获取锁失败,可以在稍后再重试。*
     * @param key     锁的 Key
     * @param value   锁里面的值
     * @param seconds 过去时间(秒)* @return
     */
    private String set(final String key, final String value, final long seconds) {Assert.isTrue(!StringUtils.isEmpty(key), "key 不能为空");
        return (String) redisTemplate.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {Object nativeConnection = connection.getNativeConnection();
                String result = null;
                if (nativeConnection instanceof JedisCommands) {result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
                }

                if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {logger.info("获取锁 {} 的时间:{}", lockKeyLog, System.currentTimeMillis());
                }

                return result;
            }
        });
    }

    /**
     * @param millis 毫秒
     * @param nanos  纳秒
     * @Title: seleep
     * @Description: 线程等待时间
     * @author yuhao.wang
     */
    private void seleep(long millis, int nanos) {
        try {Thread.sleep(millis, random.nextInt(nanos));
        } catch (InterruptedException e) {logger.info("获取分布式锁休眠被中断:", e);
        }
    }

    public String getLockKeyLog() {return lockKeyLog;}

    public void setLockKeyLog(String lockKeyLog) {this.lockKeyLog = lockKeyLog;}

    public int getExpireTime() {return expireTime;}

    public void setExpireTime(int expireTime) {this.expireTime = expireTime;}

    public long getTimeOut() {return timeOut;}

    public void setTimeOut(long timeOut) {this.timeOut = timeOut;}


}
  • 库存扣减服务

/**
 * 扣库存
 * create by liuliang
 * on 2019-11-13  10:46
 */
@Service
public class StockComponent {Logger logger = LoggerFactory.getLogger(StockComponent.class);

    /**
     * 不限库存
     */
    public static final long UNINITIALIZED_STOCK = -3L;

    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;




    static {
        /**
         *
         * @desc 扣减库存 Lua 脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock)0:表示没有库存
         * 库存(stock)大于 0:表示剩余库存
         *
         * @params 库存 key
         * @return
         *         -3: 库存未初始化
         *         -2: 库存不足
         *         -1: 不限库存
         *         大于等于 0: 剩余库存(扣减之后剩余的库存)*         redis 缓存的库存 (value) 是 - 1 表示不限库存,直接返回 -1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("local num = tonumber(ARGV[1]);");
        sb.append("if (stock == -1) then");
        sb.append("return -1;");
        sb.append("end;");
        sb.append("if (stock >= num) then");
        sb.append("return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("end;");
        sb.append("return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();}

    /**
     * @param key           库存 key
     * @param expire        库存有效时间, 单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2: 库存不足; -1: 不限库存; 大于等于 0: 扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {RedisStockLock redisLock = new RedisStockLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final String initStock = stockCallback.getStock();
                        // 将库存设置到 redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {logger.error(e.getMessage(), e);
            } finally {redisLock.unlock();
            }

        }
        return stock;
    }

    /**
     * 加库存(还原库存)
     *
     * @param key    库存 key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {return addStock(key, null, num);
    }

    /**
     * 加库存
     *
     * @param key    库存 key
     * @param expire 过期时间(秒)* @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {boolean hasKey = redisTemplate.hasKey(key);
        // 判断 key 是否存在,存在就直接更新
        if (hasKey) {return redisTemplate.opsForValue().increment(key, num);
        }

        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为 null");
        RedisStockLock redisLock = new RedisStockLock(redisTemplate, key);
        try {if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有 key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {logger.error(e.getMessage(), e);
        } finally {redisLock.unlock();
        }

        return num;
    }

    /**
     * 获取库存
     *
     * @param key 库存 key
     * @return -1: 不限库存; 大于等于 0: 剩余库存
     */
    public int getStock(String key) {Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }

    /**
     * 扣库存
     *
     * @param key 库存 key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3: 库存未初始化; -2: 库存不足; -1: 不限库存; 大于等于 0: 扣减库存之后的剩余库存】*/
    private Long stock(String key, int num) {
        // 脚本里的 KEYS 参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的 ARGV 参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));

        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }

                // 单机模式
                else if (nativeConnection instanceof Jedis) {return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
}
  • 库存操作对外接口


/**
 *  库存操作对外接口
 *
 * create by liuliang
 * on 2019-11-13  11:00
 */
@Slf4j
@Service
public class StockService {

    @Autowired
    private StockComponent stockComponent;

    @Autowired
    private ProductSkuMapper skuMapper;

    @Autowired
    private ProductMapper productMapper;

    @Autowired
    private RocketMQConfig rocketMQConfig;

    @Autowired
    private PresentRocketProducer presentRocketProducer;

    private static final String REDIS_STOCK_KEY="redis_key:stock:";

    /**
     * 扣减库存
     * @param skuId
     * @param num
     * @return
     */
    public Boolean stock(String skuId,Integer num) {
        // 库存 ID
        String redisKey = REDIS_STOCK_KEY + skuId;
        long stock = stockComponent.stock(redisKey, 60 * 60, num, () -> initStock(skuId));
        if(stock < 0){// 异常, 库存不足
            log.info("库存不足........");
            ProductSku productSku = skuMapper.selectById(skuId);
            throw new MallException(MsRespCode.STOCK_NUMBER_ERROR,new Object[]{productMapper.selectById(productSku.getProductId()).getTitle()});
        }
        return stock >= 0 ;
    }



    /**
     * 添加 redis - sku 库存数量
     * @param skuId
     * @param num
     * @return
     */
    public Long addStock(String skuId ,Integer num) {
        // 库存 ID
        String redisKey = REDIS_STOCK_KEY + skuId;
        long l = stockComponent.addStock(redisKey, num);
        return l;
    }


    /**
     * 获取初始的库存
     *
     * @return
     */
    private String initStock(String skuId) {
        // 初始化库存
        ProductSku productSku = skuMapper.selectById(skuId);
        return productSku.getStockNumber()+"";}

    /**
     * 获取 sku 库存
     * @param skuId
     * @return
     */
    public Integer getStock(String skuId) {

        // 库存 ID
        String redisKey = REDIS_STOCK_KEY + skuId;

        return stockComponent.getStock(redisKey);
    }
}

redis 序列化


/**
 * create by liuliang
 * on 2019-11-13  11:29
 */
@Configuration
public class RedisConfig {
    /**
     * 重写 Redis 序列化方式,使用 Json 方式:
     * 当我们的数据存储到 Redis 的时候,我们的键(key)和值(value)都是通过 Spring 提供的 Serializer 序列化到数据库的。RedisTemplate 默认使用的是 JdkSerializationRedisSerializer,StringRedisTemplate 默认使用的是 StringRedisSerializer。* Spring Data JPA 为我们提供了下面的 Serializer:* GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。* 在此我们将自己配置 RedisTemplate 并定义 Serializer。*
     * @param redisConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // 设置值(value)的序列化采用 Jackson2JsonRedisSerializer。redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 设置键(key)的序列化采用 StringRedisSerializer。redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

正文完
 0