关于redis:Redisson分布式锁的实现原理及源码

52次阅读

共计 8635 个字符,预计需要花费 22 分钟才能阅读完成。

Redisson 分布式锁的实现原理及源码

源码解析

简略的业务代码

次要负责源码入口, 分布式锁的应用次要有三个办法

  1. RLock lock = redissonClient.getLock("hpc-lock")获取实现可重入分布式锁的类
  2. lock.lock() 加锁
  3. lock.unlock()解锁

    @GetMapping("/redis/lock")  
    public ResResult testDistributedLock() {RLock lock = redissonClient.getLock("hpc-lock");  
      lock.lock();  
      try {System.out.println("加锁业务, xxx, xxx, xxxx");  
      } finally {lock.unlock();  
      }  
      return new ResResult(true, "");  
    }

源码解析

获取实现可重入分布式锁的类

redissonClient.getLock("hpc-lock"),该办法次要是获取实现了分布式可重入锁的类,进入 getLock 办法

@Override  
public RLock getLock(String name) {return new RedissonLock(commandExecutor, name);  
}

发现是初始化了 RedissonLock 类, 追到结构类办法

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);  
    this.commandExecutor = commandExecutor;
    // 通过下一行代码并进入追踪,发现默认线程的外部锁租用工夫为默认的 30s,// 也就是 30s 后主动释法锁
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();  
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}

加锁逻辑

lock.lock() 该办法次要性能是进行加锁,进入 lock() 办法,并向下追踪找到外围逻辑,找到办法 org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) 并查看外围逻辑。
该外围逻辑次要有三个点:尝试加锁 未加锁胜利的线程订阅 Redis 的音讯 未加锁胜利的线程通过自旋获取锁

尝试加锁逻辑

首先看 org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) 办法的尝试加锁局部,如下

long threadId = Thread.currentThread().getId();  
// 获取锁
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);  
// 获取锁胜利,间接返回
if (ttl == null) {return;}

进入 tryAcquire(-1, leaseTime, unit, threadId) 办法并一直向下跟踪,找到外围逻辑 org.redisson.RedissonLock#tryAcquireAsync 办法体如下,该办法次要有几个逻辑:加锁 锁续命

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {  
    RFuture<Long> ttlRemainingFuture;  
    // 获取锁
    if (leaseTime != -1) {  
        // 有锁生效工夫
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);  
    } else {  
        // 采纳默认锁生效工夫
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,  
 TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);  
    }  
    // 加锁后回调该办法
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {  
        // 加锁胜利
        if (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);  
            } else {  
                // 如果没有传入锁生效工夫,也就是在加锁时采纳的是默认的锁生效工夫
                // 加锁胜利后,进行锁续命
                scheduleExpirationRenewal(threadId);  
            }  
        }  
        return ttlRemaining;  
    });  
    return new CompletableFutureWrapper<>(f);  
}
  1. 首先,进入 加锁 的外围逻辑tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG), 办法体如下:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {  
        // 对 redis 执行 lua 脚本
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,  
             "if (redis.call('exists', KEYS[1]) == 0) then" +  
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +  
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +  
                        "return nil;" +  
                        "end;" +  
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +  
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +  
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +  
                        "return nil;" +  
                        "end;" +  
                        "return redis.call('pttl', KEYS[1]);",  
             Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));  
    }

    这里重要的是 Lua 脚本的逻辑,上面简略解释 lua 脚本的逻辑

    • 判断 redis 中是否存在具备 getRawName() 的键值的数据。(留神: 这里 getRawName() 所获取的值就是业务办法 RLock lock = redissonClient.getLock("hpc-lock") 中传入的参数hpc-lock
    • 如果不存在键值,则保留该键值到 redis 中,并且该 key 对用的 value 值为 1;同时该键值的生效工夫设置为 unit.toMillis(leaseTime)(这里的生效工夫如果用户不传的话,个别采纳默认的 30s,这在之前的对redissonClient.getLock("hpc-lock") 源码解析中曾经剖析过),并返回 null 值。
    • 如果存在键值,则对该键值的 value 的值进行自增 1,并且从新设置该键值的生效工夫,生效工夫设置为unit.toMillis(leaseTime),同时返回 null 值
    • 如果还有其它状况,则返回该键值的残余过期工夫,如果该键值不存在返回 -2,如果该键值没有过期工夫返回 -1(这是 Lua 脚本中的 return redis.call('pttl', KEYS[1]))
      留神: Lua 脚本在 Redis 中是先天具备原子性的,只有 Lua 脚本执行完之后,Redis 才会进行其它操作,因而不必放心 Lua 脚本的并发问题。
  2. 当获取锁胜利后,会进入回调办法,进行锁续命的逻辑,进入外围办法 scheduleExpirationRenewal(threadId) 中,办法体如下:

    protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();  
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);  
        if (oldEntry != null) {oldEntry.addThreadId(threadId);  
        } else {entry.addThreadId(threadId);  
            try {  
                // 续期
                renewExpiration();} finally {  
                // 当线程中断时,勾销续期,这个下边剖析,现不探讨
                if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);  
                }  
            }  
        }  
    }

    首先看续期的代码 renewExpiration(),该办法内容较多,只看外围局部。其中internalLockLeaseTime 的值在前文剖析过,如果用户不传键值的有效期的话,默认为 30s,通过上面代码能够看到该工作均匀 10s 执行一次,也就是线程加锁后是 10s 续命一次。

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {  
            @Override  
         public void run(Timeout timeout) throws Exception {  
                //......
    
                // 进行续约
                RFuture<Boolean> future = renewExpirationAsync(threadId);  
                // 执行完续约后的回调
                future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock "+ getRawName() +" expiration", e);  
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());  
                        return; 
                    }  
                      
                    if (res) {  
                        // 执行胜利,回调本人
                        // reschedule itself  
                        renewExpiration();} else {  
                        // 敞开续约
                        cancelExpirationRenewal(null);  
                    }  
                });  
         }  
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    • 持续跟踪外围办法renewExpirationAsync(threadId),查看该线程续约的外围逻辑

      protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +  
                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +  
                          "return 1;" +  
                          "end;" +  
                          "return 0;",  
               Collections.singletonList(getRawName()),  
               internalLockLeaseTime, getLockName(threadId));  
      }

      能够看到仍然是执行 Lua 脚本,该脚本的逻辑是,如果该键值存在 (仍在加锁 / 仍在执行锁内业务) 时,从新设置 Redis 中该键值的生效工夫internalLockLeaseTime(默认为 30s,前文以剖析),并返回 1;如果检测到 Redis 中不存在该键值,则间接返回 0。

    • 看办法cancelExpirationRenewal(null),敞开续约的办法

      protected void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());  
           if (task == null) {return;}  
            
          if (threadId != null) {task.removeThreadId(threadId);  
           }  
        
          if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();  
               if (timeout != null) { 
                      // 如果有续约的定时工作,间接敞开
                      timeout.cancel();}  
              EXPIRATION_RENEWAL_MAP.remove(getEntryName());  
          }  
      }
未加锁胜利的线程订阅 Redis 音讯

回到 org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) 的办法中,通过下面 尝试加锁 逻辑的剖析能够看到如果加锁胜利后会间接返回,但未加锁成绩会持续向下执行代码。
先看一下未加锁胜利的线程订阅 Redis 音讯的外围代码:

// 订阅音讯
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);  
if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);  
} else {commandExecutor.syncSubscription(future);  
}

持续进入代码 subscribe(threadId) 中,并持续追踪,查看如下外围逻辑,发现是未获取锁的线程通过信号量来监听接管信息

public CompletableFuture<E> subscribe(String entryName, String channelName) {AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));  
    CompletableFuture<E> newPromise = new CompletableFuture<>();  
  
    int timeout = service.getConnectionManager().getConfig().getTimeout();  
    Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {  
        newPromise.completeExceptionally(new RedisTimeoutException(  
                "Unable to acquire subscription lock after" + timeout + "ms." +  
                        "Increase'subscriptionsPerConnection'and/or'subscriptionConnectionPoolSize'parameters."));  
         }, timeout, TimeUnit.MILLISECONDS);  

     semaphore.acquire(() -> {// ......});  
  
     return newPromise;  
}
未加锁胜利的线程通过自旋获取锁

回到 org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) 的办法中,看一下通过自旋获取锁的代码如下

try {while (true) {  
        // 尝试获取锁
        ttl = tryAcquire(-1, leaseTime, unit, threadId);  
         // 锁获取胜利,间接跳出循环  
         if (ttl == null) {break;}  
  
        // 期待信号量,if (ttl >= 0) {  
            try {commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
             } catch (InterruptedException e) {if (interruptibly) {throw e;}  
                commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);  
             }  
        } else {if (interruptibly) {commandExecutor.getNow(future).getLatch().acquire();  
             } else {commandExecutor.getNow(future).getLatch().acquireUninterruptibly();  
             }  
        }  
    }  
} finally {  
    // 勾销订阅
    unsubscribe(commandExecutor.getNow(future), threadId);  
}

开释锁的逻辑

业务代码中的 lock.unlock() 是用来解锁的代码,并向下跟踪外围办法代码如下

@Override  
public RFuture<Void> unlockAsync(long threadId) {  
    // 开释锁代码
    RFuture<Boolean> future = unlockInnerAsync(threadId);  
  
     CompletionStage<Void> f = future.handle((opStatus, e) -> {  
        // 勾销到期续订,上文提到过,这里不再解释
        cancelExpirationRenewal(threadId);  
  
         if (e != null) {throw new CompletionException(e);  
         }  
        if (opStatus == null) {  
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"  
     + id + "thread-id:" + threadId);  
             throw new CompletionException(cause);  
         }  
      
        return null;  
     });  
  
     return new CompletableFutureWrapper<>(f);  
}

上述办法有两个次要代码:开释锁 勾销到期续订 。勾销到期续订的办法cancelExpirationRenewal(threadId) 上文形容过。这里次要剖析开释锁代码的办法,进入的办法 unlockInnerAsync(threadId)

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
         "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +  
                    "return nil;" +  
                    "end;" +  
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +  
                    "if (counter > 0) then" +  
                    "redis.call('pexpire', KEYS[1], ARGV[2]);" +  
                    "return 0;" +  
                    "else" +  
                    "redis.call('del', KEYS[1]);" +  
                    "redis.call('publish', KEYS[2], ARGV[1]);" +  
                    "return 1;" +  
                    "end;" +  
                    "return nil;",  
        Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));  
}

这里次要还是执行的 Lua 脚本,Lua 脚本的逻辑如下:

  • 如果不存在该键值,间接返回 null
  • 该键值的 value 间接减 1,再获取 value 的值,如果 value 的值仍大于 0,则从新设置该键值的生效工夫,而后返回 0;如果 value 不大于 0,则间接删除该键值,并公布订阅音讯,并返回 1
  • 其它状况间接返回 null

正文完
 0