Redisson分布式锁的实现原理及源码

源码解析

简略的业务代码

次要负责源码入口, 分布式锁的应用次要有三个办法

  1. RLock lock = redissonClient.getLock("hpc-lock")获取实现可重入分布式锁的类
  2. lock.lock() 加锁
  3. lock.unlock()解锁

    @GetMapping("/redis/lock")  public ResResult testDistributedLock() {    RLock lock = redissonClient.getLock("hpc-lock");    lock.lock();    try {           System.out.println("加锁业务, xxx, xxx, xxxx");    } finally {           lock.unlock();    }    return new ResResult(true, "");  }

源码解析

获取实现可重入分布式锁的类

redissonClient.getLock("hpc-lock") ,该办法次要是获取实现了分布式可重入锁的类,进入getLock办法

@Override  public RLock getLock(String name) {      return new RedissonLock(commandExecutor, name);  }

发现是初始化了RedissonLock类, 追到结构类办法

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {      super(commandExecutor, name);      this.commandExecutor = commandExecutor;    // 通过下一行代码并进入追踪,发现默认线程的外部锁租用工夫为默认的30s,    // 也就是30s后主动释法锁    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();      this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();  }

加锁逻辑

lock.lock() 该办法次要性能是进行加锁,进入lock()办法,并向下追踪找到外围逻辑,找到办法org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)并查看外围逻辑。
该外围逻辑次要有三个点:尝试加锁未加锁胜利的线程订阅Redis的音讯未加锁胜利的线程通过自旋获取锁

尝试加锁逻辑

首先看org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)办法的尝试加锁局部,如下

long threadId = Thread.currentThread().getId();  // 获取锁Long ttl = tryAcquire(-1, leaseTime, unit, threadId);  // 获取锁胜利,间接返回if (ttl == null) {      return;  }

进入tryAcquire(-1, leaseTime, unit, threadId)办法并一直向下跟踪,找到外围逻辑org.redisson.RedissonLock#tryAcquireAsync办法体如下,该办法次要有几个逻辑:加锁锁续命

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {      RFuture<Long> ttlRemainingFuture;      // 获取锁    if (leaseTime != -1) {          // 有锁生效工夫        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);      } else {          // 采纳默认锁生效工夫        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,   TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);      }      // 加锁后回调该办法    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {          // 加锁胜利        if (ttlRemaining == null) {              if (leaseTime != -1) {                  internalLockLeaseTime = unit.toMillis(leaseTime);              } else {                  // 如果没有传入锁生效工夫,也就是在加锁时采纳的是默认的锁生效工夫                // 加锁胜利后,进行锁续命                scheduleExpirationRenewal(threadId);              }          }          return ttlRemaining;      });      return new CompletableFutureWrapper<>(f);  }
  1. 首先,进入加锁的外围逻辑tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG), 办法体如下:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {      // 对redis执行lua脚本    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,           "if (redis.call('exists', KEYS[1]) == 0) then " +                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +                      "return nil; " +                      "end; " +                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +                      "return nil; " +                      "end; " +                      "return redis.call('pttl', KEYS[1]);",           Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));  }

    这里重要的是Lua脚本的逻辑,上面简略解释lua脚本的逻辑

    • 判断redis中是否存在具备getRawName()的键值的数据。(留神: 这里getRawName()所获取的值就是业务办法RLock lock = redissonClient.getLock("hpc-lock")中传入的参数hpc-lock
    • 如果不存在键值,则保留该键值到redis中,并且该key对用的value值为1;同时该键值的生效工夫设置为unit.toMillis(leaseTime)(这里的生效工夫如果用户不传的话,个别采纳默认的30s,这在之前的对redissonClient.getLock("hpc-lock")源码解析中曾经剖析过),并返回null值。
    • 如果存在键值,则对该键值的value的值进行自增1,并且从新设置该键值的生效工夫,生效工夫设置为unit.toMillis(leaseTime), 同时返回null值
    • 如果还有其它状况,则返回该键值的残余过期工夫,如果该键值不存在返回-2,如果该键值没有过期工夫返回-1(这是Lua脚本中的return redis.call('pttl', KEYS[1]))
      留神: Lua脚本在Redis中是先天具备原子性的,只有Lua脚本执行完之后,Redis才会进行其它操作,因而不必放心Lua脚本的并发问题。
  2. 当获取锁胜利后,会进入回调办法,进行锁续命的逻辑,进入外围办法scheduleExpirationRenewal(threadId)中,办法体如下:

    protected void scheduleExpirationRenewal(long threadId) {      ExpirationEntry entry = new ExpirationEntry();      ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);      if (oldEntry != null) {          oldEntry.addThreadId(threadId);      } else {          entry.addThreadId(threadId);          try {              // 续期            renewExpiration();          } finally {              // 当线程中断时,勾销续期,这个下边剖析,现不探讨            if (Thread.currentThread().isInterrupted()) {                  cancelExpirationRenewal(threadId);              }          }      }  }

    首先看续期的代码renewExpiration(),该办法内容较多,只看外围局部。其中internalLockLeaseTime的值在前文剖析过,如果用户不传键值的有效期的话,默认为30s,通过上面代码能够看到该工作均匀10s执行一次,也就是线程加锁后是10s续命一次。

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {          @Override       public void run(Timeout timeout) throws Exception {              //......            // 进行续约            RFuture<Boolean> future = renewExpirationAsync(threadId);              // 执行完续约后的回调            future.whenComplete((res, e) -> {                  if (e != null) {                      log.error("Can't update lock " + getRawName() + " expiration", e);                      EXPIRATION_RENEWAL_MAP.remove(getEntryName());                      return;                 }                                    if (res) {                      // 执行胜利,回调本人                    // reschedule itself                      renewExpiration();                  } else {                      // 敞开续约                    cancelExpirationRenewal(null);                  }              });       }  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    • 持续跟踪外围办法renewExpirationAsync(threadId),查看该线程续约的外围逻辑

      protected RFuture<Boolean> renewExpirationAsync(long threadId) {      return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,           "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +                      "return 1; " +                      "end; " +                      "return 0;",           Collections.singletonList(getRawName()),           internalLockLeaseTime, getLockName(threadId));  }

      能够看到仍然是执行Lua脚本,该脚本的逻辑是,如果该键值存在(仍在加锁/仍在执行锁内业务)时,从新设置Redis中该键值的生效工夫internalLockLeaseTime(默认为30s,前文以剖析),并返回1;如果检测到Redis中不存在该键值,则间接返回0。

    • 看办法cancelExpirationRenewal(null),敞开续约的办法

      protected void cancelExpirationRenewal(Long threadId) {      ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());       if (task == null) {              return;       }            if (threadId != null) {          task.removeThreadId(threadId);       }        if (threadId == null || task.hasNoThreads()) {          Timeout timeout = task.getTimeout();           if (timeout != null) {                 // 如果有续约的定时工作,间接敞开                timeout.cancel();           }          EXPIRATION_RENEWAL_MAP.remove(getEntryName());      }  }
未加锁胜利的线程订阅Redis音讯

回到org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)的办法中,通过下面尝试加锁逻辑的剖析能够看到如果加锁胜利后会间接返回,但未加锁成绩会持续向下执行代码。
先看一下未加锁胜利的线程订阅Redis音讯的外围代码:

// 订阅音讯CompletableFuture<RedissonLockEntry> future = subscribe(threadId);  if (interruptibly) {      commandExecutor.syncSubscriptionInterrupted(future);  } else {      commandExecutor.syncSubscription(future);  }

持续进入代码subscribe(threadId)中,并持续追踪,查看如下外围逻辑,发现是未获取锁的线程通过信号量来监听接管信息

public CompletableFuture<E> subscribe(String entryName, String channelName) {      AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));      CompletableFuture<E> newPromise = new CompletableFuture<>();        int timeout = service.getConnectionManager().getConfig().getTimeout();      Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {          newPromise.completeExceptionally(new RedisTimeoutException(                  "Unable to acquire subscription lock after " + timeout + "ms. " +                          "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));           }, timeout, TimeUnit.MILLISECONDS);       semaphore.acquire(() -> {          // ......     });         return newPromise;  }
未加锁胜利的线程通过自旋获取锁

回到org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)的办法中,看一下通过自旋获取锁的代码如下

try {      while (true) {          // 尝试获取锁        ttl = tryAcquire(-1, leaseTime, unit, threadId);           // 锁获取胜利,间接跳出循环           if (ttl == null) {                  break;           }            // 期待信号量,          if (ttl >= 0) {              try {                  commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);               } catch (InterruptedException e) {                  if (interruptibly) {                      throw e;                   }                  commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);               }          } else {              if (interruptibly) {                  commandExecutor.getNow(future).getLatch().acquire();               } else {                  commandExecutor.getNow(future).getLatch().acquireUninterruptibly();               }          }      }  } finally {      // 勾销订阅    unsubscribe(commandExecutor.getNow(future), threadId);  }

开释锁的逻辑

业务代码中的lock.unlock()是用来解锁的代码,并向下跟踪外围办法代码如下

@Override  public RFuture<Void> unlockAsync(long threadId) {      // 开释锁代码    RFuture<Boolean> future = unlockInnerAsync(threadId);         CompletionStage<Void> f = future.handle((opStatus, e) -> {          // 勾销到期续订,上文提到过,这里不再解释        cancelExpirationRenewal(threadId);             if (e != null) {                  throw new CompletionException(e);           }          if (opStatus == null) {                  IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "       + id + " thread-id: " + threadId);               throw new CompletionException(cause);           }                return null;       });         return new CompletableFutureWrapper<>(f);  }

上述办法有两个次要代码:开释锁勾销到期续订。勾销到期续订的办法cancelExpirationRenewal(threadId)上文形容过。这里次要剖析开释锁代码的办法,进入的办法unlockInnerAsync(threadId)

protected RFuture<Boolean> unlockInnerAsync(long threadId) {      return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,           "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                      "return nil;" +                      "end; " +                      "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +                      "if (counter > 0) then " +                      "redis.call('pexpire', KEYS[1], ARGV[2]); " +                      "return 0; " +                      "else " +                      "redis.call('del', KEYS[1]); " +                      "redis.call('publish', KEYS[2], ARGV[1]); " +                      "return 1; " +                      "end; " +                      "return nil;",          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));  }

这里次要还是执行的Lua脚本,Lua脚本的逻辑如下:

  • 如果不存在该键值,间接返回null
  • 该键值的value间接减1,再获取value的值,如果value的值仍大于0,则从新设置该键值的生效工夫,而后返回0;如果value不大于0,则间接删除该键值,并公布订阅音讯,并返回1
  • 其它状况间接返回null