共计 8904 个字符,预计需要花费 23 分钟才能阅读完成。
思维导图
文章已收录到 Github 精选,欢送 Star:
https://github.com/yehongzhi/learningSummary
前言
通过《ZooKeeper 入门》后,咱们学会了 ZooKeeper 的根本用法。
实际上 ZooKeeper 的利用是十分宽泛的,实现分布式锁只是其中一种。接下来咱们就 ZooKeeper 实现分布式锁解决 秒杀超卖问题 进行开展。
一、什么是秒杀超卖问题
秒杀流动应该都不生疏,不必过多解释。
不难想象,在这种 ” 秒杀 ” 的场景中,实际上会呈现多个用户争抢 ” 资源 ” 的状况,也就是多个线程同时并发,这种状况是很容易呈现数据不精确,也就是超卖问题。
1.1 我的项目演示
上面应用程序演示,我应用了 SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC 搭建了一个简略的我的项目,github 地址:
https://github.com/yehongzhi/…
创立一个商品信息表:
CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,
`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
`number` int(10) DEFAULT '0' COMMENT '商品数量',
`description` varchar(2048) DEFAULT ''COMMENT' 商品形容 ',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
增加一个商品 [ 叉烧包 ] 进去:
外围的代码逻辑是这样的:
@Override
public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//1. 先查询数据库中商品的数量
TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
//2. 判断商品数量是否大于 0,或者购买的数量大于库存
Integer count = commodityInfo.getNumber();
if (count <= 0 || number > count) {
// 商品数量小于或者等于 0,或者购买的数量大于库存,则返回 false
return false;
}
//3. 如果库存数量大于 0,并且购买的数量小于或者等于库存。则更新商品数量
count -= number;
commodityInfo.setNumber(count);
boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
if (bool) {
// 如果更新胜利,则打印购买商品胜利
System.out.println("购买商品 [" + commodityInfo.getCommodityName() + "] 胜利, 数量为:" + number);
}
return bool;
}
逻辑示意图如下:
下面这个逻辑,如果单线程申请的话是没有问题的。
然而多线程的话就呈现问题了。当初我就创立多个线程,通过 HttpClient 进行申请,看会产生什么:
public static void main(String[] args) throws Exception {
// 申请地址
String url = "http://localhost:8080/mall/commodity/purchase";
// 申请参数,商品 ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
// 创立 10 个线程通过 HttpClient 进行发送申请,测试
for (int i = 0; i < 10; i++) {
// 这个线程的逻辑仅仅是发送申请
CommodityThread commodityThread = new CommodityThread(url, map);
commodityThread.start();}
}
阐明一下,叉烧包的数量是 100,这里有 10 个线程同时去购买,假如都购买胜利的话,库存数量应该是 90。
实际上,10 个线程确实都购买胜利了:
然而数据库的商品库存,却不精确:
二、尝试应用本地锁
下面的场景,大略流程如下所示:
能够看出问题的 关键在于两个线程 ” 同时 ” 去查问残余的库存,而后更新库存导致的 。要解决这个问题,其实 只有保障多个线程在这段逻辑是程序执行即可,也就是加锁。
本地锁 JDK 提供有两种:synchronized 和 Lock 锁。
两种形式都能够,我这里为了简便,应用 synchronized:
// 应用 synchronized 润饰办法
@Override
public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {// 省略...}
而后再测试刚刚多线程并发抢购的状况,看看后果:
问题失去解决!!!
你认为事件就这样完结了吗,看了看进度条,发现事件并不简略。
咱们晓得在理论我的项目中,往往不会只部署一台服务器,所以无妨咱们启动两台服务器,端口号别离是 8080、8081,模仿理论我的项目的场景:
写一个交替申请的测试脚本,模仿多台服务器别离解决申请,用户秒杀抢购的场景:
public static void main(String[] args) throws Exception {
// 申请地址
String url = "http://localhost:%s/mall/commodity/purchase";
// 申请参数,商品 ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
// 创立 10 个线程通过 HttpClient 进行发送申请,测试
for (int i = 0; i < 10; i++) {
//8080、8081 交替申请,每个服务器解决 5 个申请
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();}
}
首先看购买的状况,必定都是购买胜利的:
要害是库存数量是否正确:
有 10 个申请购买胜利,库存应该是 90 才对,这里库存是 95。事实证明 本地锁是不能解决多台服务器秒杀抢购呈现超卖的问题。
为什么会这样呢,请看示意图:
其实和多线程问题是差不多的起因,多个服务器去查询数据库,获取到雷同的库存,而后更新库存,导致数据不正确 。要保障库存的数量正确, 关键在于多台服务器要保障只能一台服务器在执行这段逻辑,也就是要加分布式锁。
这也体现出分布式锁的作用,就是要保障多台服务器只能有一台服务器执行。
分布式锁有三种实现形式,别离是 redis、ZooKeeper、数据库(比方 mysql)。
三、应用 ZooKeeper 实现分布式锁
3.1 原理
实际上是利用 ZooKeeper 的长期程序节点的个性实现分布式锁。怎么实现呢?
假如当初有一个客户端 A,须要加锁,那么就在 ”/Lock” 门路下创立一个长期程序节点。而后获取 ”/Lock” 下的节点列表,判断本人的序号是否是最小的,如果是最小的序号,则加锁胜利!
当初又有另一个客户端,客户端 B 须要加锁,那么也是在 ”/Lock” 门路下创立长期程序节点。仍然获取 ”/Lock” 下的节点列表,判断本人的节点序号是否最小的。发现不是最小的,加锁失败,接着对本人的上一个节点进行监听。
怎么开释锁呢,其实就是把长期节点删除。假如客户端 A 开释锁,把节点 01 删除了。那就会触发节点 02 的监听事件,客户端就再次获取节点列表,而后判断本人是否是最小的序号,如果是最小序号则加锁。
如果多个客户端其实也是一样,一上来就会创立一个长期节点,而后开始判断本人是否是最小的序号,如果不是就监听上一个节点,造成一种排队的机制。也就造成了锁的成果,保障了多台服务器只有一台执行。
假如其中有一个客户端宕机了,依据长期节点的特点,ZooKeeper 会主动删除对应的长期节点,相当于主动开释了锁。
3.2 手写代码实现分布式锁
首先退出 Maven 依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
接着依照下面剖析的思路敲代码,创立 ZkLock 类:
public class ZkLock implements Lock {
// 计数器,用于加锁失败时,阻塞
private static CountDownLatch cdl = new CountDownLatch(1);
//ZooKeeper 服务器的 IP 端口
private static final String IP_PORT = "127.0.0.1:2181";
// 锁的根门路
private static final String ROOT_NODE = "/Lock";
// 上一个节点的门路
private volatile String beforePath;
// 以后上锁的节点门路
private volatile String currPath;
// 创立 ZooKeeper 客户端
private ZkClient zkClient = new ZkClient(IP_PORT);
public ZkLock() {
// 判断是否存在根节点
if (!zkClient.exists(ROOT_NODE)) {
// 不存在则创立
zkClient.createPersistent(ROOT_NODE);
}
}
// 加锁
public void lock() {if (tryLock()) {System.out.println("加锁胜利!!");
} else {
// 尝试加锁失败,进入期待 监听
waitForLock();
// 再次尝试加锁
lock();}
}
// 尝试加锁
public synchronized boolean tryLock() {
// 第一次就进来创立本人的长期节点
if (StringUtils.isBlank(currPath)) {currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
}
// 对节点排序
List<String> children = zkClient.getChildren(ROOT_NODE);
Collections.sort(children);
// 以后的是最小节点就返回加锁胜利
if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {return true;} else {
// 不是最小节点 就找到本人的前一个 顺次类推 开释也是一样
int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
// 返回加锁失败
return false;
}
}
// 解锁
public void unlock() {
// 删除节点并敞开客户端
zkClient.delete(currPath);
zkClient.close();}
// 期待上锁,加锁失败进入阻塞,监听上一个节点
private void waitForLock() {IZkDataListener listener = new IZkDataListener() {
// 监听节点更新事件
public void handleDataChange(String s, Object o) throws Exception { }
// 监听节点被删除事件
public void handleDataDeleted(String s) throws Exception {
// 解除阻塞
cdl.countDown();}
};
// 监听上一个节点
this.zkClient.subscribeDataChanges(beforePath, listener);
// 判断上一个节点是否存在
if (zkClient.exists(beforePath)) {
// 上一个节点存在
try {System.out.println("加锁失败 期待");
// 加锁失败,阻塞期待
cdl.await();} catch (InterruptedException e) {e.printStackTrace();
}
}
// 开释监听
zkClient.unsubscribeDataChanges(beforePath, listener);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}
public void lockInterruptibly() throws InterruptedException {}
public Condition newCondition() {return null;}
}
在 Controller 层加上锁:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
boolean bool;
// 获取 ZooKeeper 分布式锁
ZkLock zkLock = new ZkLock();
try {
// 上锁
zkLock.lock();
// 调用秒杀抢购的 service 办法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
} catch (Exception e) {e.printStackTrace();
bool = false;
} finally {
// 解锁
zkLock.unlock();}
return bool;
}
测试,仍然起两台服务器,8080、8081。而后跑测试脚本:
public static void main(String[] args) throws Exception {
// 申请地址
String url = "http://localhost:%s/mall/commodity/purchase";
// 申请参数,商品 ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
// 创立 10 个线程通过 HttpClient 进行发送申请,测试
for (int i = 0; i < 10; i++) {
//8080、8081 交替申请
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();}
}
后果正确:
3.3 造好的轮子
Curator 是 Apache 开源的一个操作 ZooKeeper 的框架。其中就有实现 ZooKeeper 分布式锁的性能。
当然分布式锁的实现只是这个框架的其中一个很小的局部,除此之外还有很多用处,大家能够到官网去学习。
首先增加 Maven 依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
还是一样在须要加锁的中央进行加锁:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
@RequestParam(name = "number") Integer number) throws Exception {
boolean bool = false;
// 设置重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 启动客户端
client.start();
InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
try {
// 加锁
if (mutex.acquire(3, TimeUnit.SECONDS)) {
// 调用抢购秒杀 service 办法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
}
} catch (Exception e) {e.printStackTrace();
} finally {
// 解锁
mutex.release();
client.close();}
return bool;
}
四、遇到的坑
我尝试用原生的 ZooKeeper 写分布式锁,有点炸裂。遇到不少坑,最终放弃了,用 zkclient 的 API。可能我太菜了不太会用。
上面我分享我遇到的一些问题,心愿你们在遇到同类型的异样时能迅速定位问题。
4.1 Session expired
这个谬误是应用原生 ZooKeeper 的 API 呈现的谬误。次要是我在进入 debug 模式进行调试呈现的。
因为原生的 ZooKeeper 须要设定一个会话超时工夫,个别 debug 模式咱们都会卡在一个中央去调试,必定就超出了设置的会话工夫~
4.2 KeeperErrorCode = ConnectionLoss
这个也是原生 ZooKeeper 的 API 的谬误,怎么呈现的呢?
次要是创立的 ZooKeeper 客户端连贯服务器时是异步的,因为连贯须要工夫,还没连贯胜利,代码曾经开始执行 create()或者 exists(),而后就报这个谬误。
解决办法:应用 CountDownLatch 计数器阻塞,连贯胜利后再进行阻塞,而后执行 create()或者 exists()等操作。
4.3 并发查问更新呈现数据不统一
这个谬误真的太炸裂了~
一开始我是把分布式锁加在 service 层,而后认为搞定了。接着启动 8080、8081 进行并发测试。10 个线程都是购买胜利,后果竟然是不正确!
第一反馈感觉本人实现的代码有问题,于是换成 curator 框架实现的分布式锁,开源框架应该没问题了吧。没想到还是不行~
既然不是锁自身的问题,是不是事务问题。上一个事务更新库存的操作还没提交,而后下一个申请就进来查问。于是我就把加锁的范畴放大一点,放在 Controller 层。竟然胜利了!
你可能曾经留神到,我在下面的例子就是把分布式锁加在 Controller 层,其实我不太喜爱在 Controller 层写太多代码。
兴许有更加优雅的形式,惋惜自己能力有余,如果你有更好的实现形式,能够分享一下~
补充:上面评论有位大佬说,在原来的办法外再包裹一层,亲测是能够的。这应该是事务的问题。
下面放在 Controller 层能够胜利是不是因为 Controller 层没有事务,原来写在 service 我是写了一个 @Transactional 注解在类上,所以整个类外面的都有事务,所以解锁后还没提交事务去更新数据库,而后下一个申请进来就查到了没更新的数据。
为了优雅一点,就把 @Transactional 注解放在抢购的 service 办法上
而后再包裹一个没有事务的办法,用于上锁。
五、总结
最初,咱们回顾总结一下吧:
- 首先咱们模仿单机多线程的秒杀场景,单机的话能够应用本地锁解决问题。
- 接着模仿多服务器多线程的场景,思路是应用 ZooKeeper 实现分布式锁解决。
- 图解 ZooKeeper 实现分布式锁的原理。
- 而后入手写代码,实现分布式锁。
- 最初总结遇到的坑。
心愿这篇文章对你有用,感觉有用就点个赞吧~