共计 5334 个字符,预计需要花费 14 分钟才能阅读完成。
实现的要求
- 应用 lua 加锁和开释锁
- 加锁开释锁须要 id, 保障只能开释本人加的锁
- 可重入锁,一个线程取得锁后还能够再加锁
- 自旋锁或称为阻塞锁
环境:
redis 6.0
spring boot data redis 2.3.4
可重入锁的实现原理
个别都是间接应用 set key value nx px 实现分布式锁,这种形式无奈实现可重入锁。这里应用 redis hash 实现可重入锁,hash 的 field 的值记录重入次数。
Lock 锁实现
Lock 接口
package com.example.shop.service; | |
import java.util.concurrent.TimeUnit; | |
public interface Lock { | |
// 非阻塞的,立刻返回 | |
boolean tryLock(String uid); | |
// 有超时工夫 | |
boolean tryLock(String uid,long timeOut, TimeUnit timeUnit); | |
// 阻塞的 | |
void lock(String uid); | |
// 开释锁 | |
void unLock(String uid); | |
} |
RedisLock 接口
package com.example.shop.service; | |
public interface RedisLock { | |
// 获取 Lock 实例 | |
Lock getLock(String key); | |
} |
RedisLockImpl 实现类
package com.example.shop.service; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.data.redis.core.RedisTemplate; | |
import org.springframework.data.redis.core.script.DefaultRedisScript; | |
import org.springframework.stereotype.Component; | |
import java.io.Serializable; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
@Component | |
@Slf4j | |
public class RedisLockImpl implements RedisLock, Serializable { | |
private final RedisTemplate<String,String> redisTemplate; | |
public RedisLockImpl(RedisTemplate<String,String> redisTemplate){this.redisTemplate=redisTemplate;} | |
@Override | |
public Lock getLock(String key) {return new LockImpl(key); | |
} | |
class LockImpl implements Lock{private static final String lockStr = "local key = KEYS[1]\n" + | |
"local lockKey = ARGV[1]\n" + | |
"local lockCount = 1\n" + | |
"\n" + | |
"\n" + | |
"local val = redis.call('hget',key,lockKey)\n" + | |
"if val then\n" + | |
"redis.call('hincrby',key,lockKey,1)\n" + | |
"return 1\n"+ | |
"end\n" + | |
"if redis.call('exists',key)==0 then\n" + | |
"redis.call('hset',key,lockKey,1)\n"+ | |
"redis.call('expire',lockKey,5)\n" + | |
"return 1\n" + | |
"end\n" + | |
"return 0"; | |
private static final String unLockStr = "-- 开释锁 \n" + | |
"local key = KEYS[1]\n" + | |
"local lockKey = ARGV[1]\n" + | |
"local value = redis.call('hget',key,lockKey)\n" + | |
"if value then\n" + | |
"if tonumber(value)>1 then\n" + | |
"redis.call('hincrby',key,lockKey,-1)\n" + | |
"else\n" + | |
"redis.call('del',key)\n" + | |
"end\n" + | |
"end"; | |
//key 过期工夫 | |
private static final long keyTimeOut = 30000; | |
private static final long SLEEP_TIME = 50; | |
private final DefaultRedisScript<Long> lockScript= new DefaultRedisScript<>(lockStr,Long.class); | |
private final DefaultRedisScript<Long> unLockScript = new DefaultRedisScript<>(unLockStr,Long.class); | |
private String key; | |
public LockImpl(String key){this.key = key;} | |
// 尝试加锁,立刻返回 | |
@Override | |
public boolean tryLock(String uuid){Long lc = redisTemplate.execute(lockScript, List.of(this.key),uuid,String.valueOf(keyTimeOut)); | |
log.info(uuid+"试图加锁"); | |
return lc!=null&&lc==1; | |
} | |
// 自旋锁 | |
@Override | |
public boolean tryLock(String uuid,long timeOut, TimeUnit timeUnit){if (timeOut<0) throw new IllegalArgumentException("timeOut is illegal"); | |
final long allTime = timeUnit.toMillis(timeOut); | |
long start = System.currentTimeMillis(); | |
while (start+allTime>=System.currentTimeMillis()){if (this.tryLock(uuid)) return true; | |
try{Thread.sleep(SLEEP_TIME); | |
}catch (Exception e){e.printStackTrace(); | |
} | |
} | |
return false; | |
} | |
@Override | |
public void lock(String uuid){while (true){if (this.tryLock(uuid)) return; | |
try{Thread.sleep(SLEEP_TIME); | |
}catch (Exception e){e.printStackTrace(); | |
} | |
} | |
} | |
@Override | |
public void unLock(String uuid) {redisTemplate.execute(unLockScript,List.of(this.key),uuid); | |
log.info(uuid+"开释锁"); | |
} | |
} | |
} |
实现上应用 lua 脚本,保障原子性,锁的 key 就是 hset 的 key,加锁 id 为 hash 的一个 key,加锁的次数为这个 key 的值,重入锁把这个值每次加 1,开释锁每次减 1,直到值为 1 时删除这个键。
开释锁时须要判断这个锁是不是他加的,不能呈现开释他人加的锁。
lua 脚本调试
redis 3.2 开始反对应用 ldb 调试 lua 脚本
lock.lua
local key = KEYS[1] | |
local lockKey = ARGV[1] | |
local lockCount = 1 | |
local val = redis.call('hget',key,lockKey) | |
if val then | |
-- 可重入 | |
redis.call('hincrby',key,lockKey,1) | |
return 1 | |
end | |
if redis.call('exists',key)==0 then | |
redis.call('hset',key,lockKey,1) | |
redis.call('expire',lockKey,5) | |
return 1 | |
end | |
return 0; |
应用如下命令开始调试,逗号前为 key,前面为参数
redis-cli --ldb --eval lock.lua order , consumer-1
进入 debugger 模式后
s 单步执行
p 打印变量值
b 增加断点
c 执行到下一个断点
简略测试
RedisService.java
package com.example.shop.service; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.stereotype.Component; | |
@Component | |
@Slf4j | |
public class RedisService { | |
private final RedisLock redisLock; | |
private static int count = 10; | |
private Lock lock; | |
public RedisService(RedisLock redisLock) {this.lock = redisLock.getLock("order"); | |
} | |
public String buy(String uid) { | |
try {lock.lock(uid); | |
if (count > 0) { | |
count--; | |
log.info("{} 胜利购买到 iphone, 残余库存 {}", uid, count); | |
return uid + ":胜利购买到 iphone, 残余库存:" + count; | |
} else {return "库存有余";} | |
} finally {lock.unLock(uid); | |
log.info("unlock {}", uid); | |
} | |
} | |
} |
ShopApplication.java
package com.example.shop; | |
import com.example.shop.service.RedisService; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.web.bind.annotation.PostMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
@SpringBootApplication(scanBasePackages = {"com.example.shop"}) | |
@RestController | |
public class ShopApplication { | |
private final RedisService redisService; | |
public ShopApplication(RedisService redisService) {this.redisService = redisService;} | |
public static void main(String[] args) {SpringApplication.run(ShopApplication.class, args); | |
} | |
@PostMapping("/buy") | |
public String buy(String uid){return redisService.buy(uid); | |
} | |
} |
模仿抢购 10 个商品。
python 并发申请:
import requests as req | |
import threading | |
def start(name): | |
r = req.post('http://localhost:8080/buy',{'uid':name}) | |
print(r.text) | |
for i in range(15): | |
th = threading.Thread(target=start,args=["consumer-{}".format(i)]) | |
th.start() |
后果:
总结
redis 实现分布式锁形式很多,最简略的形式是应用 set key value nx px 实现,间接一步实现。最重要的就是要谁加的锁谁开释,锁阻塞不是最好的实现形式,耗费大量网络和 cpu 资源,可重入锁须要应用 hash 数据结构实现。须要设置过期实现,防止出现死锁,还有就是锁超时问题,业务逻辑还没执行完,锁就被开释了,导致数据不统一,可在这个根底上退出主动续期性能。
相熟 lua 脚本很重要,lua 在 redis 中是原子执行的。
分布式锁还能够应用 zookeeper,etcd 实现。要相熟分布式锁的利用场景,单体利用间接在办法或代码快加锁即可,服务集群,微服务,分布式系统须要利用分布式锁。
另外,还须要留神的就是 redis 主从复制对分布式锁带来的影响。