共计 11572 个字符,预计需要花费 29 分钟才能阅读完成。
1. 锁的品种
2. 一个健壮性高的分布式锁应该具备的特质
3. 单个 redis 分布式锁的演变
4. 多 redis 分布式锁
5. 总结
1. 锁的品种
咱们在日常的开发流动中,个别把锁分为两类:
1)同一个 JVM 里的锁 ,比方 synchronized 和 Lock,ReentrantLock 等等
2) 跨 JVM 的分布式锁,因为服务是集群部署的,单机版的锁不再起作用,资源在不同的服务器之间共享。
2. 一个健壮性高的分布式锁应该具备的特质
1) 独占性 任何时刻只能有 一个线程 持有锁
2) 高可用 在 redis 集群环境下, 不能因为某个节点挂了而呈现锁生效的状况
3) 防死锁 不能有死锁状况,要有 超时管制的性能
4) 不乱抢 不能 unlock 他人的锁, 本人的锁只能本人开释
5) 重入性 同一个节点的同一个线程取得锁之后, 能够再次取得这个锁
3. 单个 redis 分布式锁的演变
版本 1:单机版的锁
咱们先来看这样无锁的代码:
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods()
{String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
}
}
以上的程序在进行商品销售的时候,并没有加锁,在并发下会造成超卖的景象。
应用 jMeter 进行压力测试:
因为在单机版的状况下,咱们能够应用 synchronize 或者 lock 来进行解决,上代码:
@GetMapping("/buy_goods")
public String buy_Goods() {synchronized (this) {String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件" + "\t 服务器端口:" + serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件" + "\t 服务器端口:" + serverPort;
} else {System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾" + "\t 服务器端口:" + serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾" + "\t 服务器端口:" + serverPort;
}
}
运行后果:
然而这只是对单机版的程序无效,咱们启动两个微服务,再用 nginx 配置一下负载平衡,照样会产生超卖的景象:
服务器 1:
服务器 2:
看,第 189 件库存被卖了两次。
问题:分布式部署后,单机锁还是呈现超卖景象,这个时候就须要分布式锁!
版本 2:redis 的分布式锁:
咱们应用redis 来进行加锁,避免超卖景象
@GetMapping("/buy_goods/v2")
public String buy_GoodsV2() {
String key = "redisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
// 应用 redis 进行加锁
Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
if(!flagLock)
{return "争夺锁失败,o(╥﹏╥)o";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
stringRedisTemplate.delete(key);
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
}
此时,咱们运行一下代码:
服务 1:
服务 2:
问题:为什么卖了一件就卖不动了呢?因为咱们没有在卖完之后,没有进行对分布式锁的 key 进行解锁操作。
版本 3:在 finally 中,解除该锁
@GetMapping("/buy_goods/v3")
public String buy_GoodsV3() {
String key = "redisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
if(!flagLock)
{return "抢锁失败,o(╥﹏╥)o";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
} finally {stringRedisTemplate.delete(key);
}
}
运行后果;
服务 1:
服务 2:
问题:如果 服务器宕机 了,代码层面基本没有走到 finally 这一块,就没有方法保障解锁,这个 key 没有被删除 , 咱们须要给 key 减少一个过期工夫!
版本 4:给 key 减少过期工夫
@GetMapping("/buy_goods/v4")
public String buy_GoodsV4() {
String key = "redisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
// 减少过期工夫
stringRedisTemplate.expire(key,10L, TimeUnit.SECONDS);
if(!flagLock)
{return "抢锁失败,o(╥﹏╥)o";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
} finally {stringRedisTemplate.delete(key);
}
}
问题:设置 key 和设置过期工夫不是原子性的,可能在这个期间,服务器宕机也是可能的。
版本 5:将设置 key 和设置 key 的过期工夫合并成一行,作为一个原子性操作
@GetMapping("/buy_goods/v5")
public String buy_GoodsV5() {
String key = "redisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
try {
// 设置 key 和 key 的过期工夫合并为一行,是原子操作,底层为 setnx 命令
Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);
if(!flagLock)
{return "抢锁失败,o(╥﹏╥)o";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
} finally {stringRedisTemplate.delete(key);
}
}
问题:delete key 的时候,可能咱们这个锁曾经过期了,删的是下一个线程的锁。
版本 6:删除 key 的时候,只能删除本人的,不能删除他人的,加一层判断
@GetMapping("/buy_goods/v6")
public String buy_GoodsV6(){
String key = "redisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);
if(!flagLock)
{return "抢锁失败,o(╥﹏╥)o";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
} finally {
// 删除 key 的时候做一个判断
if (stringRedisTemplate.opsForValue().get(key).equals(value)) {stringRedisTemplate.delete(key);
}
}
}
问题:finally 块的判断 +del 删除操作不是原子性的, 可能判断完之后,锁就过期了,又删除了他人的锁。
版本 7:用 Lua 脚本,将保障判断和删除锁的原子性
@GetMapping("/buy_goods/v7")
public String buy_GoodsV7() throws Exception {
String key = "redisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);
if(!flagLock)
{return "抢锁失败,o(╥﹏╥)o";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
} finally {Jedis jedis = RedisUtils.getJedis();
String script = "if redis.call('get', KEYS[1]) == ARGV[1]" +
"then" +
"return redis.call('del', KEYS[1])" +
"else" +
"return 0" +
"end";
try {Object result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(value));
if ("1".equals(result.toString())) {System.out.println("------del REDIS_LOCK_KEY success");
}else{System.out.println("------del REDIS_LOCK_KEY error");
}
} finally {if(null != jedis) {jedis.close();
}
}
}
}
到这里,咱们根本的一个 redis 锁就造成了,个别公司写到这里差不太多了。
问题:此时咱们要确保,业务逻辑的运行工夫,要比咱们加锁的 key 过期工夫要短 , 如果业务逻辑运行工夫比咱们们的锁过期工夫更长,又会呈现锁隐没景象。
版本 8:应用 redission,不仅能解决后面所有问题,redission 自带的 watchDog,可能 定时刷新锁的过期工夫。
@Autowired
private Redisson redisson;
@GetMapping("/buy_goods/v8")
public String buy_GoodsV8()
{
String key = "redisLock";
RLock redissonLock = redisson.getLock(key);
redissonLock.lock();
try
{String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0)
{
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
}else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
}
return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
}finally {if(redissonLock.isLocked() && redissonLock.isHeldByCurrentThread())
{redissonLock.unlock();
}
}
}
锁刷新要害逻辑:
![image.png](/img/bVcXV6s)
以上咱们便实现了单机版 redis 锁的编写。
问题:当初咱们都是用主从构造的 redis,当主节点的数据还没来得及同步到从节点,redis 主节点宕机了,仍然会造成锁失落。
4. 多 redis 分布式锁
咱们先将下面形容的问题再反复一次:
当用户调用 redis 的主节点,而且加锁胜利的时候,主节点还没来得及同步数据到从节点,主节点就挂了,导致锁失落,前面的线程就又开始加锁,就会造成脏数据。
解决方案:Redlock 算法!
锁由多个 redis(都是主节点)一起保护,如果有了其中一个 redis 产生故障,还有其它 redis 能够兜底,锁依然是存在的。RedLock 算法是实现高牢靠分布式锁的一种无效的解决方案,能够在理论开发中应用。
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
/**
* @param lettuceConnectionFactory
* @return redis 序列化的工具配置类,上面这个请肯定开启配置
* 127.0.0.1:6379> keys *
* 1) "ord:102" 序列化过
* 2) "\xac\xed\x00\x05t\x00\aord:102" 家养,没有序列化过
*/
@Bean
public RedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
// 设置 key 序列化形式 string
redisTemplate.setKeySerializer(new StringRedisSerializer());
// 设置 value 的序列化形式 json
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public Redisson redisson()
{Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.140:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
@Bean
public Redisson redissonClient1()
{Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.140:6380").setDatabase(0);
return (Redisson) Redisson.create(config);
}
@Bean
public Redisson redissonClient2()
{Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.140:6381").setDatabase(0);
return (Redisson) Redisson.create(config);
}
@Bean
public Redisson redissonClient3()
{Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.111.140:6382").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
public class RedLockController {
public static final String CACHE_KEY_REDLOCK = "REDLOCK";
@Autowired
RedissonClient redissonClient1;
@Autowired
RedissonClient redissonClient2;
@Autowired
RedissonClient redissonClient3;
@GetMapping(value = "/redlock")
public void getlock() {
//CACHE_KEY_REDLOCK 为 redis 分布式锁的 key
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
// 三个锁汇聚成 redLock
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
//waitTime 锁的等待时间解决, 失常状况下 等 5s
//leaseTime 就是 redis key 的过期工夫, 失常状况下等 5 分钟。isLock = redLock.tryLock(5, 300, TimeUnit.SECONDS);
if (isLock) {
//TODO if get lock success, do something;
// 暂停 20 秒钟线程
try {TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {e.printStackTrace();
}
}
} catch (Exception e) {e.printStackTrace();
} finally {
// 无论如何, 最初都要解锁
redLock.unlock();
System.out.println(Thread.currentThread().getName() + "\t" + "redLock.unlock()");
}
}
}
5. 总结
这次咱们讲了分布式锁的演变
无锁 ->synchronized 单机锁 -> 单机 redis 分布式锁 -> 多机 redis 分布式锁。