关于redis:得物技术浅谈深入浅出的Redis分布式锁

1次阅读

共计 19179 个字符,预计需要花费 48 分钟才能阅读完成。

一、什么是分布式锁

1.1 分布式锁介绍

分布式锁是管制不同零碎之间访问共享资源的一种锁实现,如果不同的零碎或同一个零碎的不同主机之间共享了某个资源时,往往须要互斥来避免彼此烦扰来保障一致性。

1.2 为什么须要分布式锁

在单机部署的零碎中,应用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如应用 synchornized、ReentrantLock 等。然而在后端集群部署的零碎中,程序在不同的 JVM 虚拟机中运行,且因为 synchronized 或 ReentrantLock 都只能保障同一个 JVM 过程中保障无效,所以这时就须要应用分布式锁了。这里就不再赘述 synchornized 锁的原理。

1.3 分布式锁须要具备的条件

分布式锁须要具备互斥性、不会死锁和容错等。互斥性,在于不论任何时候,应该只能有一个线程持有一把锁;不会死锁在于即便是持有锁的客户端意外宕机或产生过程被 kill 等状况时也能开释锁,不至于导致整个服务死锁。容错性指的是只有大多数节点失常工作,客户端应该都能获取和开释锁。

二、分布式锁的实现形式

目前支流的分布式锁的实现形式,基于数据库实现分布式锁、基于 Redis 实现分布式锁、基于 ZooKeeper 实现分布式锁,本篇文章次要介绍了 Redis 实现的分布式锁。

2.1 由单机部署到集群部署锁的演变

一开始在 redis 设置一个默认值 key:ticket 对应的值为 20,并搭建一个 Spring Boot 服务,用来模仿多窗口卖票景象,配置类的代码就不一一列出了。

2.1.1 单机模式解决并发问题

一开始的时候在 redis 预设置的门票值 ticket=20,那么当一个申请进来之后,会判断是否余票是否是大于 0,若大于 0 那么就将余票减一,再从新写入 Redis 中,假使库存小于 0,那么就会打印谬误日志。

@RestController
@Slf4j
public class RedisLockController {
    
    @Resource
    private Redisson redisson;
    
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {
        String lockKey = "ticket";
        int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
        if (ticketCount > 0) {
            int realTicketCount = ticketCount - 1;
            log.info("扣减胜利,残余票数:" + realTicketCount + "");
            stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
        } else {log.error("扣减失败,余票有余");
        }
        return "end";
    }
    
}

代码运行剖析: 这里显著有一个问题,就是以后若有两个线程同时申请进来,那么两个线程同时申请这段代码时,如图 thread 1 和 thread 2 同时,两个线程从 Redis 拿到的数据都是 20,那么执行实现后 thread 1 和 thread 2 又将减完后的库存 ticket=19 从新写入 Redis,那么数据就会产生问题,实际上两个线程各减去了一张票数,然而理论写进就减了一次票数,就呈现了数据不统一的景象。

这种问题很好解决,上述问题的产生其实就是从 Redis 中拿数据和减余票不是原子操作,那么此时只须要将按下图代码给这俩操作加上 synchronized 同步代码快就能解决这个问题。

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {
        String lockKey = "ticket";
        synchronized (this) {int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
            if (ticketCount > 0) {
                int realTicketCount = ticketCount - 1;
                log.info("扣减胜利,残余票数:" + realTicketCount + "");
                stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
            } else {log.error("扣减失败,余票有余");
            }
        }
        return "end";
    }

}

代码运行剖析: 此时当多个线程执行到第 14 行的地位时,只会有一个线程可能获取锁,进入 synchronized 代码块中执行,当该线程执行实现后才会开释锁,等下个线程进来之后就会从新给这段代码上锁再执行。说简略些就是让每个线程排队执行代码块中的代码,从而保障了线程的平安。

上述的这种做法如果后端服务只有一台机器,那毫无疑问是没问题的,然而当初互联网公司或者是个别软件公司,后端服务都不可能只用一台机器,起码都是 2 台服务器组成的后端服务集群架构,那么 synchronized 加锁就显然没有任何作用了。

如下图所示,若后端是两个微服务形成的服务集群,由 nginx 将多个的申请负载平衡转发到不同的后端服务上,因为 synchronize 代码块只能在同一个 JVM 过程中失效,两个申请可能同时进两个服务,所以下面代码中的 synchronized 就一点作用没有了。

用 JMeter 工具轻易测试一下,就很简略能发现上述代码的 bug。实际上 synchronized 和 juc 包下个那些锁都是只能用于 JVM 过程维度的锁,并不能使用在集群或分布式部署的环境中。

2.1.2 集群模式解决并发问题

通过下面的试验很容易就发现了 synchronized 等 JVM 过程级别的锁并不能解决分布式场景中的并发问题,就是为了应答这种场景产生了分布式锁。

本篇文章介绍了 Redis 实现的分布式锁,能够通过 Redis 的 setnx(只在键 key 不存在的状况下, 将键 key 的值设置为 value。若键 key 曾经存在, 则 SETNX 命令不做任何动作。)的指令来解决的,这样就能够解决下面集群环境的锁不惟一的状况。

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {

        String lockKey = "ticket";
        // redis setnx 操作
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu");
        if (Boolean.FALSE.equals(result)) {return "error";}

        int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
        if (ticketCount > 0) {
            int realTicketCount = ticketCount - 1;
            log.info("扣减胜利,残余票数:" + realTicketCount + "");
            stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
        } else {log.error("扣减失败,余票有余");
        }

        stringRedisTemplate.delete(lockKey);
        return "end";
    }

}

代码运行剖析: 代码是有问题的,就是当执行扣减余票操作时,若业务代码报了异样,那么就会导致前面的删除 Redis 的 key 代码没有执行到,就会使 Redis 的 key 没有删掉的状况,那么 Redis 的这个 key 就会始终存在 Redis 中,前面的线程再进来执行上面这行代码都是执行不胜利的,就会导致线程死锁,那么问题就会很重大了。

为了解决上述问题其实很简略,只有加上一个 try…finally 即可,这样业务代码即便抛了异样也能够失常的开释锁。setnx + try … finally 解决,具体代码如下:

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {

        String lockKey = "ticket";
        // redis setnx 操作
        try {Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu");
            if (Boolean.FALSE.equals(result)) {return "error";}
            
            int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
            if (ticketCount > 0) {
                int realTicketCount = ticketCount - 1;
                log.info("扣减胜利,残余票数:" + realTicketCount + "");
                stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
            } else {log.error("扣减失败,余票有余");
            }
        } finally {stringRedisTemplate.delete(lockKey);
        }
        return "end";
    }

}

代码运行剖析:上述业务代码执行报错的问题解决了,然而又会有新的问题,当程序执行到 try 代码块中某个位置服务宕机或者服务从新公布,这样就还是会有上述的 Redis 的 key 没有删掉导致死锁的状况。这样能够应用 Redis 的过期工夫来进行设置 key,setnx + 过期工夫解决,如下代码所示:

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {

        String lockKey = "ticket";
        // redis setnx 操作
        try {Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "dewu");
            // 程序执行到这
            stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
            if (Boolean.FALSE.equals(result)) {return "error";}

            int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
            if (ticketCount > 0) {
                int realTicketCount = ticketCount - 1;
                log.info("扣减胜利,残余票数:" + realTicketCount + "");
                stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
            } else {log.error("扣减失败,余票有余");
            }
        } finally {stringRedisTemplate.delete(lockKey);
        }
        return "end";
    }

}

代码运行剖析 :上述代码解决了因为程序执行过程中宕机导致的锁没有开释导致的死锁问题,然而如果代码像上述的这种写法依然还是会有问题,当程序执行到第 18 行时,程序宕机了,此时 Redis 的过期工夫并没有设置,也会导致线程死锁的景象。能够用了 Redis 设置的原子命设置过期工夫的命令, 原子性过期工夫的 setnx 命令,如下代码所示:

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {

        String lockKey = "ticket";
        // redis setnx 操作
        try {Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, "dewu", 10, TimeUnit.SECONDS);
            if (Boolean.FALSE.equals(result)) {return "error";}

            int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
            if (ticketCount > 0) {
                int realTicketCount = ticketCount - 1;
                log.info("扣减胜利,残余票数:" + realTicketCount + "");
                stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
            } else {log.error("扣减失败,余票有余");
            }

        } finally {stringRedisTemplate.delete(lockKey);
        }
        return "end";
    }

}

代码运行剖析:通过设置原子性过期工夫命令能够很好的解决上述这种程序执行过程中忽然宕机的状况。这种 Redis 分布式锁的实现看似曾经没有问题了,但在高并发场景下任会存在问题,个别软件公司并发量不是很高的状况下,这种实现分布式锁的形式曾经够用了,即便出了些小的数据不统一的问题,也是可能承受的,然而如果是在高并发的场景下,上述的这种实现形式还是会存在很大问题。

如下面代码所示,该分布式锁的过期工夫是 10s,如果 thread 1 执行实现工夫须要 15s,且当 thread 1 线程执行到 10s 时,Redis 的 key 恰好就是过期就间接开释锁了,此时 thread 2 就能够取得锁执行代码了,如果 thread 2 线程执行实现工夫须要 8s,那么当 thread 2 线程执行到第 5s 时,恰好 thread 1 线程执行了开释锁的代码————stringRedisTemplate.delete(lockKey); 此时,就会发现 thread 1 线程删除的锁并不是其本人的加锁,而是 thread 2 加的锁;那么 thread 3 就又能够进来了,那么如果一共执行 5s,那么当 thread 3 执行到第 3s 时,thread 2 又会恰好执行到开释锁的代码,那么 thread 2 又删除了 thread 3 加的锁。

在高并发场景下,假使遇到上述问题,那将是灾难性的 bug,只有高并发存在,那么这个分布式锁就会时而加锁胜利时而加锁失败。

解决上述问题其实也很简略,让每个线程加的锁时给 Redis 设置一个惟一 id 的 value,每次开释锁的时候先判断一下线程的惟一 id 与 Redis 存的值是否雷同,若雷同即可开释锁。设置线程 id 的原子性过期工夫的 setnx 命令, 具体代码如下:

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {

        String lockKey = "ticket";
        String threadUniqueKey = UUID.randomUUID().toString();
        // redis setnx 操作
        try {Boolean result = stringRedisTemplate.opsForValue().setIfPresent(lockKey, threadUniqueKey, 10, TimeUnit.SECONDS);
            if (Boolean.FALSE.equals(result)) {return "error";}

            int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
            if (ticketCount > 0) {
                int realTicketCount = ticketCount - 1;
                log.info("扣减胜利,残余票数:" + realTicketCount + "");
                stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
            } else {log.error("扣减失败,余票有余");
            }
        } finally {if (Objects.equals(stringRedisTemplate.opsForValue().get(lockKey), threadUniqueKey)) {stringRedisTemplate.delete(lockKey);
            }
        }
        return "end";
    }

}

代码运行剖析:上述实现的 Redis 分布式锁曾经可能满足大部分利用场景了,然而还是略有有余,比方当线程进来须要的执行工夫超过了 Redis key 的过期工夫,那么此时曾经开释了,你其余线程就能够立马取得锁执行代码,就又会产生 bug 了。

分布式锁 Redis key 的过期工夫不论设置成多少都不适合,比方将过期工夫设置为 30s,那么如果业务代码呈现了相似慢 SQL、查问数据量很大那么过期工夫就不好设置了。那么这里有没有什么更好的计划呢?答案是有的——锁续命。

那么锁续命计划的原来就在于当线程加锁胜利时,会开一个分线程,取锁过期工夫的 1 / 3 工夫点定时执行工作,如上图的锁为例,每 10s 判断一次锁是否存在(即 Redis 的 key),若锁还存在那么就间接从新设置锁的过期工夫,若锁曾经不存在了那么就间接完结以后的分线程。

2.2 Redison 框架实现 Redis 分布式锁

上述“锁续命”计划说起来简略,然而实现起来还是挺简单的,于是市面上有很多开源框架曾经帮咱们实现好了,所以就不须要本人再去反复造轮子再去写一个分布式锁了,所以本次就拿 Redison 框架来举例,次要是能够学习这种设计分布式锁的思维。

2.2.1 Redison 分布式锁的应用

Redison 实现的分布式锁,应用起来还是非常简单的,具体代码如下:

@RestController
@Slf4j
public class RedisLockController {

    @Resource
    private Redisson redisson;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/lock")
    public String deductTicket() throws InterruptedException {

        // 传入 Redis 的 key
        String lockKey = "ticket";
        // redis setnx 操作
        RLock lock = redisson.getLock(lockKey);
        try {
            // 加锁并且实现锁续命
            lock.lock();
            int ticketCount = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
            if (ticketCount > 0) {
                int realTicketCount = ticketCount - 1;
                log.info("扣减胜利,残余票数:" + realTicketCount + "");
                stringRedisTemplate.opsForValue().set(lockKey, realTicketCount + "");
            } else {log.error("扣减失败,余票有余");
            }

        } finally {
            // 开释锁
            lock.unlock();}
        return "end";
    }

}

2.2.2 Redison 分布式锁的原理

Redison 实现分布式锁的原理流程如下图所示,当线程 1 加锁胜利,并开始执行业务代码时,Redison 框架会开启一个后盾线程,每隔锁过期工夫的 1 / 3 工夫定时判断一次是否还持有锁(Redis 中的 key 是否还存在),若不持有那么就间接完结以后的后盾线程,若还持有锁,那么就从新设置锁的过期工夫。当线程 1 加锁胜利后,那么线程 2 就会加锁失败,此时线程 2 就会就会做相似于 CAS 的自旋操作,始终期待线程 1 开释了之后线程 2 能力加锁胜利。

2.2.3 Redison 分布式锁的源码剖析

Redison 底层实现分布式锁时应用了大量的 lua 脚本保障了其加锁操作的各种原子性。Redison 实现分布式锁应用 lua 脚本的益处次要是能保障 Redis 的操作是原子性的,Redis 会将整个脚本作为一个整体执行,两头不会被其余命令插入。

Redisson 外围应用 lua 脚本加锁源码剖析:

办法名为 tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command):

 // 应用 lua 脚本加锁办法
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);

     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
           // 当第一个线程进来会间接执行这段逻辑                            
           // 判断传入的 Redis 的 key 是否存在,即 String lockKey = "ticket";
           "if (redis.call('exists', KEYS[1]) == 0) then" +    
           // 如果不存在那么就设置这个 key 为传入值、以后线程 id 即参数 ARGV[2]值(即 getLockName(threadId)), 并且将线程 id 的 value 值设置为 1
             "redis.call('hset', KEYS[1], ARGV[2], 1);" +    
          // 再给这个 key 设置超时工夫,超时工夫即参数 ARGV[1](即 internalLockLeaseTime 的值)的工夫
             "redis.call('pexpire', KEYS[1], ARGV[1]);" +        
             "return nil;" +
             "end;" +
          // 当第二个线程进来,Redis 中的 key 曾经存在(锁曾经存在),那么间接进这段逻辑
          // 判断这个 Redis key 是否存在且以后的这个 key 是否是以后线程设置的
           "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
          // 如果是的话,那么就进入重入锁的逻辑,利用 hincrby 指令将第一个线程进来将线程 id 的 value 值设置为 1 再加 1 
          // 而后每次开释锁的时候就会减 1,直到这个值为 0,这把锁就开释了,这点与 juc 的可重锁相似           
          //“hincrby”指令为 Redis hash 构造的加法
             "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
             "redis.call('pexpire', KEYS[1], ARGV[1]);" +
             "return nil;" +
             "end;" +
          // 假使不是本线程加的锁,而是其余线程加的锁, 因为上述 lua 脚本都是有线程 id 的校验,那么下面的两段 lua 脚本都不会执行
          // 那么此时这里就会将以后这个 key 的过期工夫返回 
             "return redis.call('pttl', KEYS[1]);",
             Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));   // KEYS[1])  ARGV[1]   ARGV[2]
}
// getName()传入 KEYS[1],示意传入解锁的 keyName,这里是 String lockKey = "ticket";
// internalLockLeaseTime 传入 ARGV[1],示意锁的超时工夫,默认是 30 秒
// getLockName(threadId)传入 ARGV[2],示意锁的惟一标识线程 id

设置监听器办法:办法名 tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId)

    // 设置监听器办法:private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
   // 加锁胜利这里会返回一个 null 值,即 ttlRemainingFuture 为 null
   // 若线程没有加锁胜利,那么这里返回的就是这个别的线程加过的锁的残余的过期工夫, 即 ttlRemainingFuture 为过期工夫
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        // 如果还持有这个锁,则开启定时工作一直刷新该锁的过期工夫
        // 这里给以后业务加了个监听器
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {return;}

                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    // 定时工作执行办法
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

定时工作执行办法: 办法名 scheduleExpirationRenewal(final long threadId):

    // 定时工作执行办法
    private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}

        // 这里 new 了一个 TimerTask()定时工作器
        // 这里定时工作会推延执行,推延的工夫是设置的锁过期工夫的 1 /3,
        // 很容易就能发现是一开始锁的过期工夫默认值 30s,具体可见 private long lockWatchdogTimeout = 30 * 1000;
        // 过期工夫单位是秒
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
             // 这里又是一个 lua 脚本
             // 这里 lua 脚本先判断了一下,Redis 的 key 是否存在且设置 key 的线程 id 是否是参数 ARGV[2]值
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" + 
             // 如果这个线程创立的 Redis 的 key 即锁依然存在,那么久给锁的过期工夫从新设值为 internalLockLeaseTime,也就是初始值 30s
                            "redis.call('pexpire', KEYS[1], ARGV[1]);" +
             //Redis 的 key 过期工夫从新设置胜利后,这里的 lua 脚本返回的就是 1
                            "return 1;" +
                        "end;" +
             // 如果主线程曾经开释了这个锁,那么这里的 lua 脚本就会返回 0,间接完结“看门狗”的程序
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {log.error("Can't update lock "+ getName() +" expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);    
        

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();
        }
    }
// 下面源码剖析过了,当加锁胜利后 tryAcquireAsync()返回的值为 null,那么这个办法的返回值也为 null
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));
}
     public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 取得以后线程 id
         long threadId = Thread.currentThread().getId();
         // 由下面的源码剖析能够得出,当加锁胜利后,这个 ttl 就是 null
         // 若线程没有加锁胜利,那么这里返回的就是这个别的线程加过的锁的残余的过期工夫
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
         // 如果加锁胜利后,这个 ttl 就是 null,那么这个办法后续就不须要做任何逻辑
         // 若没有加锁胜利这里 ttl 的值不为 null,为别的线程加过锁的残余的过期工夫,就会持续往下执行
        if (ttl == null) {return;}

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
        // 若没有加锁胜利的线程,会在这里做一个死循环,即自旋
            while (true) {
                // 始终死循环尝试加锁,这里又是下面的加锁逻辑了
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {break;}
        // 这里不会疯狂自旋,这里会判断锁生效之后才会持续进行自旋,这样能够节俭一点 CPU 资源
                // waiting for message
                if (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {unsubscribe(future, threadId);
        }
    //        get(lockAsync(leaseTime, unit));
    }

Redison 底层解锁源码剖析:

    @Override
    public void unlock() {
        // 调用异步解锁办法
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        // 当开释锁的线程和已存在锁的线程不是同一个线程,返回 null
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"
                    + id + "thread-id:" + Thread.currentThread().getId());
        }
        // 依据执行 lua 脚本返回值判断是否勾销续命订阅
        if (opStatus) {
            // 勾销续命订阅
            cancelExpirationRenewal();}
    }
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // 如果锁曾经不存在,公布锁开释的音讯,返回 1
                "if (redis.call('exists', KEYS[1]) == 0) then" +
                    "redis.call('publish', KEYS[2], ARGV[1]);" +
                    "return 1;" +
                "end;" +
                // 如果开释锁的线程和已存在锁的线程不是同一个线程,返回 null
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
                    "return nil;" +
                "end;" +
                // 以后线程持有锁,用 hincrby 命令将锁的可重入次数 -1,即线程 id 的 value 值 -1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
                // 若线程 id 的 value 值即可重入锁的次数大于 0,就更新过期工夫,返回 0
                "if (counter > 0) then" +
                    "redis.call('pexpire', KEYS[1], ARGV[2]);" +
                    "return 0;" +
                // 否则证实锁曾经开释,删除 key 并公布锁开释的音讯,返回 1
                "else" +
                    "redis.call('del', KEYS[1]);" +
                    "redis.call('publish', KEYS[2], ARGV[1]);" +
                    "return 1;"+
                "end;" +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }
    // getName()传入 KEYS[1],示意传入解锁的 keyName
    // getChannelName()传入 KEYS[2],示意 redis 外部的音讯订阅 channel
    // LockPubSub.unlockMessage 传入 ARGV[1],示意向其余 redis 客户端线程发送解锁音讯
    // internalLockLeaseTime 传入 ARGV[2],示意锁的超时工夫,默认是 30 秒
    // getLockName(threadId)传入 ARGV[3],示意锁的惟一标识线程 id
    void cancelExpirationRenewal() {
        // 将该线程从定时工作中删除
        Timeout task = expirationRenewalMap.remove(getEntryName());
        if (task != null) {task.cancel();
        }
    }

上述情况如果是单台 Redis,那么利用 Redison 开源框架实现 Redis 的分布式锁曾经很完满了,然而往往生产环境的的 Redis 个别都是哨兵主从架构,Redis 的主从架构有别与 Zookeeper 的主从,客户端只能申请 Redis 主从架构的 Master 节点,Slave 节点只能做数据备份,Redis 从 Master 同步数据到 Slave 并不需要同步实现后能力持续接管新的申请,那么就会存在一个主从同步的问题。

当 Redis 的锁设置胜利,正在执行业务代码,当 Redis 向从服务器同步时,Redis 的 Maste 节点宕机了,Redis 刚刚设置胜利的锁还没来得及同步到 Slave 节点,那么此时 Redis 的主从哨兵模式就会从新选举出新的 Master 节点,那么这个新的 Master 节点其实就是原来的 Slave 节点,此时前面申请进来的线程都会申请这个新的 Master 节点,然而选举后产生的新 Master 节点实际上是没有那把锁的,那么从而导致了锁的生效。

上述问题用 Redis 主从哨兵架构实现的分布式锁在这种极其状况下是无奈防止的,然而个别状况下生产上这种故障的概率极低,即便偶然有问题也是能够承受的。

如果想使分布式锁变的百分百牢靠,那能够选用 Zookeeper 作为分布式锁,就能完满的解决这个问题。因为 zk 的主从数据同步有别与 Redis 主从同步,zk 的强一致性使得当客户端申请 zk 的 Leader 节点加锁时,当 Leader 将这个锁同步到了 zk 集群的大部分节点时,Leader 节点才会返回客户端加锁胜利,此时当 Leader 节点宕机之后,zk 外部选举产生新的 Leader 节点,那么新的客户款拜访新的 Leader 节点时,这个锁也会存在,所以 zk 集群可能完满解决上述 Redis 集群的问题。

因为 Redis 和 Zookeeper 的设计思路不一样,任何分布式架构都须要满足 CAP 实践,“鱼和熊掌不可兼得”,要么抉择 AP 要么抉择 CP,很显然 Redis 是 AP 构造,而 zk 是属于 CP 架构,也导致了两者的数据同步实质上的区别。

其实设计 Redis 分布式锁有种 RedLock 的思维就是借鉴 zk 实现分布式锁的这个特点,这种 Redis 的加锁形式在 Redison 框架中也有提供 api,具体应用也很简略,这里就不一一赘述了。其次要思维如下图所示:

这种实现形式,我认为生产上并不举荐应用。很简略本来只须要对一个 Redis 加锁,设置胜利返回即可,然而当初须要对多个 Redis 进行加锁,无形之中减少了好几次网络 IO,万一第一个 Redis 加锁胜利后,前面几个 Redis 在加锁过程中呈现了相似网络异样的这种状况,那第一个 Redis 的数据可能就须要做数据回滚操作了,那为了解决一个极低概率产生的问题又引入了多个可能产生的新问题,很显然得失相当。并且这里还有可能呈现更多乌七八糟的问题,所以我认为这种 Redis 分布式锁的实现形式极其不举荐生产应用。

退一万说如果真的须要这种强一致性的分布式锁的话,那为什么不间接用 zk 实现的分布式锁呢,性能必定也比这个 RedLock 的性能要好。

三、分布式锁应用场景

这里着重讲一下分布式锁的两种以下应用场景:

3.1 热点缓存 key 重建优化

个别状况下互联网公司根本都是应用“缓存”加过期工夫的策略,这样不仅放慢数据读写,而且还能保证数据的定期更新,这种策略可能满足大部分需要,然而也会有一种非凡状况会有问题:本来就存在一个冷门的 key,因为某个热点新闻的呈现,忽然这个冷门的 key 申请量暴增成了使其称为了一个热点 key,此时缓存生效,并且又无奈在很短时间内从新设置缓存,那么缓存生效的霎时,就会有大量线程来拜访到后端,造成数据库负载加大,从而可能会让利用解体。

例如:“Air Force one”本来就是一个冷门的 key 存在于缓存中,微博忽然有个明星衣着“Air Force one”上了热搜,那么就会有很多明星的粉丝来得物 app 购买“Air Force one”,此时的“Air Force one”就间接成为了一个热点 key,那么此时“Air Force one”这个 key 如果缓存恰好生效了之后,就会有大量的申请同时拜访到 db,会给后端造成很大的压力,甚至会让零碎宕机。

要解决这个问题只须要用一个简略的分布式锁即可解决这个问题,只容许一个线程去重建缓存,其余线程期待重建缓存的线程执行完,从新从缓存获取数据即可。可见上面的实例伪代码:

    public String getCache(String key) {
        // 从缓存获取数据
        String value = stringRedisTemplate.opsForValue().get(key);
        // 传入 Redis 的 key
        try {if (Objects.isNull(value)) {
               // 这里只容许一个线程进入,从新设置缓存
                String mutexKey = "lock_key" + key;

                // 如果加锁胜利
                if (stringRedisTemplate.opsForValue().setIfPresent(mutexKey, "poizon", 30, TimeUnit.SECONDS)) {
                    // 从 db 获取数据
                    value = mysql.getDataFromMySQL(key);
                    // 写回缓存
                    stringRedisTemplate.opsForValue().setIfPresent(key, "poizon", 60, TimeUnit.SECONDS);
                    // 开释锁
                    stringRedisTemplate.delete(mutexKey);
                }
                
            } else {Thread.sleep(100);
                getCache(key);
            }

        } catch (InterruptedException e) {log.error("getCache is error", e);
        }
        return value;
    }

3.2 解决缓存与数据库数据不统一问题

如果业务对数据的缓存与数据库须要强统一时,且并发量不是很高的状况下的状况下时,就能够间接加一个分布式读写锁就能够间接解决这个问题了。能够间接利用能够加分布式读写锁保障并发读写或写写的时候按程序排好队,读读的时候相当于无锁。

并发量不是很高且业务对缓存与数据库有着强统一对要求时,通过这种形式实现最简略,且成果空谷传声。假使在这种场景下,如果还监听 binlog 通过音讯的形式提早双删的形式去保证数据一致性的话,引入了新的中间件减少了零碎的复杂度,得失相当。

3.3 超高并发场景下的分布式锁设计实践

与 ConcurrentHashMap 的设计思维有点相似,用分段锁来实现,这个是之前在网上看到的实现思路,自己并没有理论应用过,不晓得水深不深,然而能够学习一下实现思路。

如果 A 商品的库存是 2000 个,当初能够将该 A 商品的 2000 个库存利用相似 ConcurrentHashMap 的原理将不同数量段位的库存的利用取模或者是 hash 算法让其扩容到不同的节点下来,这样这 2000 的库存就程度扩容到了多个 Redis 节点上,而后申请 Redis 拿库存的时候申请本来只能从一个 Redis 上取数据,当初能够从五个 Redis 上取数据,从而能够大大提高并发效率。

第四局部:总结与思考

综上可知,Redis 分布式锁并不是相对平安,Redis 分布式锁在某种极其状况下是无奈防止的,然而个别状况下生产上这种故障的概率极低,即便偶然有问题也是能够承受。

CAP 准则指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)这三个因素最多只能同时实现两点,不可能三者兼顾。鱼和熊掌不可兼得”,要么抉择 AP 要么抉择 CP,抉择 Redis 作为分布式锁的组件在于其单线程内存操作效率很高,且在高并发场景下也能够放弃很好的性能。

如果肯定要要求分布式锁百分百牢靠,那能够选用 Zookeeper 或者 MySQL 作为分布式锁,就能完满的解决锁平安的问题,然而抉择了一致性那就要失去可用性,所以 Zookeeper 或者 MySQL 实现的分布式锁的性能远不如 Redis 实现的分布式锁。

感激浏览此篇文章的你,若有有余的中央烦请指出,大家能够一起探讨学习,共同进步。

文 /harmony

关注得物技术,做最潮技术人!

正文完
 0