乐趣区

分布式锁3Redisson实现

分布式锁(3)-Redisson 实现

文章分布式锁(2)- 基于 Redis 的实现中,最后给出的 redis 实现的分布式锁,还有一个严重的问题,那就是这种实现是不可重入的,而要实现可重入的分布式锁,会很麻烦,幸亏已经有现成的轮子可以使用。

1.Redisson 简介

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,基于 Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

相对于 Jedis 而言,Redisson 强大的一批。当然了,随之而来的就是它的复杂性。它里面也实现了分布式锁,而且包含多种类型的锁,更多请参阅分布式锁和同步器

2. 可重入锁

首先引入 jar 包

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.1</version>
</dependency>

然后,通过配置获取 `RedissonClient 客户端的实例,然后 getLock 获取锁的实例,进行操作即可。

public static void main(String[] args) {Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    config.useSingleServer().setPassword("redis1234");
    
    final RedissonClient client = Redisson.create(config);  
    RLock lock = client.getLock("lock1");
    try{lock.lock();
    }finally{lock.unlock();
    }
}

3. 获取锁实例

我们先来看 RLock lock = client.getLock("lock1"); 这句代码就是为了获取锁的实例,然后我们可以看到它返回的是一个RedissonLock 对象。

public RLock getLock(String name) {return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

RedissonLock 构造方法中,主要初始化一些属性。

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);
    // 命令执行器
    this.commandExecutor = commandExecutor;
    //UUID 字符串
    this.id = commandExecutor.getConnectionManager().getId();
    // 内部锁过期时间
    this.internalLockLeaseTime = commandExecutor.
                getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
}

4. 加锁

当我们调用 lock 方法,定位到lockInterruptibly。在这里,完成了加锁的逻辑。

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    
    // 当前线程 ID
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 如果 ttl 为空,则证明获取锁成功
    if (ttl == null) {return;}
    // 如果获取锁失败,则订阅到对应这个锁的 channel
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {while (true) {
            // 再次尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            //ttl 为空,说明成功获取锁,返回
            if (ttl == null) {break;}
            //ttl 大于 0 则等待 ttl 时间后继续尝试获取
            if (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 取消对 channel 的订阅
        unsubscribe(future, threadId);
    }
    //get(lockAsync(leaseTime, unit));
}

如上代码,就是加锁的全过程。先调用 tryAcquire 来获取锁,如果返回值 ttl 为空,则证明加锁成功,返回;如果不为空,则证明加锁失败。这时候,它会订阅这个锁的 Channel,等待锁释放的消息,然后重新尝试获取锁。流程如下:

获取锁

获取锁的过程是怎样的呢?接下来就要看 tryAcquire 方法。在这里,它有两种处理方式,一种是带有过期时间的锁,一种是不带过期时间的锁。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {

    // 如果带有过期时间,则按照普通方式获取锁
    if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    // 先按照 30 秒的过期时间来执行获取锁的方法
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        
    // 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

接着往下看,tryLockInnerAsync方法是真正执行获取锁的逻辑,它是一段 LUA 脚本代码。在这里,它使用的是 hash 数据结构。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                            long threadId, RedisStrictCommand<T> command) {

        // 过期时间
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  // 如果锁不存在,则通过 hset 设置它的值,并设置过期时间
                  "if (redis.call('exists', KEYS[1]) == 0) then" +
                      "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                      "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                      "return nil;" +
                  "end;" +
                  // 如果锁已存在,并且锁的是当前线程,则通过 hincrby 给数值递增 1
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                      "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                      "return nil;" +
                  "end;" +
                  // 如果锁已存在,但并非本线程,则返回过期时间 ttl
                  "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
    }

这段 LUA 代码看起来并不复杂,有三个判断:

  • 通过 exists 判断,如果锁不存在,则设置值和过期时间,加锁成功
  • 通过 hexists 判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功
  • 如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败

加锁成功后,在 redis 的内存数据中,就有一条 hash 结构的数据。Key 为锁的名称;field 为随机字符串 + 线程 ID;值为 1。如果同一线程多次调用 lock 方法,值递增 1。

127.0.0.1:6379> hgetall lock1
1) "b5ae0be4-5623-45a5-8faa-ab7eb167ce87:1"
2) "1"

5. 解锁

我们通过调用 unlock 方法来解锁。

public RFuture<Void> unlockAsync(final long threadId) {final RPromise<Void> result = new RedissonPromise<Void>();
    
    // 解锁方法
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            // 获取返回值
            Boolean opStatus = future.getNow();
            // 如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
            if (opStatus == null) {
                IllegalMonitorStateException cause = 
                    new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"
                        + id + "thread-id:" + threadId);
                result.tryFailure(cause);
                return;
            }
            // 解锁成功,取消刷新过期时间的那个定时任务
            if (opStatus) {cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });

    return result;
}

然后我们再看 unlockInnerAsync 方法。这里也是一段 LUA 脚本代码。

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
    
            // 如果锁已经不存在,发布锁释放的消息
            "if (redis.call('exists', KEYS[1]) == 0) then" +
                "redis.call('publish', KEYS[2], ARGV[1]);" +
                "return 1;" +
            "end;" +
            // 如果释放锁的线程和已存在锁的线程不是同一个线程,返回 null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
                "return nil;" +
            "end;" +
            // 通过 hincrby 递减 1 的方式,释放一次锁
            // 若剩余次数大于 0,则刷新过期时间
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
            "if (counter > 0) then" +
                "redis.call('pexpire', KEYS[1], ARGV[2]);" +
                "return 0;" +
            // 否则证明锁已经释放,删除 key 并发布锁释放的消息
            "else" +
                "redis.call('del', KEYS[1]);" +
                "redis.call('publish', KEYS[2], ARGV[1]);" +
                "return 1;"+
            "end;" +
            "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

如上代码,就是释放锁的逻辑。同样的,它也是有三个判断:

  • 如果锁已经不存在,通过 publish 发布锁释放的消息,解锁成功
  • 如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常
  • 通过 hincrby 递减 1,先释放一次锁。若剩余次数还大于 0,则证明当前锁是重入锁,刷新过期时间;若剩余次数小于 0,删除 key 并发布锁释放的消息,解锁成功

至此,Redisson 中的可重入锁的逻辑,就分析完了。但值得注意的是,上面的两种实现方式都是针对单机 Redis 实例而进行的。如果我们有多个 Redis 实例,请参阅Redlock 算法。该算法的具体内容,请参考 http://redis.cn/topics/distlo…

退出移动版