实现的要求
- 应用lua加锁和开释锁
- 加锁开释锁须要id,保障只能开释本人加的锁
- 可重入锁,一个线程取得锁后还能够再加锁
- 自旋锁或称为阻塞锁
环境:
redis 6.0
spring boot data redis 2.3.4
可重入锁的实现原理
个别都是间接应用set key value nx px实现分布式锁,这种形式无奈实现可重入锁。这里应用redis hash实现可重入锁,hash的field的值记录重入次数。
Lock锁实现
Lock接口
package com.example.shop.service;
import java.util.concurrent.TimeUnit;
public interface Lock {
//非阻塞的,立刻返回
boolean tryLock(String uid);
//有超时工夫
boolean tryLock(String uid,long timeOut, TimeUnit timeUnit);
//阻塞的
void lock(String uid);
//开释锁
void unLock(String uid);
}
RedisLock接口
package com.example.shop.service;
public interface RedisLock {
//获取Lock实例
Lock getLock(String key);
}
RedisLockImpl实现类
package com.example.shop.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RedisLockImpl implements RedisLock, Serializable {
private final RedisTemplate<String,String> redisTemplate;
public RedisLockImpl(RedisTemplate<String,String> redisTemplate){
this.redisTemplate=redisTemplate;
}
@Override
public Lock getLock(String key) {
return new LockImpl(key);
}
class LockImpl implements Lock{
private static final String lockStr = "local key = KEYS[1]\n" +
"local lockKey = ARGV[1]\n" +
"local lockCount = 1\n" +
"\n" +
"\n" +
"local val = redis.call('hget',key,lockKey)\n" +
"if val then\n" +
" redis.call('hincrby',key,lockKey,1)\n" +
" return 1\n"+
"end\n" +
"if redis.call('exists',key)==0 then\n" +
" redis.call('hset',key,lockKey,1)\n"+
" redis.call('expire',lockKey,5)\n" +
" return 1\n" +
"end\n" +
"return 0";
private static final String unLockStr = "--开释锁\n" +
"local key = KEYS[1]\n" +
"local lockKey = ARGV[1]\n" +
"local value = redis.call('hget',key,lockKey)\n" +
"if value then\n" +
" if tonumber(value)>1 then\n" +
" redis.call('hincrby',key,lockKey,-1)\n" +
" else\n" +
" redis.call('del',key)\n" +
" end\n" +
"end";
//key过期工夫
private static final long keyTimeOut = 30000;
private static final long SLEEP_TIME = 50;
private final DefaultRedisScript<Long> lockScript= new DefaultRedisScript<>(lockStr,Long.class);
private final DefaultRedisScript<Long> unLockScript = new DefaultRedisScript<>(unLockStr,Long.class);
private String key;
public LockImpl(String key){
this.key = key;
}
//尝试加锁,立刻返回
@Override
public boolean tryLock(String uuid){
Long lc = redisTemplate.execute(lockScript, List.of(this.key),uuid,String.valueOf(keyTimeOut));
log.info(uuid+" 试图加锁");
return lc!=null&&lc==1;
}
//自旋锁
@Override
public boolean tryLock(String uuid,long timeOut, TimeUnit timeUnit){
if (timeOut<0) throw new IllegalArgumentException("timeOut is illegal");
final long allTime = timeUnit.toMillis(timeOut);
long start = System.currentTimeMillis();
while (start+allTime>=System.currentTimeMillis()){
if (this.tryLock(uuid)) return true;
try{
Thread.sleep(SLEEP_TIME);
}catch (Exception e){
e.printStackTrace();
}
}
return false;
}
@Override
public void lock(String uuid){
while (true){
if (this.tryLock(uuid)) return;
try{
Thread.sleep(SLEEP_TIME);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public void unLock(String uuid) {
redisTemplate.execute(unLockScript,List.of(this.key),uuid);
log.info(uuid+" 开释锁");
}
}
}
实现上应用lua脚本,保障原子性,锁的key就是hset的key,加锁id为hash的一个key,加锁的次数为这个key的值,重入锁把这个值每次加1,开释锁每次减1,直到值为1时删除这个键。
开释锁时须要判断这个锁是不是他加的,不能呈现开释他人加的锁。
lua脚本调试
redis 3.2开始反对应用ldb调试lua脚本
lock.lua
local key = KEYS[1]
local lockKey = ARGV[1]
local lockCount = 1
local val = redis.call('hget',key,lockKey)
if val then
--可重入
redis.call('hincrby',key,lockKey,1)
return 1
end
if redis.call('exists',key)==0 then
redis.call('hset',key,lockKey,1)
redis.call('expire',lockKey,5)
return 1
end
return 0;
应用如下命令开始调试,逗号前为key,前面为参数
redis-cli --ldb --eval lock.lua order , consumer-1
进入debugger模式后
s 单步执行
p 打印变量值
b 增加断点
c 执行到下一个断点
简略测试
RedisService.java
package com.example.shop.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RedisService {
private final RedisLock redisLock;
private static int count = 10;
private Lock lock;
public RedisService(RedisLock redisLock) {
this.lock = redisLock.getLock("order");
}
public String buy(String uid) {
try {
lock.lock(uid);
if (count > 0) {
count--;
log.info("{}胜利购买到iphone,残余库存{}", uid, count);
return uid + ":胜利购买到iphone,残余库存:" + count;
} else {
return "库存有余";
}
} finally {
lock.unLock(uid);
log.info("unlock {}", uid);
}
}
}
ShopApplication.java
package com.example.shop;
import com.example.shop.service.RedisService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication(scanBasePackages = {"com.example.shop"})
@RestController
public class ShopApplication {
private final RedisService redisService;
public ShopApplication(RedisService redisService) {
this.redisService = redisService;
}
public static void main(String[] args) {
SpringApplication.run(ShopApplication.class, args);
}
@PostMapping("/buy")
public String buy(String uid){
return redisService.buy(uid);
}
}
模仿抢购10个商品。
python并发申请:
import requests as req
import threading
def start(name):
r = req.post('http://localhost:8080/buy',{'uid':name})
print(r.text)
for i in range(15):
th = threading.Thread(target=start,args=["consumer-{}".format(i)])
th.start()
后果:
总结
redis实现分布式锁形式很多,最简略的形式是应用set key value nx px实现,间接一步实现。最重要的就是要谁加的锁谁开释,锁阻塞不是最好的实现形式,耗费大量网络和cpu资源,可重入锁须要应用hash数据结构实现。须要设置过期实现,防止出现死锁,还有就是锁超时问题,业务逻辑还没执行完,锁就被开释了,导致数据不统一,可在这个根底上退出主动续期性能。
相熟lua脚本很重要,lua在redis中是原子执行的。
分布式锁还能够应用zookeeper,etcd实现。要相熟分布式锁的利用场景,单体利用间接在办法或代码快加锁即可,服务集群,微服务,分布式系统须要利用分布式锁。
另外,还须要留神的就是redis主从复制对分布式锁带来的影响。
发表回复