关于redis:Redis-Redisson-分布式锁的应用和源码

39次阅读

共计 8153 个字符,预计需要花费 21 分钟才能阅读完成。

1. 前言

之前写过一篇《Redis 分布式锁的实现》的文章,次要介绍的 Redis 分布式锁的原始性实现,外围是基于 setnx 来加锁,以及应用 lua 保障事务的原子性等。但毕竟比拟原始,须要依据不同的利用场景做不同的代码实现,也容易考虑不周。过后文章中就有提到 Redisson框架,刚好最近工作中又用的比拟多,这次就着重介绍。

Redisson 是架设在 Redis 根底上的一个 Java 开发框架,底层基于 Netty 框架,为使用者提供了一系列具备分布式个性的常用工具类。Redisson 的性能十分丰盛,具体可参考 github 中文 wiki,但本文只介绍 Redisson 分布式锁的性能。

2. 一般可重入锁

2.1. 应用示例

在 SpringBoot 我的项目通过 Redisson 来加锁非常容易,不须要像之前文章中一样写一大堆代码,框架屏蔽掉了很多细节。如下例:

Config config = new Config();
config.useSingleServer().setAddress("redis://ip:port").setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);

RLock lock = redissonClient.getLock("LOCK_KEY");
long waitTime=500L;
long leaseTime=15000L;
boolean isLock;
try {isLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
    if (isLock) {// do something ...}
} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {lock.unlock();
}

留神代码中 Config 并无限度,示例中是 Redis 单节点连贯,但实际上能够是哨兵模式、集群模式、主从模式等。

2.2. 源码解说

后面例子中加锁用到了 RLock 接口,这里贴一下源码:
org.redisson.api.RLock.java

public interface RLock extends Lock, RLockAsync {String getName();

    void lockInterruptibly(long var1, TimeUnit var3) throws InterruptedException;

    boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;

    void lock(long var1, TimeUnit var3);

    boolean forceUnlock();

    boolean isLocked();

    boolean isHeldByThread(long var1);

    boolean isHeldByCurrentThread();

    int getHoldCount();

    long remainTimeToLive();}

对于可重入锁,接口对应的实现办法在 org.redisson.RedissonLock 类外面,源码就不贴了,能够看到落到 Redis 时,理论的“加锁”和“解锁”过程也是一段 lua 脚本。

1、加锁

lua

if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end
return redis.call('pttl', KEYS[1]);

参数解释:

  • KEYS[1]:被锁资源名。
  • ARGV[1]:过期工夫。
  • ARGV[2]:以后程序标识(UUID + 以后 threadId)。

加锁的逻辑,是在 redis 中存入一个 Set 类型值。资源一旦被锁,首次设置 Value 为 1,也只有以后程序可反复加锁,即 Value 往上加 1。

2、解锁

lua

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end
return nil;

参数解释:

  • KEYS[1]:被锁资源名。
  • KEYS[2]:解锁时播送通道名。
  • ARGV[1]:解锁时播送通道音讯(值为 0L)。
  • ARGV[2]:过期工夫。
  • ARGV[3]:以后程序标识(UUID + 以后 threadId)。

解锁的逻辑,是先判断被锁资源名是否存在,如果存在则给 Value 减 1,当 Value 为 0 时,则删除 Key,并向指定通道播送音讯。

播送通道的设计很有亮点,当多个线程同时竞争锁时,未抢到锁的线程无需有效轮询,只需订阅一个通道。当锁开释时,在通道中播送音讯,告诉那些期待获取锁的线程当初能够取得锁了,那些线程再去竞争锁,防止性能资源的节约。

3、看门狗机制

如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会呈现锁死的状态,为了防止这种状况的产生,锁都会设置一个过期工夫。即后面在加锁时传入的leaseTime。某些利用场景中,如果在指定工夫中,咱们尚未实现业务,此时就须要给锁“续期”。如果整个过程齐全可控,能够在程序中手动给锁续期。但如果心愿能主动续期,就能够用到 Redisson 的 Wath Dog(看门狗)机制。

Redisson 提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被敞开前,一直的缩短锁的有效期,也就是说,如果一个拿到锁的线程始终没有实现逻辑,那么看门狗会帮忙线程一直的缩短锁超时工夫,锁不会因为超时而被开释。默认状况下,看门狗的续期工夫是 30s,也能够通过批改 Config.lockWatchdogTimeout 来另行指定。

上面就是加锁的源码,留神,在调用加锁办法时,如果想用看门狗,则传 leaseTime 值为-1L。如果给 leaseTime 设置了有效值,那么看门狗就不会失效,锁不会主动续期,而是在你指定的工夫后主动解锁。

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining == null) {this.scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

3. RedLock 红锁

3.1. 概念阐明

也是在之前的那一篇文章中,也提到了 RedLock,中文直译“红锁”。其实那篇文章曾经介绍过了,这里再介绍一下。后面用 redis 实现分布式锁时存在破绽,具体场景:

客户端 A 在 Redis master 节点申请锁。但 master 在将存储的 key 同步到 slave 上之前解体了,而后 slave 晋升为 master。而客户端 B 申请一个客户端 A 曾经持有的资源的锁。而后呢?而后呢?出问题啦,客户端 A 和 B 都能申请到同一个锁。

RedLock 是 Redis 官网提出的算法,具体流程包含:

  1. 获取以后工夫。
  2. 顺次 N 个节点获取锁,并设置响应超时工夫,避免单节点获取锁工夫过长。
  3. 锁无效工夫 = 锁过期工夫 - 获取锁消耗工夫,如果第 2 步骤中获取胜利的节点数大于
    N/2+1, 且锁无效工夫大于 0,则取得锁胜利。
  4. 若取得锁失败,则向所有节点开释锁。

简略点说,就是在锁过期工夫内,如果半数以上的节点胜利获取到了锁,则阐明获取锁胜利。这个有点像 ZooKeeper 的选举机制。这里讲讲 Redisson 中的实现办法。

3.2. 应用示例

Redisson 对于 RedLock 的应用代码上及其简略,只是将几个锁组合成一个“大锁”,而后再失常应用“大锁”的加锁 / 解锁。

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://ip1:port1")
        .setPassword("password1").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer().setAddress("redis://ip2:port2")
        .setPassword("password2").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://ip3:port3")
        .setPassword("password3").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);

String lockKey = "REDLOCK_KEY";
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
long waitTime=500L;
long leaseTime=15000L;
try {isLock = redLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
    if (isLock) {// do something ...}
} catch (Exception e) {... ...} finally {redLock.unlock();
}

留神代码中 Config 并无限度,示例中是 Redis 单节点连贯,但实际上能够是哨兵模式、集群模式、主从模式等。

3.3. 源码解说

在讲 Redisson 的 RedLock(红锁)之前,先讲 MultiLock(联锁),起因先看 RedissonRedLock源码,齐全是继承 RedissonMultiLock的所有性能。

RedissonRedLock.java

public class RedissonRedLock extends RedissonMultiLock {public RedissonRedLock(RLock... locks) {super(locks);
    }

    protected int failedLocksLimit() {return this.locks.size() - this.minLocksAmount(this.locks);
    }

    protected int minLocksAmount(List<RLock> locks) {return locks.size() / 2 + 1;
    }

    protected long calcLockWaitTime(long remainTime) {return Math.max(remainTime / (long)this.locks.size(), 1L);
    }

    public void unlock() {this.unlockInner(this.locks);
    }
}

RedissonMultiLock.java 外围代码

    // 加锁
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1L;
        if (leaseTime != -1L) {if (waitTime == -1L) {newLeaseTime = unit.toMillis(leaseTime);
            } else {newLeaseTime = unit.toMillis(waitTime) * 2L;
            }
        }

        long time = System.currentTimeMillis();
        long remainTime = -1L;
        if (waitTime != -1L) {remainTime = unit.toMillis(waitTime);
        }

        long lockWaitTime = this.calcLockWaitTime(remainTime);
        int failedLocksLimit = this.failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList(this.locks.size());
        ListIterator iterator = this.locks.listIterator();

        while(iterator.hasNext()) {RLock lock = (RLock)iterator.next();

            boolean lockAcquired;
            try {if (waitTime == -1L && leaseTime == -1L) {lockAcquired = lock.tryLock();
                } else {long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (RedisResponseTimeoutException var21) {this.unlockInner(Arrays.asList(lock));
                lockAcquired = false;
            } catch (Exception var22) {lockAcquired = false;}

            if (lockAcquired) {acquiredLocks.add(lock);
            } else {if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {break;}

                if (failedLocksLimit == 0) {this.unlockInner(acquiredLocks);
                    if (waitTime == -1L) {return false;}

                    failedLocksLimit = this.failedLocksLimit();
                    acquiredLocks.clear();

                    while(iterator.hasPrevious()) {iterator.previous();
                    }
                } else {--failedLocksLimit;}
            }

            if (remainTime != -1L) {remainTime -= System.currentTimeMillis() - time;
                time = System.currentTimeMillis();
                if (remainTime <= 0L) {this.unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1L) {List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
            Iterator var24 = acquiredLocks.iterator();

            while(var24.hasNext()) {RLock rLock = (RLock)var24.next();
                RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }

            var24 = futures.iterator();

            while(var24.hasNext()) {RFuture<Boolean> rFuture = (RFuture)var24.next();
                rFuture.syncUninterruptibly();}
        }

        return true;
    }
    
    // 解锁
    public void unlock() {List<RFuture<Void>> futures = new ArrayList(this.locks.size());
        Iterator var2 = this.locks.iterator();

        while(var2.hasNext()) {RLock lock = (RLock)var2.next();
            futures.add(lock.unlockAsync());
        }

        var2 = futures.iterator();

        while(var2.hasNext()) {RFuture<Void> future = (RFuture)var2.next();
            future.syncUninterruptibly();}

    }

RedissonRedLock.java 中重写了 RedissonMultiLock.java里的几个办法:

  • failedLocksLimit:MultiLock 中返回0,RedLock 中返回 locks.size() / 2 - 1
  • calcLockWaitTime:MultiLock 中返回 remainTime,RedLock 中返回 Math.max(remainTime / (long)this.locks.size(), 1L)

通过源码容易看到,Redisson 中的 RedLock 算法齐全是基于 MultiLock 实现的。Redisson 反对这种“联结锁”的概念,将多个 RLock 锁放入一个 ArrayList 中,而后开始遍历加锁。只不过 MultiLock 的要求比拟刻薄,List 中的所有的 RLock 加锁时,不能存在任何加锁失败的,即 failedLocksLimit=0。而 RedLock 要求放松一点,只有过半加锁胜利即可,即 failedLocksLimit = locks.size() / 2 – 1。但解锁时,要求将整个 ArrayList 中的锁都解一遍。

正文完
 0