思维导图

文章已收录到Github精选,欢送Star
https://github.com/yehongzhi/learningSummary

前言

通过《ZooKeeper入门》后,咱们学会了ZooKeeper的根本用法。

实际上ZooKeeper的利用是十分宽泛的,实现分布式锁只是其中一种。接下来咱们就ZooKeeper实现分布式锁解决秒杀超卖问题进行开展。

一、什么是秒杀超卖问题

秒杀流动应该都不生疏,不必过多解释。

不难想象,在这种"秒杀"的场景中,实际上会呈现多个用户争抢"资源"的状况,也就是多个线程同时并发,这种状况是很容易呈现数据不精确,也就是超卖问题

1.1 我的项目演示

上面应用程序演示,我应用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一个简略的我的项目,github地址:

https://github.com/yehongzhi/...

创立一个商品信息表:

CREATE TABLE `tb_commodity_info` (  `id` varchar(32) NOT NULL,  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',  `number` int(10) DEFAULT '0' COMMENT '商品数量',  `description` varchar(2048) DEFAULT '' COMMENT '商品形容',  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

增加一个商品[叉烧包]进去:

外围的代码逻辑是这样的:

    @Override    public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {        //1.先查询数据库中商品的数量        TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);        //2.判断商品数量是否大于0,或者购买的数量大于库存        Integer count = commodityInfo.getNumber();        if (count <= 0 || number > count) {            //商品数量小于或者等于0,或者购买的数量大于库存,则返回false            return false;        }        //3.如果库存数量大于0,并且购买的数量小于或者等于库存。则更新商品数量        count -= number;        commodityInfo.setNumber(count);        boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;        if (bool) {            //如果更新胜利,则打印购买商品胜利            System.out.println("购买商品[ " + commodityInfo.getCommodityName() + " ]胜利,数量为:" + number);        }        return bool;    }

逻辑示意图如下:

下面这个逻辑,如果单线程申请的话是没有问题的。

然而多线程的话就呈现问题了。当初我就创立多个线程,通过HttpClient进行申请,看会产生什么:

    public static void main(String[] args) throws Exception {        //申请地址        String url = "http://localhost:8080/mall/commodity/purchase";        //申请参数,商品ID,数量        Map<String, String> map = new HashMap<>();        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");        map.put("number", "1");        //创立10个线程通过HttpClient进行发送申请,测试        for (int i = 0; i < 10; i++) {            //这个线程的逻辑仅仅是发送申请            CommodityThread commodityThread = new CommodityThread(url, map);            commodityThread.start();        }    }

阐明一下,叉烧包的数量是100,这里有10个线程同时去购买,假如都购买胜利的话,库存数量应该是90。

实际上,10个线程确实都购买胜利了:

然而数据库的商品库存,却不精确:

二、尝试应用本地锁

下面的场景,大略流程如下所示:

能够看出问题的关键在于两个线程"同时"去查问残余的库存,而后更新库存导致的。要解决这个问题,其实只有保障多个线程在这段逻辑是程序执行即可,也就是加锁

本地锁JDK提供有两种:synchronized和Lock锁。

两种形式都能够,我这里为了简便,应用synchronized:

    //应用synchronized润饰办法    @Override    public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {        //省略...    }

而后再测试刚刚多线程并发抢购的状况,看看后果:

问题失去解决!!!

你认为事件就这样完结了吗,看了看进度条,发现事件并不简略。

咱们晓得在理论我的项目中,往往不会只部署一台服务器,所以无妨咱们启动两台服务器,端口号别离是8080、8081,模仿理论我的项目的场景:

写一个交替申请的测试脚本,模仿多台服务器别离解决申请,用户秒杀抢购的场景:

    public static void main(String[] args) throws Exception {        //申请地址        String url = "http://localhost:%s/mall/commodity/purchase";        //申请参数,商品ID,数量        Map<String, String> map = new HashMap<>();        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");        map.put("number", "1");        //创立10个线程通过HttpClient进行发送申请,测试        for (int i = 0; i < 10; i++) {            //8080、8081交替申请,每个服务器解决5个申请            String port = "808" + (i % 2);            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);            commodityThread.start();        }    }

首先看购买的状况,必定都是购买胜利的:

要害是库存数量是否正确:

有10个申请购买胜利,库存应该是90才对,这里库存是95。事实证明本地锁是不能解决多台服务器秒杀抢购呈现超卖的问题

为什么会这样呢,请看示意图:

其实和多线程问题是差不多的起因,多个服务器去查询数据库,获取到雷同的库存,而后更新库存,导致数据不正确。要保障库存的数量正确,关键在于多台服务器要保障只能一台服务器在执行这段逻辑,也就是要加分布式锁。

这也体现出分布式锁的作用,就是要保障多台服务器只能有一台服务器执行。

分布式锁有三种实现形式,别离是redis、ZooKeeper、数据库(比方mysql)。

三、应用ZooKeeper实现分布式锁

3.1 原理

实际上是利用ZooKeeper的长期程序节点的个性实现分布式锁。怎么实现呢?

假如当初有一个客户端A,须要加锁,那么就在"/Lock"门路下创立一个长期程序节点。而后获取"/Lock"下的节点列表,判断本人的序号是否是最小的,如果是最小的序号,则加锁胜利!

当初又有另一个客户端,客户端B须要加锁,那么也是在"/Lock"门路下创立长期程序节点。仍然获取"/Lock"下的节点列表,判断本人的节点序号是否最小的。发现不是最小的,加锁失败,接着对本人的上一个节点进行监听。

怎么开释锁呢,其实就是把长期节点删除。假如客户端A开释锁,把节点01删除了。那就会触发节点02的监听事件,客户端就再次获取节点列表,而后判断本人是否是最小的序号,如果是最小序号则加锁。

如果多个客户端其实也是一样,一上来就会创立一个长期节点,而后开始判断本人是否是最小的序号,如果不是就监听上一个节点,造成一种排队的机制。也就造成了锁的成果,保障了多台服务器只有一台执行。

假如其中有一个客户端宕机了,依据长期节点的特点,ZooKeeper会主动删除对应的长期节点,相当于主动开释了锁。

3.2 手写代码实现分布式锁

首先退出Maven依赖

<dependency>    <groupId>org.apache.zookeeper</groupId>    <artifactId>zookeeper</artifactId>    <version>3.4.6</version></dependency><dependency>    <groupId>com.101tec</groupId>    <artifactId>zkclient</artifactId>    <version>0.4</version></dependency>

接着依照下面剖析的思路敲代码,创立ZkLock类:

public class ZkLock implements Lock {    //计数器,用于加锁失败时,阻塞    private static CountDownLatch cdl = new CountDownLatch(1);    //ZooKeeper服务器的IP端口    private static final String IP_PORT = "127.0.0.1:2181";    //锁的根门路    private static final String ROOT_NODE = "/Lock";    //上一个节点的门路    private volatile String beforePath;    //以后上锁的节点门路    private volatile String currPath;    //创立ZooKeeper客户端    private ZkClient zkClient = new ZkClient(IP_PORT);    public ZkLock() {        //判断是否存在根节点        if (!zkClient.exists(ROOT_NODE)) {            //不存在则创立            zkClient.createPersistent(ROOT_NODE);        }    }        //加锁    public void lock() {        if (tryLock()) {            System.out.println("加锁胜利!!");        } else {            // 尝试加锁失败,进入期待 监听            waitForLock();            // 再次尝试加锁            lock();        }    }        //尝试加锁    public synchronized boolean tryLock() {        // 第一次就进来创立本人的长期节点        if (StringUtils.isBlank(currPath)) {            currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");        }        // 对节点排序        List<String> children = zkClient.getChildren(ROOT_NODE);        Collections.sort(children);        // 以后的是最小节点就返回加锁胜利        if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {            return true;        } else {            // 不是最小节点 就找到本人的前一个 顺次类推 开释也是一样            int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;            beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);            //返回加锁失败            return false;        }    }        //解锁    public void unlock() {        //删除节点并敞开客户端        zkClient.delete(currPath);        zkClient.close();    }        //期待上锁,加锁失败进入阻塞,监听上一个节点    private void waitForLock() {        IZkDataListener listener = new IZkDataListener() {            //监听节点更新事件            public void handleDataChange(String s, Object o) throws Exception {            }            //监听节点被删除事件            public void handleDataDeleted(String s) throws Exception {                //解除阻塞                cdl.countDown();            }        };        // 监听上一个节点        this.zkClient.subscribeDataChanges(beforePath, listener);        //判断上一个节点是否存在        if (zkClient.exists(beforePath)) {            //上一个节点存在            try {                System.out.println("加锁失败 期待");                //加锁失败,阻塞期待                cdl.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        // 开释监听        zkClient.unsubscribeDataChanges(beforePath, listener);    }    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return false;    }    public void lockInterruptibly() throws InterruptedException {    }    public Condition newCondition() {        return null;    }}

在Controller层加上锁:

    @PostMapping("/purchase")    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {        boolean bool;        //获取ZooKeeper分布式锁        ZkLock zkLock = new ZkLock();        try {            //上锁            zkLock.lock();            //调用秒杀抢购的service办法            bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);        } catch (Exception e) {            e.printStackTrace();            bool = false;        } finally {            //解锁            zkLock.unlock();        }        return bool;    }

测试,仍然起两台服务器,8080、8081。而后跑测试脚本:

    public static void main(String[] args) throws Exception {        //申请地址        String url = "http://localhost:%s/mall/commodity/purchase";        //申请参数,商品ID,数量        Map<String, String> map = new HashMap<>();        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");        map.put("number", "1");        //创立10个线程通过HttpClient进行发送申请,测试        for (int i = 0; i < 10; i++) {            //8080、8081交替申请            String port = "808" + (i % 2);            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);            commodityThread.start();        }    }

后果正确:

3.3 造好的轮子

Curator是Apache开源的一个操作ZooKeeper的框架。其中就有实现ZooKeeper分布式锁的性能。

当然分布式锁的实现只是这个框架的其中一个很小的局部,除此之外还有很多用处,大家能够到官网去学习。

首先增加Maven依赖:

    <dependency>        <groupId>org.apache.curator</groupId>        <artifactId>curator-framework</artifactId>        <version>4.3.0</version>    </dependency>    <dependency>        <groupId>org.apache.curator</groupId>        <artifactId>curator-recipes</artifactId>        <version>4.3.0</version>    </dependency>

还是一样在须要加锁的中央进行加锁:

    @PostMapping("/purchase")    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,                                         @RequestParam(name = "number") Integer number) throws Exception {        boolean bool = false;        //设置重试策略        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);        // 启动客户端        client.start();        InterProcessMutex mutex = new InterProcessMutex(client, "/locks");        try {            //加锁            if (mutex.acquire(3, TimeUnit.SECONDS)) {                //调用抢购秒杀service办法                bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);            }        } catch (Exception e) {            e.printStackTrace();        } finally {            //解锁            mutex.release();            client.close();        }        return bool;    }

四、遇到的坑

我尝试用原生的ZooKeeper写分布式锁,有点炸裂。遇到不少坑,最终放弃了,用zkclient的API。可能我太菜了不太会用。

上面我分享我遇到的一些问题,心愿你们在遇到同类型的异样时能迅速定位问题。

4.1 Session expired

这个谬误是应用原生ZooKeeper的API呈现的谬误。次要是我在进入debug模式进行调试呈现的。

因为原生的ZooKeeper须要设定一个会话超时工夫,个别debug模式咱们都会卡在一个中央去调试,必定就超出了设置的会话工夫~

4.2 KeeperErrorCode = ConnectionLoss

这个也是原生ZooKeeper的API的谬误,怎么呈现的呢?

次要是创立的ZooKeeper客户端连贯服务器时是异步的,因为连贯须要工夫,还没连贯胜利,代码曾经开始执行create()或者exists(),而后就报这个谬误。

解决办法:应用CountDownLatch计数器阻塞,连贯胜利后再进行阻塞,而后执行create()或者exists()等操作。

4.3 并发查问更新呈现数据不统一

这个谬误真的太炸裂了~

一开始我是把分布式锁加在service层,而后认为搞定了。接着启动8080、8081进行并发测试。10个线程都是购买胜利,后果竟然是不正确!

第一反馈感觉本人实现的代码有问题,于是换成curator框架实现的分布式锁,开源框架应该没问题了吧。没想到还是不行~

既然不是锁自身的问题,是不是事务问题。上一个事务更新库存的操作还没提交,而后下一个申请就进来查问。于是我就把加锁的范畴放大一点,放在Controller层。竟然胜利了!

你可能曾经留神到,我在下面的例子就是把分布式锁加在Controller层,其实我不太喜爱在Controller层写太多代码。

兴许有更加优雅的形式,惋惜自己能力有余,如果你有更好的实现形式,能够分享一下~

补充:上面评论有位大佬说,在原来的办法外再包裹一层,亲测是能够的。这应该是事务的问题。

下面放在Controller层能够胜利是不是因为Controller层没有事务,原来写在service我是写了一个@Transactional注解在类上,所以整个类外面的都有事务,所以解锁后还没提交事务去更新数据库,而后下一个申请进来就查到了没更新的数据。

为了优雅一点,就把@Transactional注解放在抢购的service办法上

而后再包裹一个没有事务的办法,用于上锁。

五、总结

最初,咱们回顾总结一下吧:

  • 首先咱们模仿单机多线程的秒杀场景,单机的话能够应用本地锁解决问题。
  • 接着模仿多服务器多线程的场景,思路是应用ZooKeeper实现分布式锁解决。
  • 图解ZooKeeper实现分布式锁的原理。
  • 而后入手写代码,实现分布式锁。
  • 最初总结遇到的坑。

心愿这篇文章对你有用,感觉有用就点个赞吧~