一、什么是分布式锁
1.1 分布式锁介绍
分布式锁是管制不同零碎之间访问共享资源的一种锁实现,如果不同的零碎或同一个零碎的不同主机之间共享了某个资源时,往往须要互斥来避免彼此烦扰来保障一致性。
1.2 为什么须要分布式锁
在单机部署的零碎中,应用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如应用synchornized、ReentrantLock等。然而在后端集群部署的零碎中,程序在不同的JVM虚拟机中运行,且因为synchronized或ReentrantLock都只能保障同一个JVM过程中保障无效,所以这时就须要应用分布式锁了。这里就不再赘述synchornized锁的原理。
1.3 分布式锁须要具备的条件
分布式锁须要具备互斥性、不会死锁和容错等。互斥性,在于不论任何时候,应该只能有一个线程持有一把锁;不会死锁在于即便是持有锁的客户端意外宕机或产生过程被kill等状况时也能开释锁,不至于导致整个服务死锁。容错性指的是只有大多数节点失常工作,客户端应该都能获取和开释锁。
二、分布式锁的实现形式
目前支流的分布式锁的实现形式,基于数据库实现分布式锁、基于Redis实现分布式锁、基于ZooKeeper实现分布式锁,本篇文章次要介绍了Redis实现的分布式锁。
2.1 由单机部署到集群部署锁的演变
一开始在redis设置一个默认值key:ticket 对应的值为20,并搭建一个Spring Boot服务,用来模仿多窗口卖票景象,配置类的代码就不一一列出了。
2.1.1 单机模式解决并发问题
一开始的时候在redis预设置的门票值ticket=20,那么当一个申请进来之后,会判断是否余票是否是大于0,若大于0那么就将余票减一,再从新写入Redis中,假使库存小于0,那么就会打印谬误日志。
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } return "end"; } }
代码运行剖析: 这里显著有一个问题,就是以后若有两个线程同时申请进来,那么两个线程同时申请这段代码时,如图thread 1 和thread 2同时,两个线程从Redis拿到的数据都是20,那么执行实现后thread 1 和thread 2又将减完后的库存ticket=19从新写入Redis,那么数据就会产生问题,实际上两个线程各减去了一张票数,然而理论写进就减了一次票数,就呈现了数据不统一的景象。
这种问题很好解决,上述问题的产生其实就是从Redis中拿数据和减余票不是原子操作,那么此时只须要将按下图代码给这俩操作加上synchronized同步代码快就能解决这个问题。
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; synchronized (this) { int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } } return "end"; }}
代码运行剖析: 此时当多个线程执行到第14行的地位时,只会有一个线程可能获取锁,进入synchronized代码块中执行,当该线程执行实现后才会开释锁,等下个线程进来之后就会从新给这段代码上锁再执行。说简略些就是让每个线程排队执行代码块中的代码,从而保障了线程的平安。
上述的这种做法如果后端服务只有一台机器,那毫无疑问是没问题的,然而当初互联网公司或者是个别软件公司,后端服务都不可能只用一台机器,起码都是2台服务器组成的后端服务集群架构,那么synchronized加锁就显然没有任何作用了。
如下图所示,若后端是两个微服务形成的服务集群,由nginx将多个的申请负载平衡转发到不同的后端服务上,因为synchronize代码块只能在同一个JVM过程中失效,两个申请可能同时进两个服务,所以下面代码中的synchronized就一点作用没有了。
用JMeter工具轻易测试一下,就很简略能发现上述代码的bug。实际上synchronized和juc包下个那些锁都是只能用于JVM过程维度的锁,并不能使用在集群或分布式部署的环境中。
2.1.2 集群模式解决并发问题
通过下面的试验很容易就发现了synchronized等JVM过程级别的锁并不能解决分布式场景中的并发问题,就是为了应答这种场景产生了分布式锁。
本篇文章介绍了Redis实现的分布式锁,能够通过Redis的setnx(只在键key不存在的状况下, 将键key的值设置为value。 若键key曾经存在, 则SETNX命令不做任何动作。)的指令来解决的,这样就能够解决下面集群环境的锁不惟一的状况。
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; // redis setnx 操作 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu"); if (Boolean.FALSE.equals(result)) { return "error"; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } stringRedisTemplate.delete(lockKey); return "end"; }}
代码运行剖析: 代码是有问题的,就是当执行扣减余票操作时,若业务代码报了异样,那么就会导致前面的删除Redis的key代码没有执行到,就会使Redis的key没有删掉的状况,那么Redis的这个key就会始终存在Redis中,前面的线程再进来执行上面这行代码都是执行不胜利的,就会导致线程死锁,那么问题就会很重大了。
为了解决上述问题其实很简略,只有加上一个try...finally即可,这样业务代码即便抛了异样也能够失常的开释锁。setnx + try ... finally解决,具体代码如下:
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu"); if (Boolean.FALSE.equals(result)) { return "error"; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }}
代码运行剖析:上述业务代码执行报错的问题解决了,然而又会有新的问题,当程序执行到try代码块中某个位置服务宕机或者服务从新公布,这样就还是会有上述的Redis的key没有删掉导致死锁的状况。这样能够应用Redis的过期工夫来进行设置key,setnx + 过期工夫解决,如下代码所示:
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu"); //程序执行到这 stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return "error"; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }}
代码运行剖析:上述代码解决了因为程序执行过程中宕机导致的锁没有开释导致的死锁问题,然而如果代码像上述的这种写法依然还是会有问题,当程序执行到第18行时,程序宕机了,此时Redis的过期工夫并没有设置,也会导致线程死锁的景象。能够用了Redis设置的原子命设置过期工夫的命令,原子性过期工夫的setnx命令,如下代码所示:
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, "dewu", 10, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return "error"; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }}
代码运行剖析:通过设置原子性过期工夫命令能够很好的解决上述这种程序执行过程中忽然宕机的状况。这种Redis分布式锁的实现看似曾经没有问题了,但在高并发场景下任会存在问题,个别软件公司并发量不是很高的状况下,这种实现分布式锁的形式曾经够用了,即便出了些小的数据不统一的问题,也是可能承受的,然而如果是在高并发的场景下,上述的这种实现形式还是会存在很大问题。
如下面代码所示,该分布式锁的过期工夫是10s,如果thread 1执行实现工夫须要15s,且当thread 1线程执行到10s时,Redis的key恰好就是过期就间接开释锁了,此时thread 2就能够取得锁执行代码了,如果thread 2线程执行实现工夫须要8s,那么当thread 2线程执行到第5s时,恰好thread 1线程执行了开释锁的代码————stringRedisTemplate.delete(lockKey); 此时,就会发现thread 1线程删除的锁并不是其本人的加锁,而是thread 2加的锁;那么thread 3就又能够进来了,那么如果一共执行5s,那么当thread 3执行到第3s时,thread 2又会恰好执行到开释锁的代码,那么thread 2又删除了thread 3 加的锁。
在高并发场景下,假使遇到上述问题,那将是灾难性的bug,只有高并发存在,那么这个分布式锁就会时而加锁胜利时而加锁失败。
解决上述问题其实也很简略,让每个线程加的锁时给Redis设置一个惟一id的value,每次开释锁的时候先判断一下线程的惟一id与Redis 存的值是否雷同,若雷同即可开释锁。设置线程id的原子性过期工夫的setnx命令, 具体代码如下:
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { String lockKey = "ticket"; String threadUniqueKey = UUID.randomUUID().toString(); // redis setnx 操作 try { Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, threadUniqueKey, 10, TimeUnit.SECONDS); if (Boolean.FALSE.equals(result)) { return "error"; } int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } } finally { if (Objects.equals(stringRedisTemplate.opsForValue().get(lockKey), threadUniqueKey)) { stringRedisTemplate.delete(lockKey); } } return "end"; }}
代码运行剖析:上述实现的Redis分布式锁曾经可能满足大部分利用场景了,然而还是略有有余,比方当线程进来须要的执行工夫超过了Redis key的过期工夫,那么此时曾经开释了,你其余线程就能够立马取得锁执行代码,就又会产生bug了。
分布式锁Redis key的过期工夫不论设置成多少都不适合,比方将过期工夫设置为30s,那么如果业务代码呈现了相似慢SQL、查问数据量很大那么过期工夫就不好设置了。那么这里有没有什么更好的计划呢?答案是有的——锁续命。
那么锁续命计划的原来就在于当线程加锁胜利时,会开一个分线程,取锁过期工夫的1/3工夫点定时执行工作,如上图的锁为例,每10s判断一次锁是否存在(即Redis的key),若锁还存在那么就间接从新设置锁的过期工夫,若锁曾经不存在了那么就间接完结以后的分线程。
2.2 Redison框架实现Redis分布式锁
上述“锁续命”计划说起来简略,然而实现起来还是挺简单的,于是市面上有很多开源框架曾经帮咱们实现好了,所以就不须要本人再去反复造轮子再去写一个分布式锁了,所以本次就拿Redison框架来举例,次要是能够学习这种设计分布式锁的思维。
2.2.1 Redison分布式锁的应用
Redison实现的分布式锁,应用起来还是非常简单的,具体代码如下:
@RestController@Slf4jpublic class RedisLockController { @Resource private Redisson redisson; @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/lock") public String deductTicket() throws InterruptedException { //传入Redis的key String lockKey = "ticket"; // redis setnx 操作 RLock lock = redisson.getLock(lockKey); try { //加锁并且实现锁续命 lock.lock(); int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey)); if (ticketCount > 0) { int realTicketCount = ticketCount - 1; log.info("扣减胜利,残余票数:" + realTicketCount + ""); stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + ""); } else { log.error("扣减失败,余票有余"); } } finally { //开释锁 lock.unlock(); } return "end"; }}
2.2.2 Redison分布式锁的原理
Redison实现分布式锁的原理流程如下图所示,当线程1加锁胜利,并开始执行业务代码时,Redison框架会开启一个后盾线程,每隔锁过期工夫的1/3工夫定时判断一次是否还持有锁(Redis中的key是否还存在),若不持有那么就间接完结以后的后盾线程,若还持有锁,那么就从新设置锁的过期工夫。当线程1加锁胜利后,那么线程2就会加锁失败,此时线程2就会就会做相似于CAS的自旋操作,始终期待线程1开释了之后线程2能力加锁胜利。
2.2.3 Redison分布式锁的源码剖析
Redison底层实现分布式锁时应用了大量的lua脚本保障了其加锁操作的各种原子性。Redison实现分布式锁应用lua脚本的益处次要是能保障Redis的操作是原子性的,Redis会将整个脚本作为一个整体执行,两头不会被其余命令插入。
Redisson外围应用lua脚本加锁源码剖析:
办法名为tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command):
//应用lua脚本加锁办法<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, //当第一个线程进来会间接执行这段逻辑 //判断传入的Redis的key是否存在,即String lockKey = "ticket"; "if (redis.call('exists', KEYS[1]) == 0) then " + //如果不存在那么就设置这个key为传入值、以后线程id 即参数ARGV[2]值(即getLockName(threadId)),并且将线程id的value值设置为1 "redis.call('hset', KEYS[1], ARGV[2], 1); " + //再给这个key设置超时工夫,超时工夫即参数ARGV[1](即internalLockLeaseTime的值)的工夫 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //当第二个线程进来,Redis中的key曾经存在(锁曾经存在),那么间接进这段逻辑 //判断这个Redis key是否存在且以后的这个key是否是以后线程设置的 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //如果是的话,那么就进入重入锁的逻辑,利用hincrby指令将第一个线程进来将线程id的value值设置为1再加1 //而后每次开释锁的时候就会减1,直到这个值为0,这把锁就开释了,这点与juc的可重锁相似 //“hincrby”指令为Redis hash构造的加法 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //假使不是本线程加的锁,而是其余线程加的锁,因为上述lua脚本都是有线程id的校验,那么下面的两段lua脚本都不会执行 //那么此时这里就会将以后这个key的过期工夫返回 "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); // KEYS[1]) ARGV[1] ARGV[2]}// getName()传入KEYS[1],示意传入解锁的keyName,这里是 String lockKey = "ticket";// internalLockLeaseTime传入ARGV[1],示意锁的超时工夫,默认是30秒// getLockName(threadId)传入ARGV[2],示意锁的惟一标识线程id
设置监听器办法:办法名tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId)
//设置监听器办法: private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } //加锁胜利这里会返回一个null值,即ttlRemainingFuture为null //若线程没有加锁胜利,那么这里返回的就是这个别的线程加过的锁的残余的过期工夫,即ttlRemainingFuture为过期工夫 RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); //如果还持有这个锁,则开启定时工作一直刷新该锁的过期工夫 //这里给以后业务加了个监听器 ttlRemainingFuture.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { if (!future.isSuccess()) { return; } Boolean ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining) { //定时工作执行办法 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
定时工作执行办法: 办法名scheduleExpirationRenewal(final long threadId):
//定时工作执行办法 private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } //这里new了一个TimerTask()定时工作器 //这里定时工作会推延执行,推延的工夫是设置的锁过期工夫的1/3, //很容易就能发现是一开始锁的过期工夫默认值30s,具体可见private long lockWatchdogTimeout = 30 * 1000; //过期工夫单位是秒 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //这里又是一个lua脚本 //这里lua脚本先判断了一下,Redis的key是否存在且设置key的线程id是否是参数ARGV[2]值 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //如果这个线程创立的Redis的key即锁依然存在,那么久给锁的过期工夫从新设值为internalLockLeaseTime,也就是初始值30s "redis.call('pexpire', KEYS[1], ARGV[1]); " + //Redis的key过期工夫从新设置胜利后,这里的lua脚本返回的就是1 "return 1; " + "end; " + //如果主线程曾经开释了这个锁,那么这里的lua脚本就会返回0,间接完结“看门狗”的程序 "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); } }
//下面源码剖析过了,当加锁胜利后tryAcquireAsync()返回的值为null, 那么这个办法的返回值也为nullprivate Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId));}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { //取得以后线程id long threadId = Thread.currentThread().getId(); //由下面的源码剖析能够得出,当加锁胜利后,这个ttl就是null //若线程没有加锁胜利,那么这里返回的就是这个别的线程加过的锁的残余的过期工夫 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired //如果加锁胜利后,这个ttl就是null,那么这个办法后续就不须要做任何逻辑 //若没有加锁胜利这里ttl的值不为null,为别的线程加过锁的残余的过期工夫,就会持续往下执行 if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { //若没有加锁胜利的线程,会在这里做一个死循环,即自旋 while (true) { //始终死循环尝试加锁,这里又是下面的加锁逻辑了 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } //这里不会疯狂自旋,这里会判断锁生效之后才会持续进行自旋,这样能够节俭一点CPU资源 // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
Redison底层解锁源码剖析:
@Override public void unlock() { // 调用异步解锁办法 Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); //当开释锁的线程和已存在锁的线程不是同一个线程,返回null if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } //依据执行lua脚本返回值判断是否勾销续命订阅 if (opStatus) { // 勾销续命订阅 cancelExpirationRenewal(); } }
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //如果锁曾经不存在, 公布锁开释的音讯,返回1 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + //如果开释锁的线程和已存在锁的线程不是同一个线程,返回null "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + //以后线程持有锁,用hincrby命令将锁的可重入次数-1,即线程id的value值-1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //若线程id的value值即可重入锁的次数大于0 ,就更新过期工夫,返回0 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + //否则证实锁曾经开释,删除key并公布锁开释的音讯,返回1 "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } // getName()传入KEYS[1],示意传入解锁的keyName // getChannelName()传入KEYS[2],示意redis外部的音讯订阅channel // LockPubSub.unlockMessage传入ARGV[1],示意向其余redis客户端线程发送解锁音讯 // internalLockLeaseTime传入ARGV[2],示意锁的超时工夫,默认是30秒 // getLockName(threadId)传入ARGV[3],示意锁的惟一标识线程id
void cancelExpirationRenewal() { // 将该线程从定时工作中删除 Timeout task = expirationRenewalMap.remove(getEntryName()); if (task != null) { task.cancel(); } }
上述情况如果是单台Redis,那么利用Redison开源框架实现Redis的分布式锁曾经很完满了,然而往往生产环境的的Redis个别都是哨兵主从架构,Redis的主从架构有别与Zookeeper的主从,客户端只能申请Redis主从架构的Master节点,Slave节点只能做数据备份,Redis从Master同步数据到Slave并不需要同步实现后能力持续接管新的申请,那么就会存在一个主从同步的问题。
当Redis的锁设置胜利,正在执行业务代码,当Redis向从服务器同步时,Redis的Maste节点宕机了,Redis刚刚设置胜利的锁还没来得及同步到Slave节点,那么此时Redis的主从哨兵模式就会从新选举出新的Master节点,那么这个新的Master节点其实就是原来的Slave节点,此时前面申请进来的线程都会申请这个新的Master节点,然而选举后产生的新Master节点实际上是没有那把锁的,那么从而导致了锁的生效。
上述问题用Redis主从哨兵架构实现的分布式锁在这种极其状况下是无奈防止的,然而个别状况下生产上这种故障的概率极低,即便偶然有问题也是能够承受的。
如果想使分布式锁变的百分百牢靠,那能够选用Zookeeper作为分布式锁,就能完满的解决这个问题。因为zk的主从数据同步有别与Redis主从同步,zk的强一致性使得当客户端申请zk的Leader节点加锁时,当Leader将这个锁同步到了zk集群的大部分节点时,Leader节点才会返回客户端加锁胜利,此时当Leader节点宕机之后,zk外部选举产生新的Leader节点,那么新的客户款拜访新的Leader节点时,这个锁也会存在,所以zk集群可能完满解决上述Redis集群的问题。
因为Redis和Zookeeper的设计思路不一样,任何分布式架构都须要满足CAP实践,“鱼和熊掌不可兼得”,要么抉择AP要么抉择CP,很显然Redis是AP构造,而zk是属于CP架构,也导致了两者的数据同步实质上的区别。
其实设计Redis分布式锁有种RedLock的思维就是借鉴zk实现分布式锁的这个特点,这种Redis的加锁形式在Redison框架中也有提供api,具体应用也很简略,这里就不一一赘述了。其次要思维如下图所示:
这种实现形式,我认为生产上并不举荐应用。很简略本来只须要对一个Redis加锁,设置胜利返回即可,然而当初须要对多个Redis进行加锁,无形之中减少了好几次网络IO,万一第一个Redis加锁胜利后,前面几个Redis在加锁过程中呈现了相似网络异样的这种状况,那第一个Redis的数据可能就须要做数据回滚操作了,那为了解决一个极低概率产生的问题又引入了多个可能产生的新问题,很显然得失相当。并且这里还有可能呈现更多乌七八糟的问题,所以我认为这种Redis分布式锁的实现形式极其不举荐生产应用。
退一万说如果真的须要这种强一致性的分布式锁的话,那为什么不间接用zk实现的分布式锁呢,性能必定也比这个RedLock的性能要好。
三、分布式锁应用场景
这里着重讲一下分布式锁的两种以下应用场景:
3.1 热点缓存key重建优化
个别状况下互联网公司根本都是应用“缓存”加过期工夫的策略,这样不仅放慢数据读写, 而且还能保证数据的定期更新,这种策略可能满足大部分需要,然而也会有一种非凡状况会有问题:本来就存在一个冷门的key,因为某个热点新闻的呈现,忽然这个冷门的key申请量暴增成了使其称为了一个热点key,此时缓存生效,并且又无奈在很短时间内从新设置缓存,那么缓存生效的霎时,就会有大量线程来拜访到后端,造成数据库负载加大,从而可能会让利用解体。
例如:“Air Force one”本来就是一个冷门的key存在于缓存中,微博忽然有个明星衣着“Air Force one”上了热搜,那么就会有很多明星的粉丝来得物app购买“Air Force one”,此时的“Air Force one”就间接成为了一个热点key,那么此时“Air Force one”这个key如果缓存恰好生效了之后,就会有大量的申请同时拜访到db,会给后端造成很大的压力,甚至会让零碎宕机。
要解决这个问题只须要用一个简略的分布式锁即可解决这个问题,只容许一个线程去重建缓存,其余线程期待重建缓存的线程执行完, 从新从缓存获取数据即可。可见上面的实例伪代码:
public String getCache(String key) { //从缓存获取数据 String value = stringRedisTemplate.opsForValue().get(key); //传入Redis的key try { if (Objects.isNull(value)) { //这里只容许一个线程进入,从新设置缓存 String mutexKey = "lock_key" + key; //如果加锁胜利 if (stringRedisTemplate.opsForValue().setIfPresent(mutexKey, "poizon", 30, TimeUnit.SECONDS)) { //从db 获取数据 value = mysql.getDataFromMySQL(key); //写回缓存 stringRedisTemplate.opsForValue().setIfPresent(key, "poizon", 60, TimeUnit.SECONDS); //开释锁 stringRedisTemplate.delete(mutexKey); } } else { Thread.sleep(100); getCache(key); } } catch (InterruptedException e) { log.error("getCache is error", e); } return value; }
3.2 解决缓存与数据库数据不统一问题
如果业务对数据的缓存与数据库须要强统一时,且并发量不是很高的状况下的状况下时,就能够间接加一个分布式读写锁就能够间接解决这个问题了。能够间接利用能够加分布式读写锁保障并发读写或写写的时候按程序排好队,读读的时候相当于无锁。
并发量不是很高且业务对缓存与数据库有着强统一对要求时,通过这种形式实现最简略,且成果空谷传声。假使在这种场景下,如果还监听binlog通过音讯的形式提早双删的形式去保证数据一致性的话,引入了新的中间件减少了零碎的复杂度,得失相当。
3.3超高并发场景下的分布式锁设计实践
与ConcurrentHashMap的设计思维有点相似,用分段锁来实现,这个是之前在网上看到的实现思路,自己并没有理论应用过,不晓得水深不深,然而能够学习一下实现思路。
如果A商品的库存是2000个,当初能够将该A商品的2000个库存利用相似ConcurrentHashMap的原理将不同数量段位的库存的利用取模或者是hash算法让其扩容到不同的节点下来,这样这2000的库存就程度扩容到了多个Redis节点上,而后申请Redis拿库存的时候申请本来只能从一个Redis上取数据,当初能够从五个Redis上取数据,从而能够大大提高并发效率。
第四局部:总结与思考
综上可知,Redis分布式锁并不是相对平安,Redis分布式锁在某种极其状况下是无奈防止的,然而个别状况下生产上这种故障的概率极低,即便偶然有问题也是能够承受。
CAP 准则指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)这三个因素最多只能同时实现两点,不可能三者兼顾。鱼和熊掌不可兼得”,要么抉择AP要么抉择CP,抉择Redis作为分布式锁的组件在于其单线程内存操作效率很高,且在高并发场景下也能够放弃很好的性能。
如果肯定要要求分布式锁百分百牢靠,那能够选用Zookeeper或者MySQL作为分布式锁,就能完满的解决锁平安的问题,然而抉择了一致性那就要失去可用性,所以Zookeeper或者MySQL实现的分布式锁的性能远不如Redis实现的分布式锁。
感激浏览此篇文章的你,若有有余的中央烦请指出,大家能够一起探讨学习,共同进步。
文/harmony
关注得物技术,做最潮技术人!