关于java:从秒杀聊到ZooKeeper分布式锁

1次阅读

共计 8904 个字符,预计需要花费 23 分钟才能阅读完成。

思维导图

文章已收录到 Github 精选,欢送 Star
https://github.com/yehongzhi/learningSummary

前言

通过《ZooKeeper 入门》后,咱们学会了 ZooKeeper 的根本用法。

实际上 ZooKeeper 的利用是十分宽泛的,实现分布式锁只是其中一种。接下来咱们就 ZooKeeper 实现分布式锁解决 秒杀超卖问题 进行开展。

一、什么是秒杀超卖问题

秒杀流动应该都不生疏,不必过多解释。

不难想象,在这种 ” 秒杀 ” 的场景中,实际上会呈现多个用户争抢 ” 资源 ” 的状况,也就是多个线程同时并发,这种状况是很容易呈现数据不精确,也就是超卖问题

1.1 我的项目演示

上面应用程序演示,我应用了 SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC 搭建了一个简略的我的项目,github 地址:

https://github.com/yehongzhi/…

创立一个商品信息表:

CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT ''COMMENT' 商品形容 ',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

增加一个商品 [ 叉烧包 ] 进去:

外围的代码逻辑是这样的:

    @Override
    public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        //1. 先查询数据库中商品的数量
        TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
        //2. 判断商品数量是否大于 0,或者购买的数量大于库存
        Integer count = commodityInfo.getNumber();
        if (count <= 0 || number > count) {
            // 商品数量小于或者等于 0,或者购买的数量大于库存,则返回 false
            return false;
        }
        //3. 如果库存数量大于 0,并且购买的数量小于或者等于库存。则更新商品数量
        count -= number;
        commodityInfo.setNumber(count);
        boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
        if (bool) {
            // 如果更新胜利,则打印购买商品胜利
            System.out.println("购买商品 [" + commodityInfo.getCommodityName() + "] 胜利, 数量为:" + number);
        }
        return bool;
    }

逻辑示意图如下:

下面这个逻辑,如果单线程申请的话是没有问题的。

然而多线程的话就呈现问题了。当初我就创立多个线程,通过 HttpClient 进行申请,看会产生什么:

    public static void main(String[] args) throws Exception {
        // 申请地址
        String url = "http://localhost:8080/mall/commodity/purchase";
        // 申请参数,商品 ID,数量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        // 创立 10 个线程通过 HttpClient 进行发送申请,测试
        for (int i = 0; i < 10; i++) {
            // 这个线程的逻辑仅仅是发送申请
            CommodityThread commodityThread = new CommodityThread(url, map);
            commodityThread.start();}
    }

阐明一下,叉烧包的数量是 100,这里有 10 个线程同时去购买,假如都购买胜利的话,库存数量应该是 90。

实际上,10 个线程确实都购买胜利了:

然而数据库的商品库存,却不精确:

二、尝试应用本地锁

下面的场景,大略流程如下所示:

能够看出问题的 关键在于两个线程 ” 同时 ” 去查问残余的库存,而后更新库存导致的 。要解决这个问题,其实 只有保障多个线程在这段逻辑是程序执行即可,也就是加锁

本地锁 JDK 提供有两种:synchronized 和 Lock 锁。

两种形式都能够,我这里为了简便,应用 synchronized:

    // 应用 synchronized 润饰办法
    @Override
    public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {// 省略...}

而后再测试刚刚多线程并发抢购的状况,看看后果:

问题失去解决!!!

你认为事件就这样完结了吗,看了看进度条,发现事件并不简略。

咱们晓得在理论我的项目中,往往不会只部署一台服务器,所以无妨咱们启动两台服务器,端口号别离是 8080、8081,模仿理论我的项目的场景:

写一个交替申请的测试脚本,模仿多台服务器别离解决申请,用户秒杀抢购的场景:

    public static void main(String[] args) throws Exception {
        // 申请地址
        String url = "http://localhost:%s/mall/commodity/purchase";
        // 申请参数,商品 ID,数量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        // 创立 10 个线程通过 HttpClient 进行发送申请,测试
        for (int i = 0; i < 10; i++) {
            //8080、8081 交替申请,每个服务器解决 5 个申请
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();}
    }

首先看购买的状况,必定都是购买胜利的:

要害是库存数量是否正确:

有 10 个申请购买胜利,库存应该是 90 才对,这里库存是 95。事实证明 本地锁是不能解决多台服务器秒杀抢购呈现超卖的问题

为什么会这样呢,请看示意图:

其实和多线程问题是差不多的起因,多个服务器去查询数据库,获取到雷同的库存,而后更新库存,导致数据不正确 。要保障库存的数量正确, 关键在于多台服务器要保障只能一台服务器在执行这段逻辑,也就是要加分布式锁。

这也体现出分布式锁的作用,就是要保障多台服务器只能有一台服务器执行。

分布式锁有三种实现形式,别离是 redis、ZooKeeper、数据库(比方 mysql)。

三、应用 ZooKeeper 实现分布式锁

3.1 原理

实际上是利用 ZooKeeper 的长期程序节点的个性实现分布式锁。怎么实现呢?

假如当初有一个客户端 A,须要加锁,那么就在 ”/Lock” 门路下创立一个长期程序节点。而后获取 ”/Lock” 下的节点列表,判断本人的序号是否是最小的,如果是最小的序号,则加锁胜利!

当初又有另一个客户端,客户端 B 须要加锁,那么也是在 ”/Lock” 门路下创立长期程序节点。仍然获取 ”/Lock” 下的节点列表,判断本人的节点序号是否最小的。发现不是最小的,加锁失败,接着对本人的上一个节点进行监听。

怎么开释锁呢,其实就是把长期节点删除。假如客户端 A 开释锁,把节点 01 删除了。那就会触发节点 02 的监听事件,客户端就再次获取节点列表,而后判断本人是否是最小的序号,如果是最小序号则加锁。

如果多个客户端其实也是一样,一上来就会创立一个长期节点,而后开始判断本人是否是最小的序号,如果不是就监听上一个节点,造成一种排队的机制。也就造成了锁的成果,保障了多台服务器只有一台执行。

假如其中有一个客户端宕机了,依据长期节点的特点,ZooKeeper 会主动删除对应的长期节点,相当于主动开释了锁。

3.2 手写代码实现分布式锁

首先退出 Maven 依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>

接着依照下面剖析的思路敲代码,创立 ZkLock 类:

public class ZkLock implements Lock {
    // 计数器,用于加锁失败时,阻塞
    private static CountDownLatch cdl = new CountDownLatch(1);
    //ZooKeeper 服务器的 IP 端口
    private static final String IP_PORT = "127.0.0.1:2181";
    // 锁的根门路
    private static final String ROOT_NODE = "/Lock";
    // 上一个节点的门路
    private volatile String beforePath;
    // 以后上锁的节点门路
    private volatile String currPath;
    // 创立 ZooKeeper 客户端
    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkLock() {
        // 判断是否存在根节点
        if (!zkClient.exists(ROOT_NODE)) {
            // 不存在则创立
            zkClient.createPersistent(ROOT_NODE);
        }
    }
    
    // 加锁
    public void lock() {if (tryLock()) {System.out.println("加锁胜利!!");
        } else {
            // 尝试加锁失败,进入期待 监听
            waitForLock();
            // 再次尝试加锁
            lock();}

    }
    
    // 尝试加锁
    public synchronized boolean tryLock() {
        // 第一次就进来创立本人的长期节点
        if (StringUtils.isBlank(currPath)) {currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
        }
        // 对节点排序
        List<String> children = zkClient.getChildren(ROOT_NODE);
        Collections.sort(children);

        // 以后的是最小节点就返回加锁胜利
        if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {return true;} else {
            // 不是最小节点 就找到本人的前一个 顺次类推 开释也是一样
            int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
            beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
            // 返回加锁失败
            return false;
        }
    }
    
    // 解锁
    public void unlock() {
        // 删除节点并敞开客户端
        zkClient.delete(currPath);
        zkClient.close();}
    
    // 期待上锁,加锁失败进入阻塞,监听上一个节点
    private void waitForLock() {IZkDataListener listener = new IZkDataListener() {
            // 监听节点更新事件
            public void handleDataChange(String s, Object o) throws Exception { }

            // 监听节点被删除事件
            public void handleDataDeleted(String s) throws Exception {
                // 解除阻塞
                cdl.countDown();}
        };
        // 监听上一个节点
        this.zkClient.subscribeDataChanges(beforePath, listener);
        // 判断上一个节点是否存在
        if (zkClient.exists(beforePath)) {
            // 上一个节点存在
            try {System.out.println("加锁失败 期待");
                // 加锁失败,阻塞期待
                cdl.await();} catch (InterruptedException e) {e.printStackTrace();
            }
        }
        // 开释监听
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}

    public void lockInterruptibly() throws InterruptedException {}

    public Condition newCondition() {return null;}
}

在 Controller 层加上锁:

    @PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool;
        // 获取 ZooKeeper 分布式锁
        ZkLock zkLock = new ZkLock();
        try {
            // 上锁
            zkLock.lock();
            // 调用秒杀抢购的 service 办法
            bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
        } catch (Exception e) {e.printStackTrace();
            bool = false;
        } finally {
            // 解锁
            zkLock.unlock();}
        return bool;
    }

测试,仍然起两台服务器,8080、8081。而后跑测试脚本:

    public static void main(String[] args) throws Exception {
        // 申请地址
        String url = "http://localhost:%s/mall/commodity/purchase";
        // 申请参数,商品 ID,数量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        // 创立 10 个线程通过 HttpClient 进行发送申请,测试
        for (int i = 0; i < 10; i++) {
            //8080、8081 交替申请
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();}
    }

后果正确:

3.3 造好的轮子

Curator 是 Apache 开源的一个操作 ZooKeeper 的框架。其中就有实现 ZooKeeper 分布式锁的性能。

当然分布式锁的实现只是这个框架的其中一个很小的局部,除此之外还有很多用处,大家能够到官网去学习。

首先增加 Maven 依赖:

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>

还是一样在须要加锁的中央进行加锁:

    @PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
                                         @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool = false;
        // 设置重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        // 启动客户端
        client.start();
        InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
        try {
            // 加锁
            if (mutex.acquire(3, TimeUnit.SECONDS)) {
                // 调用抢购秒杀 service 办法
                bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
            }
        } catch (Exception e) {e.printStackTrace();
        } finally {
            // 解锁
            mutex.release();
            client.close();}
        return bool;
    }

四、遇到的坑

我尝试用原生的 ZooKeeper 写分布式锁,有点炸裂。遇到不少坑,最终放弃了,用 zkclient 的 API。可能我太菜了不太会用。

上面我分享我遇到的一些问题,心愿你们在遇到同类型的异样时能迅速定位问题。

4.1 Session expired

这个谬误是应用原生 ZooKeeper 的 API 呈现的谬误。次要是我在进入 debug 模式进行调试呈现的。

因为原生的 ZooKeeper 须要设定一个会话超时工夫,个别 debug 模式咱们都会卡在一个中央去调试,必定就超出了设置的会话工夫~

4.2 KeeperErrorCode = ConnectionLoss

这个也是原生 ZooKeeper 的 API 的谬误,怎么呈现的呢?

次要是创立的 ZooKeeper 客户端连贯服务器时是异步的,因为连贯须要工夫,还没连贯胜利,代码曾经开始执行 create()或者 exists(),而后就报这个谬误。

解决办法:应用 CountDownLatch 计数器阻塞,连贯胜利后再进行阻塞,而后执行 create()或者 exists()等操作。

4.3 并发查问更新呈现数据不统一

这个谬误真的太炸裂了~

一开始我是把分布式锁加在 service 层,而后认为搞定了。接着启动 8080、8081 进行并发测试。10 个线程都是购买胜利,后果竟然是不正确!

第一反馈感觉本人实现的代码有问题,于是换成 curator 框架实现的分布式锁,开源框架应该没问题了吧。没想到还是不行~

既然不是锁自身的问题,是不是事务问题。上一个事务更新库存的操作还没提交,而后下一个申请就进来查问。于是我就把加锁的范畴放大一点,放在 Controller 层。竟然胜利了!

你可能曾经留神到,我在下面的例子就是把分布式锁加在 Controller 层,其实我不太喜爱在 Controller 层写太多代码。

兴许有更加优雅的形式,惋惜自己能力有余,如果你有更好的实现形式,能够分享一下~

补充:上面评论有位大佬说,在原来的办法外再包裹一层,亲测是能够的。这应该是事务的问题。

下面放在 Controller 层能够胜利是不是因为 Controller 层没有事务,原来写在 service 我是写了一个 @Transactional 注解在类上,所以整个类外面的都有事务,所以解锁后还没提交事务去更新数据库,而后下一个申请进来就查到了没更新的数据。

为了优雅一点,就把 @Transactional 注解放在抢购的 service 办法上

而后再包裹一个没有事务的办法,用于上锁。

五、总结

最初,咱们回顾总结一下吧:

  • 首先咱们模仿单机多线程的秒杀场景,单机的话能够应用本地锁解决问题。
  • 接着模仿多服务器多线程的场景,思路是应用 ZooKeeper 实现分布式锁解决。
  • 图解 ZooKeeper 实现分布式锁的原理。
  • 而后入手写代码,实现分布式锁。
  • 最初总结遇到的坑。

心愿这篇文章对你有用,感觉有用就点个赞吧~

正文完
 0