关于java:京东一面Redis-如何实现库存扣减操作如何防止商品被超卖

30次阅读

共计 5583 个字符,预计需要花费 14 分钟才能阅读完成。

起源:my.oschina.net/xiaolyuh/blog/1615639

在日常开发中有很多中央都有相似扣减库存的操作,比方电商零碎中的商品库存,抽奖零碎中的奖品库存等。

解决方案

  1. 应用 mysql 数据库,应用一个字段来存储库存,每次扣减库存去更新这个字段。
  2. 还是应用数据库,然而将库存分层多份存到多条记录外面,扣减库存的时候路由一下,这样子增大了并发量,然而还是防止不了大量的去拜访数据库来更新库存。
  3. 将库存放到 redis 应用 redis 的 incrby 个性来扣减库存。

剖析

在下面的第一种和第二种形式都是基于数据来扣减库存。

基于数据库单库存

第一种形式在所有申请都会在这里期待锁,获取锁有去扣减库存。在并发量不高的状况下能够应用,然而一旦并发量大了就会有大量申请阻塞在这里,导致申请超时,进而整个零碎雪崩;而且会频繁的去拜访数据库,大量占用数据库资源,所以在并发高的状况下这种形式不实用。

基于数据库多库存

第二种形式其实是第一种形式的优化版本,在肯定水平上进步了并发量,然而在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

用数据库扣减库存的形式,扣减库存的操作必须在一条语句中执行,不能先 selec 在 update,这样在并发下会呈现超扣的状况。如:

update number set x=x-1 where x > 0

MySQL 本身对于高并发的解决性能就会呈现问题,一般来说,MySQL 的解决性能会随着并发 thread 回升而回升,然而到了肯定的并发度之后会呈现显著的拐点,之后一路降落,最终甚至会比单 thread 的性能还要差。

当减库存和高并发碰到一起的时候,因为操作的库存数目在同一行,就会呈现争抢 InnoDB 行锁的问题,导致呈现相互期待甚至死锁,从而大大降低 MySQL 的解决性能,最终导致前端页面呈现超时异样。

基于 redis

针对上述问题的问题咱们就有了第三种计划,将库存放到缓存,利用 redis 的 incrby 个性来扣减库存,解决了超扣和性能问题。然而一旦缓存失落须要思考复原计划。比方抽奖零碎扣奖品库存的时候,初始库存 = 总的库存数 - 曾经发放的处分数,然而如果是异步发奖,须要等到 MQ 音讯生产完了能力重启 redis 初始化库存,否则也存在库存不统一的问题。

基于 redis 实现扣减库存的具体实现

  • 咱们应用 redis 的 lua 脚本来实现扣减库存
  • 因为是分布式环境下所以还须要一个分布式锁来管制只能有一个服务去初始化库存
  • 须要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

初始化库存回调函数(IStockCallback)

/**
 * 获取库存回调
 * @author yuhao.wang
 */
public interface IStockCallback {

    /**
     * 获取库存
     * @return
     */
    int getStock();}

举荐一个 Spring Boot 基础教程及实战示例:

https://github.com/javastacks…

扣减库存服务(StockService)

/**
 * 扣库存
 *
 * @author yuhao.wang
 */
@Service
public class StockService {Logger logger = LoggerFactory.getLogger(StockService.class);

    /**
     * 不限库存
     */
    public static final long UNINITIALIZED_STOCK = -3L;

    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;

    static {
        /**
         *
         * @desc 扣减库存 Lua 脚本
         * 库存(stock)-1:示意不限库存
         * 库存(stock)0:示意没有库存
         * 库存(stock)大于 0:示意残余库存
         *
         * @params 库存 key
         * @return
         *   -3: 库存未初始化
         *   -2: 库存有余
         *   -1: 不限库存
         *   大于等于 0: 残余库存(扣减之后残余的库存)*      redis 缓存的库存 (value) 是 - 1 示意不限库存,间接返回 1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("local num = tonumber(ARGV[1]);");
        sb.append("if (stock == -1) then");
        sb.append("return -1;");
        sb.append("end;");
        sb.append("if (stock >= num) then");
        sb.append("return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("end;");
        sb.append("return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();}

    /**
     * @param key           库存 key
     * @param expire        库存无效工夫, 单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2: 库存有余; -1: 不限库存; 大于等于 0: 扣减库存之后的残余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,防止并发时反复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到 redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {logger.error(e.getMessage(), e);
            } finally {redisLock.unlock();
            }

        }
        return stock;
    }

    /**
     * 加库存(还原库存)
     *
     * @param key    库存 key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {return addStock(key, null, num);
    }

    /**
     * 加库存
     *
     * @param key    库存 key
     * @param expire 过期工夫(秒)* @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {boolean hasKey = redisTemplate.hasKey(key);
        // 判断 key 是否存在,存在就间接更新
        if (hasKey) {return redisTemplate.opsForValue().increment(key, num);
        }

        Assert.notNull(expire,"初始化库存失败,库存过期工夫不能为 null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有 key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {logger.error(e.getMessage(), e);
        } finally {redisLock.unlock();
        }

        return num;
    }

    /**
     * 获取库存
     *
     * @param key 库存 key
     * @return -1: 不限库存; 大于等于 0: 残余库存
     */
    public int getStock(String key) {Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }

    /**
     * 扣库存
     *
     * @param key 库存 key
     * @param num 扣减库存数量
     * @return 扣减之后残余的库存【-3: 库存未初始化; -2: 库存有余; -1: 不限库存; 大于等于 0: 扣减库存之后的残余库存】*/
    private Long stock(String key, int num) {
        // 脚本里的 KEYS 参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的 ARGV 参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));

        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式尽管执行脚本的办法一样,然而没有独特的接口,所以只能离开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }

                // 单机模式
                else if (nativeConnection instanceof Jedis) {return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }

}

调用

/**
 * @author yuhao.wang
 */
@RestController
public class StockController {

    @Autowired
    private StockService stockService;

    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品 ID
        long commodityId = 1;
        // 库存 ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }

    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }

    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品 ID
        long commodityId = 1;
        // 库存 ID
        String redisKey = "redis_key:stock:" + commodityId;

        return stockService.getStock(redisKey);
    }

    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品 ID
        long commodityId = 2;
        // 库存 ID
        String redisKey = "redis_key:stock:" + commodityId;

        return stockService.addStock(redisKey, 2);
    }
}

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿(2022 最新版)

2. 劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0