思维导图
文章已收录到Github精选,欢送Star:
https://github.com/yehongzhi/learningSummary
前言
通过《ZooKeeper入门》后,咱们学会了ZooKeeper的根本用法。
实际上ZooKeeper的利用是十分宽泛的,实现分布式锁只是其中一种。接下来咱们就ZooKeeper实现分布式锁解决秒杀超卖问题进行开展。
一、什么是秒杀超卖问题
秒杀流动应该都不生疏,不必过多解释。
不难想象,在这种"秒杀"的场景中,实际上会呈现多个用户争抢"资源"的状况,也就是多个线程同时并发,这种状况是很容易呈现数据不精确,也就是超卖问题。
1.1 我的项目演示
上面应用程序演示,我应用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一个简略的我的项目,github地址:
https://github.com/yehongzhi/...
创立一个商品信息表:
CREATE TABLE `tb_commodity_info` ( `id` varchar(32) NOT NULL, `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称', `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格', `number` int(10) DEFAULT '0' COMMENT '商品数量', `description` varchar(2048) DEFAULT '' COMMENT '商品形容', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
增加一个商品[叉烧包]进去:
外围的代码逻辑是这样的:
@Override public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception { //1.先查询数据库中商品的数量 TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId); //2.判断商品数量是否大于0,或者购买的数量大于库存 Integer count = commodityInfo.getNumber(); if (count <= 0 || number > count) { //商品数量小于或者等于0,或者购买的数量大于库存,则返回false return false; } //3.如果库存数量大于0,并且购买的数量小于或者等于库存。则更新商品数量 count -= number; commodityInfo.setNumber(count); boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1; if (bool) { //如果更新胜利,则打印购买商品胜利 System.out.println("购买商品[ " + commodityInfo.getCommodityName() + " ]胜利,数量为:" + number); } return bool; }
逻辑示意图如下:
下面这个逻辑,如果单线程申请的话是没有问题的。
然而多线程的话就呈现问题了。当初我就创立多个线程,通过HttpClient进行申请,看会产生什么:
public static void main(String[] args) throws Exception { //申请地址 String url = "http://localhost:8080/mall/commodity/purchase"; //申请参数,商品ID,数量 Map<String, String> map = new HashMap<>(); map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1"); map.put("number", "1"); //创立10个线程通过HttpClient进行发送申请,测试 for (int i = 0; i < 10; i++) { //这个线程的逻辑仅仅是发送申请 CommodityThread commodityThread = new CommodityThread(url, map); commodityThread.start(); } }
阐明一下,叉烧包的数量是100,这里有10个线程同时去购买,假如都购买胜利的话,库存数量应该是90。
实际上,10个线程确实都购买胜利了:
然而数据库的商品库存,却不精确:
二、尝试应用本地锁
下面的场景,大略流程如下所示:
能够看出问题的关键在于两个线程"同时"去查问残余的库存,而后更新库存导致的。要解决这个问题,其实只有保障多个线程在这段逻辑是程序执行即可,也就是加锁。
本地锁JDK提供有两种:synchronized和Lock锁。
两种形式都能够,我这里为了简便,应用synchronized:
//应用synchronized润饰办法 @Override public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception { //省略... }
而后再测试刚刚多线程并发抢购的状况,看看后果:
问题失去解决!!!
你认为事件就这样完结了吗,看了看进度条,发现事件并不简略。
咱们晓得在理论我的项目中,往往不会只部署一台服务器,所以无妨咱们启动两台服务器,端口号别离是8080、8081,模仿理论我的项目的场景:
写一个交替申请的测试脚本,模仿多台服务器别离解决申请,用户秒杀抢购的场景:
public static void main(String[] args) throws Exception { //申请地址 String url = "http://localhost:%s/mall/commodity/purchase"; //申请参数,商品ID,数量 Map<String, String> map = new HashMap<>(); map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1"); map.put("number", "1"); //创立10个线程通过HttpClient进行发送申请,测试 for (int i = 0; i < 10; i++) { //8080、8081交替申请,每个服务器解决5个申请 String port = "808" + (i % 2); CommodityThread commodityThread = new CommodityThread(String.format(url, port), map); commodityThread.start(); } }
首先看购买的状况,必定都是购买胜利的:
要害是库存数量是否正确:
有10个申请购买胜利,库存应该是90才对,这里库存是95。事实证明本地锁是不能解决多台服务器秒杀抢购呈现超卖的问题。
为什么会这样呢,请看示意图:
其实和多线程问题是差不多的起因,多个服务器去查询数据库,获取到雷同的库存,而后更新库存,导致数据不正确。要保障库存的数量正确,关键在于多台服务器要保障只能一台服务器在执行这段逻辑,也就是要加分布式锁。
这也体现出分布式锁的作用,就是要保障多台服务器只能有一台服务器执行。
分布式锁有三种实现形式,别离是redis、ZooKeeper、数据库(比方mysql)。
三、应用ZooKeeper实现分布式锁
3.1 原理
实际上是利用ZooKeeper的长期程序节点的个性实现分布式锁。怎么实现呢?
假如当初有一个客户端A,须要加锁,那么就在"/Lock"门路下创立一个长期程序节点。而后获取"/Lock"下的节点列表,判断本人的序号是否是最小的,如果是最小的序号,则加锁胜利!
当初又有另一个客户端,客户端B须要加锁,那么也是在"/Lock"门路下创立长期程序节点。仍然获取"/Lock"下的节点列表,判断本人的节点序号是否最小的。发现不是最小的,加锁失败,接着对本人的上一个节点进行监听。
怎么开释锁呢,其实就是把长期节点删除。假如客户端A开释锁,把节点01删除了。那就会触发节点02的监听事件,客户端就再次获取节点列表,而后判断本人是否是最小的序号,如果是最小序号则加锁。
如果多个客户端其实也是一样,一上来就会创立一个长期节点,而后开始判断本人是否是最小的序号,如果不是就监听上一个节点,造成一种排队的机制。也就造成了锁的成果,保障了多台服务器只有一台执行。
假如其中有一个客户端宕机了,依据长期节点的特点,ZooKeeper会主动删除对应的长期节点,相当于主动开释了锁。
3.2 手写代码实现分布式锁
首先退出Maven依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version></dependency><dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.4</version></dependency>
接着依照下面剖析的思路敲代码,创立ZkLock类:
public class ZkLock implements Lock { //计数器,用于加锁失败时,阻塞 private static CountDownLatch cdl = new CountDownLatch(1); //ZooKeeper服务器的IP端口 private static final String IP_PORT = "127.0.0.1:2181"; //锁的根门路 private static final String ROOT_NODE = "/Lock"; //上一个节点的门路 private volatile String beforePath; //以后上锁的节点门路 private volatile String currPath; //创立ZooKeeper客户端 private ZkClient zkClient = new ZkClient(IP_PORT); public ZkLock() { //判断是否存在根节点 if (!zkClient.exists(ROOT_NODE)) { //不存在则创立 zkClient.createPersistent(ROOT_NODE); } } //加锁 public void lock() { if (tryLock()) { System.out.println("加锁胜利!!"); } else { // 尝试加锁失败,进入期待 监听 waitForLock(); // 再次尝试加锁 lock(); } } //尝试加锁 public synchronized boolean tryLock() { // 第一次就进来创立本人的长期节点 if (StringUtils.isBlank(currPath)) { currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock"); } // 对节点排序 List<String> children = zkClient.getChildren(ROOT_NODE); Collections.sort(children); // 以后的是最小节点就返回加锁胜利 if (currPath.equals(ROOT_NODE + "/" + children.get(0))) { return true; } else { // 不是最小节点 就找到本人的前一个 顺次类推 开释也是一样 int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1; beforePath = ROOT_NODE + "/" + children.get(beforePathIndex); //返回加锁失败 return false; } } //解锁 public void unlock() { //删除节点并敞开客户端 zkClient.delete(currPath); zkClient.close(); } //期待上锁,加锁失败进入阻塞,监听上一个节点 private void waitForLock() { IZkDataListener listener = new IZkDataListener() { //监听节点更新事件 public void handleDataChange(String s, Object o) throws Exception { } //监听节点被删除事件 public void handleDataDeleted(String s) throws Exception { //解除阻塞 cdl.countDown(); } }; // 监听上一个节点 this.zkClient.subscribeDataChanges(beforePath, listener); //判断上一个节点是否存在 if (zkClient.exists(beforePath)) { //上一个节点存在 try { System.out.println("加锁失败 期待"); //加锁失败,阻塞期待 cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 开释监听 zkClient.unsubscribeDataChanges(beforePath, listener); } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public void lockInterruptibly() throws InterruptedException { } public Condition newCondition() { return null; }}
在Controller层加上锁:
@PostMapping("/purchase") public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception { boolean bool; //获取ZooKeeper分布式锁 ZkLock zkLock = new ZkLock(); try { //上锁 zkLock.lock(); //调用秒杀抢购的service办法 bool = commodityInfoService.purchaseCommodityInfo(commodityId, number); } catch (Exception e) { e.printStackTrace(); bool = false; } finally { //解锁 zkLock.unlock(); } return bool; }
测试,仍然起两台服务器,8080、8081。而后跑测试脚本:
public static void main(String[] args) throws Exception { //申请地址 String url = "http://localhost:%s/mall/commodity/purchase"; //申请参数,商品ID,数量 Map<String, String> map = new HashMap<>(); map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1"); map.put("number", "1"); //创立10个线程通过HttpClient进行发送申请,测试 for (int i = 0; i < 10; i++) { //8080、8081交替申请 String port = "808" + (i % 2); CommodityThread commodityThread = new CommodityThread(String.format(url, port), map); commodityThread.start(); } }
后果正确:
3.3 造好的轮子
Curator是Apache开源的一个操作ZooKeeper的框架。其中就有实现ZooKeeper分布式锁的性能。
当然分布式锁的实现只是这个框架的其中一个很小的局部,除此之外还有很多用处,大家能够到官网去学习。
首先增加Maven依赖:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency>
还是一样在须要加锁的中央进行加锁:
@PostMapping("/purchase") public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception { boolean bool = false; //设置重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); // 启动客户端 client.start(); InterProcessMutex mutex = new InterProcessMutex(client, "/locks"); try { //加锁 if (mutex.acquire(3, TimeUnit.SECONDS)) { //调用抢购秒杀service办法 bool = commodityInfoService.purchaseCommodityInfo(commodityId, number); } } catch (Exception e) { e.printStackTrace(); } finally { //解锁 mutex.release(); client.close(); } return bool; }
四、遇到的坑
我尝试用原生的ZooKeeper写分布式锁,有点炸裂。遇到不少坑,最终放弃了,用zkclient的API。可能我太菜了不太会用。
上面我分享我遇到的一些问题,心愿你们在遇到同类型的异样时能迅速定位问题。
4.1 Session expired
这个谬误是应用原生ZooKeeper的API呈现的谬误。次要是我在进入debug模式进行调试呈现的。
因为原生的ZooKeeper须要设定一个会话超时工夫,个别debug模式咱们都会卡在一个中央去调试,必定就超出了设置的会话工夫~
4.2 KeeperErrorCode = ConnectionLoss
这个也是原生ZooKeeper的API的谬误,怎么呈现的呢?
次要是创立的ZooKeeper客户端连贯服务器时是异步的,因为连贯须要工夫,还没连贯胜利,代码曾经开始执行create()或者exists(),而后就报这个谬误。
解决办法:应用CountDownLatch计数器阻塞,连贯胜利后再进行阻塞,而后执行create()或者exists()等操作。
4.3 并发查问更新呈现数据不统一
这个谬误真的太炸裂了~
一开始我是把分布式锁加在service层,而后认为搞定了。接着启动8080、8081进行并发测试。10个线程都是购买胜利,后果竟然是不正确!
第一反馈感觉本人实现的代码有问题,于是换成curator框架实现的分布式锁,开源框架应该没问题了吧。没想到还是不行~
既然不是锁自身的问题,是不是事务问题。上一个事务更新库存的操作还没提交,而后下一个申请就进来查问。于是我就把加锁的范畴放大一点,放在Controller层。竟然胜利了!
你可能曾经留神到,我在下面的例子就是把分布式锁加在Controller层,其实我不太喜爱在Controller层写太多代码。
兴许有更加优雅的形式,惋惜自己能力有余,如果你有更好的实现形式,能够分享一下~
补充:上面评论有位大佬说,在原来的办法外再包裹一层,亲测是能够的。这应该是事务的问题。
下面放在Controller层能够胜利是不是因为Controller层没有事务,原来写在service我是写了一个@Transactional注解在类上,所以整个类外面的都有事务,所以解锁后还没提交事务去更新数据库,而后下一个申请进来就查到了没更新的数据。
为了优雅一点,就把@Transactional注解放在抢购的service办法上
而后再包裹一个没有事务的办法,用于上锁。
五、总结
最初,咱们回顾总结一下吧:
- 首先咱们模仿单机多线程的秒杀场景,单机的话能够应用本地锁解决问题。
- 接着模仿多服务器多线程的场景,思路是应用ZooKeeper实现分布式锁解决。
- 图解ZooKeeper实现分布式锁的原理。
- 而后入手写代码,实现分布式锁。
- 最初总结遇到的坑。
心愿这篇文章对你有用,感觉有用就点个赞吧~