一、前言

分布式锁在理论工作中的利用还是比拟多的,其实现形式也有很多种,常见的有基于数据库锁、基于zookeeper、基于redis的,明天咱们来讲下基于redis实现的分布式锁。

redisson是一个redis客户端框架,提供了分布式锁的性能个性,这里咱们通过解析redisson的源码来剖析它是如何基于redis来实现分布式锁的?

二、源码解析

2.1 样例代码

上面是一个分布式锁的简略样例代码

// 初始化配置,创立Redisson客户端Config config = new Config();config.setCodec(new JsonJacksonCodec())        .useSingleServer()        .setAddress("redis://192.168.10.131:6379");RedissonClient client = Redisson.create(config);// 获取分布式锁RLock lock = client.getLock("myLock");lock.lock();System.out.println(Thread.currentThread().getId() + ": 获取到分布式锁");try {    Thread.sleep(60 * 1000);} catch (Exception e) {    e.printStackTrace();} finally {    // 解锁    lock.unlock();}

下面的样例代码比较简单,通过redisson客户端获取一个分布式锁,该分布式锁的key为myLock,睡眠60秒之后开释锁。这里比拟重要的是lock()办法,该办法是获取锁的具体步骤,所以接下来具体解析一下该办法

2.2 整体流程

获取锁的流程图如下

具体流程为:

  • 第一次尝试获取锁,如果获取到锁,间接返回。如果未获取到锁,返回锁的残余过期工夫ttl
  • 当未获取到锁时,订阅频道redisson_lock__channel:{myLock}(订阅该频道的作用是,当该分布式锁被其余拥有者所开释时,会往该订阅频道发送一个解锁音讯UNLOCK_MESSAGE,这时以后期待该分布式锁的线程会中断期待,并再次尝试获取锁
  • 开启死循环,尝试获取锁,如果未获取到锁,拿到锁的残余过期工夫,并期待该锁的残余过期工夫(两头过程中如果订阅频道有解锁音讯UNLOCK_MESSAGE,会提前中断期待,持续循环),直到获取锁,退出循环
  • 获取到锁之后,勾销订阅频道

源码如下

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {        long threadId = Thread.currentThread().getId();        // 1、第一次尝试获取锁,ttl为null,示意获取到锁,间接return        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);        if (ttl == null) {            return;        }        // 2、订阅频道redisson_lock__channel:{myLock}        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);        pubSub.timeout(future);        RedissonLockEntry entry;        if (interruptibly) {            entry = commandExecutor.getInterrupted(future);        } else {            entry = commandExecutor.get(future);        }        try {            // 3、开启循环            while (true) {                // 再次尝试获取锁,ttl为null,示意获取到锁,退出循环                ttl = tryAcquire(-1, leaseTime, unit, threadId);                if (ttl == null) {                    break;                }                // 如果ttl大于等于0                if (ttl >= 0) {                    try {                        // 期待ttl工夫 或者 接管到解锁音讯                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                    } catch (InterruptedException e) {                        ...                    }                } else {    // 如果ttl小于0,阐明该锁未设置过期工夫,期待接管解锁音讯                    if (interruptibly) {                        entry.getLatch().acquire();                    } else {                        entry.getLatch().acquireUninterruptibly();                    }                }            }        } finally {            // 退出订阅频道redisson_lock__channel:{myLock}            unsubscribe(entry, threadId);        }    }

2.3 锁的获取

那么如何示意以后线程获取到锁

redisson中的分布式锁本质上是个hash构造的数据,假如锁的名称为myLock,那么当某个线程获取到锁之后,会在这个hash构造里设置一个hashkey,其为 【连贯管理器id】 : 【线程id】,如下图

redisson通过执行lua脚本来获取锁,lua脚本如下

// 如果锁不存在,则胜利获取到锁,设置锁的过期工夫,并返回nilif (redis.call('exists', KEYS[1]) == 0) then    redis.call('hincrby', KEYS[1], ARGV[2], 1);    redis.call('pexpire', KEYS[1], ARGV[1]);    return nil;end;// 如果锁已存在,判断是否是以后线程曾经获取到,如果是,对应的值加1if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then    redis.call('hincrby', KEYS[1], ARGV[2], 1);    redis.call('pexpire', KEYS[1], ARGV[1]);    return nil;end;// 否则示意未获取到锁,返回锁的过期工夫return redis.call('pttl', KEYS[1]);

该lua脚本的次要作用是

  1. 如果锁不存在,则胜利获取到锁,设置锁的过期工夫(lockWatchdogTimeout默认是30秒),并返回nil
  2. 如果锁已存在,判断是否是以后线程曾经获取到,如果是,对应的值加1
  3. 否则示意未获取到锁,返回锁的过期工夫

这里的第一步为什么要设置锁的过期工夫?其实是为了当锁的拥有者挂了之后,防止锁始终存在,导致其余利用永远无奈获取到锁

2.4 锁续期

那么既然锁设置了过期工夫,那很天然地想到,如果在锁过期的这段时间内,领有锁的线程还未执行完业务逻辑,这时锁主动过期,导致其余利用也获取到了锁,从而产生逻辑谬误。所以引入了锁续期

当获取到锁时,redisson会启动一个看门狗,该看门狗每隔 lockWatchdogTimeout / 3秒续期一次锁(假如lockWatchdogTimeout默认为30秒,则每隔10秒续期锁),源码如下

private void renewExpiration() {    ...        // 1、创立一个10秒后执行的提早工作    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {        @Override        public void run(Timeout timeout) throws Exception {            ...                        // 2、执行续期锁的lua脚本            CompletionStage<Boolean> future = renewExpirationAsync(threadId);            future.whenComplete((res, e) -> {                ...                                // 3、res为true,代表锁续期胜利,从新调用该办法,持续创立提早工作                // false示意锁续期失败                if (res) {                    renewExpiration();                } else {                    cancelExpirationRenewal(null);                }            });        }    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);    }

续期锁的lua脚本如下:

// 如果锁存在这个hashkey,从新设置锁的过期工夫if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then    redis.call('pexpire', KEYS[1], ARGV[1]);    return 1;end;return 0;

到这里,redisson实现分布式锁的源码解析就完结了。

三、总结

redisson的源码中大量应用了异步编程,这导致浏览源码的难度系数较高,这里我也只是大略整顿了一下,有问题的同学能够相互讨论一下或自行查阅源码。