共计 4385 个字符,预计需要花费 11 分钟才能阅读完成。
前言
对于分布式锁的问题我也查过很多材料,感觉很多形式实现的并不欠缺,或者看着云里雾里的,不知所以然,于是就整顿了这篇文章,心愿对您有用,有写的不对的中央,欢送留言斧正。
首先咱们来聊聊什么是分布式锁,到底解决了什么问题?间接看代码
$stock = $this->getStockFromDb();// 查问残余库存
if ($stock>0){$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{echo "库存有余";}
很简略的一个场景,用户下单,咱们查问商品库存够不够,不够的话间接返回库存有余相似的错误信息,如果库存够的话间接在数据库中库存 -1,而后返回胜利,在业务逻辑上这段代码是没有什么问题的。
然而,这段代码是存在重大的问题的。
如果库存只剩 1,并且在并发比拟高的状况下,比方两个申请同时执行了这段代码,同时查到库存为 1,而后顺利成章的都去数据库执行 stock-1 的操作,这样库存就会变成 -1,而后就会引发超卖的景象,方才说的是两个申请同时执行,如果同时几千个申请打过去,可见造成的损失是十分大的。于是呢有些聪明人就想了个方法,方法如下。
大家都晓得 redis 有个 setnx 命令,不晓得的话也没关系,我曾经帮你查过了
咱们把下面的代码优化一下
version-1
$lock_key="lock_key";
$res = $redis->setNx($lock_key, 1);
if (!$res){return "error_code";}
$stock = $this->getStockFromDb();// 查问残余库存
if ($stock>0){$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{echo "库存有余";}
$redis->delete($lock_key);
- 第一次申请进来会去 setNx,当然后果是返回 true,因为 lock_key 不存在,而后上面业务逻辑失常进行,工作执行完了之后把 lock_key 删除掉,这样下一次申请进来反复上述逻辑
- 第二次申请进来同样会去执行 setNx,后果返回 false,因为 lock_key 曾经存在,而后间接返回错误信息 (你双 11 抢购秒杀产品的时候给你返回的零碎忙碌就是这么来的),不执行库存减 1 的操作
- 有的同学可能有纳闷,咱们不是说高并发的状况下么?要是两个申请同时 setNx 的话获取的后果不都是 true 了,同样会同时去执行业务逻辑,问题不是一样没解决么?然而大家要明确 redis 是单线程的,具备原子性,不同的申请执行 setnx 是程序执行的,所以这个是不必放心的。
看似问题解决了,其实并不然。
咱们这里伪代码写的简略,查问一下库存,而后减 1 操作而已,然而实在的生产环境中的状况是非常复杂的,在一些极其状况下,程序很可能会报错,解体,如果第一次执行加锁了之后程序报错了,那这个锁永远存在,接下来的申请永远也申请不进来了,所以咱们持续优化
version-2
try{ // 新退出 try catch 解决,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;// 新退出过期工夫,这样锁不会始终占有
$res = $redis->setNx($lock_key, 1, $expire_time);
if (!$res){return "error_code";}
$stock = $this->getStockFromDb();// 查问残余库存
if ($stock>0){$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{echo "库存有余";}
}finally {$redis->delete($lock_key);
}
- 在 setnx 的时候给加上过期工夫,这样至多不会让锁始终存在成为死锁
- 做 try catch 解决,万一程序抛出异样把锁删掉,也是为了解决死锁问题
这次是把死锁问题解决了,然而问题还是存在,大家能够先想一想还存在什么问题再接着往下看。
存在的问题如下
- 咱们的过期工夫是 5 秒钟,万一这个申请执行了 6 秒钟怎么办?超出的那一秒,跟没有加锁有什么区别?其实不仅仅如此,还有一个更重大的问题存在。比方第二个申请也是执行 6 秒,那么在第二个申请在超出的那 1 秒才进来的时候,第一个申请执行完了,当然会删除第二个申请加的锁,如果始终并发都很大的话,锁跟没有加没什么区别。
- 针对上述问题,最间接的方法是加长过期工夫,然而这个不是解决问题的最终方法。把工夫设置过长也会产生新的问题,比方各种起因机器解体了,须要重启,而后你把锁设置的工夫是 1 年,同时也没有 delete 掉,难道机器重启了再等一年?另外这样设置固定值的解决方案在计算机当中是不容许的,已经的“千年虫”问题就是相似的起因导致的
- 在加超时工夫的时候肯定要留神肯定是一次性加上,保障其原子性,不要先 setnx 之后,再设置 expire_time,这样的话万一在 setnx 之后那一个霎时零碎挂了,这个锁仍然会成为一个永恒的死锁
- 其实上述问题的次要起因在于,申请 1 会删掉申请 2 的锁,所以说锁须要保障唯一性。
咱们接着优化
version-3
try{ // 新退出 try catch 解决,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;// 新退出过期工夫,这样锁不会始终占有
$client_id = session_create_id(); // 对每个申请生成唯一性的 id
$res = $redis->setNx($lock_key, $client_id, $expire_time);
if (!$res){return "error_code";}
$stock = $this->getStockFromDb();// 查问残余库存
if ($stock>0){$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{echo "库存有余";}
}finally {if ($redis->get($lock_key) == $client_id){ // 在这里加一个判断,保障每次删除的锁是当次申请加的锁,这样防止误删了别的申请加的锁
$redis->delete($lock_key);
}
}
- 咱们在每个申请生成了惟一 client_id,并且把该值写入了 lock_key 中
- 在最初删除锁的时候会先判断这个 lock_key 是否是该申请生成的,如果不是的话则不会删除
然而下面计划还有问题,咱们看最初 redis 是先进行了 get 操作判断,而后再删除,是两步操作,并没有保障其原子性,redis 的多步操作能够用 lua 脚本来保障原子性,其实看到 lua 也不须要感觉太生疏,他就是一种语言而已,在这里的作用是把多个 redis 操作打包成一个命令去执行,保障了原子性而已
version-4
try{ // 新退出 try catch 解决,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;// 新退出过期工夫,这样锁不会始终占有
$client_id = session_create_id(); // 对每个申请生成唯一性的 id
$res = $redis->setNx($lock_key, $client_id, $expire_time);
if (!$res){return "error_code";}
$stock = $this->getStockFromDb();// 查问残余库存
if ($stock>0){$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{echo "库存有余";}
}finally {
$script = ' // 此处用 lua 脚本执行是为了 get 比照之后再 delete 的两步操作的原子性
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
';
return $instance->eval($script, [$lock_key, $client_id], 1);
}
这样封装之后,分布式锁应该就比较完善了。当然咱们还能够进一步的优化一下用户体验
- 当初比方一个申请进来之后,如果申请被锁住,会立刻返回给用户申请失败,请从新尝试,咱们能够适当的缩短一点这个工夫,不要立刻返回给用户申请失败,这样体验会更好
- 具体形式为用户申请进来如果遇到了锁,能够适当的期待一些工夫之后重试,重试的时候如果锁开释了,则这次申请就能够胜利
version-5
$retry_times = 3; // 重试次数
$usleep_times = 5000;// 重试间隔时间
try{ // 新退出 try catch 解决,这样程序万一报错会把锁删除掉
$lock_key="lock_key";
$expire_time = 5;// 新退出过期工夫,这样锁不会始终占有
while($retry_times > 0){$client_id = session_create_id(); // 对每个申请生成唯一性的 id
$res = $redis->setNx($lock_key, $client_id, $expire_time);
if ($res){break;}
echo "尝试从新获取锁";
$retry_times--;
usleep($usleep_times);
}
if (!$res){ // 重试三次之后都没有获取到锁则给用户返回错误信息
return "error_code";
}
$stock = $this->getStockFromDb();// 查问残余库存
if ($stock>0){$this->ReduceStockInDb(); // 在数据库中进行减库存操作
echo "successful";
}else{echo "库存有余";}
}finally {
$script = ' // 此处用 lua 脚本执行是为了 get 比照之后再 delete 的两步操作的原子性
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
';
return $instance->eval($script, [$lock_key, $client_id], 1);
}
当然下面的分布式锁还是不够欠缺的,比方 redis 主从同步提早,就会产生问题,像 java 中 redission 实现的思维是十分好的,大家感兴趣能够看看源码,明天就聊到这里,感兴趣的敌人能够留言大家一起探讨