1. 前言
之前写过一篇《Redis分布式锁的实现》的文章,次要介绍的Redis分布式锁的原始性实现,外围是基于setnx
来加锁,以及应用lua
保障事务的原子性等。但毕竟比拟原始,须要依据不同的利用场景做不同的代码实现,也容易考虑不周。过后文章中就有提到 Redisson
框架,刚好最近工作中又用的比拟多,这次就着重介绍。
Redisson 是架设在 Redis根底上的一个Java开发框架,底层基于 Netty框架,为使用者提供了一系列具备分布式个性的常用工具类。Redisson的性能十分丰盛,具体可参考 github中文wiki,但本文只介绍 Redisson分布式锁的性能。
2. 一般可重入锁
2.1. 应用示例
在SpringBoot我的项目通过Redisson来加锁非常容易,不须要像之前文章中一样写一大堆代码,框架屏蔽掉了很多细节。如下例:
Config config = new Config();config.useSingleServer().setAddress("redis://ip:port").setPassword("password").setDatabase(0);RedissonClient redissonClient = Redisson.create(config);RLock lock = redissonClient.getLock("LOCK_KEY");long waitTime=500L;long leaseTime=15000L;boolean isLock;try { isLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (isLock) { // do something ... }} catch (InterruptedException e) { Thread.currentThread().interrupt();} finally { lock.unlock();}
留神代码中 Config 并无限度,示例中是Redis单节点连贯,但实际上能够是哨兵模式、集群模式、主从模式等。
2.2. 源码解说
后面例子中加锁用到了RLock
接口,这里贴一下源码:
org.redisson.api.RLock.java
public interface RLock extends Lock, RLockAsync { String getName(); void lockInterruptibly(long var1, TimeUnit var3) throws InterruptedException; boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException; void lock(long var1, TimeUnit var3); boolean forceUnlock(); boolean isLocked(); boolean isHeldByThread(long var1); boolean isHeldByCurrentThread(); int getHoldCount(); long remainTimeToLive();}
对于可重入锁,接口对应的实现办法在org.redisson.RedissonLock
类外面,源码就不贴了,能够看到落到Redis时,理论的“加锁”和“解锁”过程也是一段lua脚本。
1、加锁
lua
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil;endif (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil;endreturn redis.call('pttl', KEYS[1]);
参数解释:
KEYS[1]
:被锁资源名。ARGV[1]
:过期工夫。ARGV[2]
:以后程序标识(UUID + 以后threadId)。
加锁的逻辑,是在redis中存入一个Set类型值。资源一旦被锁,首次设置Value为1,也只有以后程序可反复加锁,即Value往上加1。
2、解锁
lua
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;endlocal counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;endreturn nil;
参数解释:
KEYS[1]
:被锁资源名。KEYS[2]
:解锁时播送通道名。ARGV[1]
:解锁时播送通道音讯(值为0L)。ARGV[2]
:过期工夫。ARGV[3]
:以后程序标识(UUID + 以后threadId)。
解锁的逻辑,是先判断被锁资源名是否存在,如果存在则给Value减1,当Value为0时,则删除Key,并向指定通道播送音讯。
播送通道的设计很有亮点,当多个线程同时竞争锁时,未抢到锁的线程无需有效轮询,只需订阅一个通道。当锁开释时,在通道中播送音讯,告诉那些期待获取锁的线程当初能够取得锁了,那些线程再去竞争锁,防止性能资源的节约。
3、看门狗机制
如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会呈现锁死的状态,为了防止这种状况的产生,锁都会设置一个过期工夫。即后面在加锁时传入的leaseTime
。某些利用场景中,如果在指定工夫中,咱们尚未实现业务,此时就须要给锁“续期”。如果整个过程齐全可控,能够在程序中手动给锁续期。但如果心愿能主动续期,就能够用到Redisson的Wath Dog(看门狗)机制。
Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被敞开前,一直的缩短锁的有效期,也就是说,如果一个拿到锁的线程始终没有实现逻辑,那么看门狗会帮忙线程一直的缩短锁超时工夫,锁不会因为超时而被开释。默认状况下,看门狗的续期工夫是30s,也能够通过批改Config.lockWatchdogTimeout来另行指定。
上面就是加锁的源码,留神,在调用加锁办法时,如果想用看门狗,则传leaseTime
值为-1L
。如果给leaseTime设置了有效值,那么看门狗就不会失效,锁不会主动续期,而是在你指定的工夫后主动解锁。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
3. RedLock 红锁
3.1. 概念阐明
也是在之前的那一篇文章中,也提到了RedLock,中文直译“红锁”。其实那篇文章曾经介绍过了,这里再介绍一下。后面用redis实现分布式锁时存在破绽,具体场景:
客户端A在Redis master节点申请锁。但master在将存储的key同步到slave上之前解体了,而后slave晋升为master。而客户端B申请一个客户端A曾经持有的资源的锁。而后呢?而后呢?出问题啦,客户端A和B都能申请到同一个锁。
RedLock是Redis官网提出的算法,具体流程包含:
- 获取以后工夫。
- 顺次N个节点获取锁,并设置响应超时工夫,避免单节点获取锁工夫过长。
- 锁无效工夫=锁过期工夫-获取锁消耗工夫,如果第2步骤中获取胜利的节点数大于
N/2+1,且锁无效工夫大于0,则取得锁胜利。 - 若取得锁失败,则向所有节点开释锁。
简略点说,就是在锁过期工夫内,如果半数以上的节点胜利获取到了锁,则阐明获取锁胜利。这个有点像ZooKeeper的选举机制。这里讲讲Redisson中的实现办法。
3.2. 应用示例
Redisson对于RedLock的应用代码上及其简略,只是将几个锁组合成一个“大锁”,而后再失常应用“大锁”的加锁/解锁。
Config config1 = new Config();config1.useSingleServer().setAddress("redis://ip1:port1") .setPassword("password1").setDatabase(0);RedissonClient redissonClient1 = Redisson.create(config1);Config config2 = new Config();config2.useSingleServer().setAddress("redis://ip2:port2") .setPassword("password2").setDatabase(0);RedissonClient redissonClient2 = Redisson.create(config2);Config config3 = new Config();config3.useSingleServer().setAddress("redis://ip3:port3") .setPassword("password3").setDatabase(0);RedissonClient redissonClient3 = Redisson.create(config3);String lockKey = "REDLOCK_KEY";RLock lock1 = redissonClient1.getLock(lockKey);RLock lock2 = redissonClient2.getLock(lockKey);RLock lock3 = redissonClient3.getLock(lockKey);RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);boolean isLock;long waitTime=500L;long leaseTime=15000L;try { isLock = redLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (isLock) { // do something ... }} catch (Exception e) { ... ...} finally { redLock.unlock();}
留神代码中 Config 并无限度,示例中是Redis单节点连贯,但实际上能够是哨兵模式、集群模式、主从模式等。
3.3. 源码解说
在讲Redisson的 RedLock
(红锁)之前,先讲 MultiLock
(联锁),起因先看 RedissonRedLock
源码,齐全是继承 RedissonMultiLock
的所有性能。
RedissonRedLock.java
public class RedissonRedLock extends RedissonMultiLock { public RedissonRedLock(RLock... locks) { super(locks); } protected int failedLocksLimit() { return this.locks.size() - this.minLocksAmount(this.locks); } protected int minLocksAmount(List<RLock> locks) { return locks.size() / 2 + 1; } protected long calcLockWaitTime(long remainTime) { return Math.max(remainTime / (long)this.locks.size(), 1L); } public void unlock() { this.unlockInner(this.locks); }}
RedissonMultiLock.java外围代码
// 加锁 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1L; if (leaseTime != -1L) { if (waitTime == -1L) { newLeaseTime = unit.toMillis(leaseTime); } else { newLeaseTime = unit.toMillis(waitTime) * 2L; } } long time = System.currentTimeMillis(); long remainTime = -1L; if (waitTime != -1L) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = this.calcLockWaitTime(remainTime); int failedLocksLimit = this.failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList(this.locks.size()); ListIterator iterator = this.locks.listIterator(); while(iterator.hasNext()) { RLock lock = (RLock)iterator.next(); boolean lockAcquired; try { if (waitTime == -1L && leaseTime == -1L) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException var21) { this.unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception var22) { lockAcquired = false; } if (lockAcquired) { acquiredLocks.add(lock); } else { if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) { break; } if (failedLocksLimit == 0) { this.unlockInner(acquiredLocks); if (waitTime == -1L) { return false; } failedLocksLimit = this.failedLocksLimit(); acquiredLocks.clear(); while(iterator.hasPrevious()) { iterator.previous(); } } else { --failedLocksLimit; } } if (remainTime != -1L) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0L) { this.unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1L) { List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size()); Iterator var24 = acquiredLocks.iterator(); while(var24.hasNext()) { RLock rLock = (RLock)var24.next(); RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } var24 = futures.iterator(); while(var24.hasNext()) { RFuture<Boolean> rFuture = (RFuture)var24.next(); rFuture.syncUninterruptibly(); } } return true; } // 解锁 public void unlock() { List<RFuture<Void>> futures = new ArrayList(this.locks.size()); Iterator var2 = this.locks.iterator(); while(var2.hasNext()) { RLock lock = (RLock)var2.next(); futures.add(lock.unlockAsync()); } var2 = futures.iterator(); while(var2.hasNext()) { RFuture<Void> future = (RFuture)var2.next(); future.syncUninterruptibly(); } }
RedissonRedLock.java
中重写了 RedissonMultiLock.java
里的几个办法:
- failedLocksLimit:MultiLock中返回
0
,RedLock中返回locks.size() / 2 - 1
。 - calcLockWaitTime:MultiLock中返回
remainTime
,RedLock中返回Math.max(remainTime / (long)this.locks.size(), 1L)
。
通过源码容易看到,Redisson中的 RedLock算法齐全是基于 MultiLock实现的。Redisson 反对这种“联结锁”的概念,将多个 RLock锁放入一个 ArrayList中,而后开始遍历加锁。只不过 MultiLock的要求比拟刻薄,List中的所有的 RLock加锁时,不能存在任何加锁失败的,即 failedLocksLimit=0。而 RedLock要求放松一点,只有过半加锁胜利即可,即 failedLocksLimit = locks.size() / 2 - 1。但解锁时,要求将整个 ArrayList 中的锁都解一遍。