乐趣区

关于后端:Redis分布式锁存在的问题

假如有这样一个场景,在一个购票软件上买一张票,然而此时残余票数只有一张或几张,这个时候有几十个人都在同时应用这个软件购票。在不思考任何影响下,失常的逻辑是首先判断以后是否还有残余的票,如果有,那么就进行购买并扣减库存数,否则就会提醒票数有余,购买失败。伪代码如下:void buyTicket() {

int stockNum = byTicketMapper.selectStockNum();
if(stockNum>0){
    //TODO 买票流程....
    byTicketMapper.reduceStock(); // 扣减库存}else{log.info("=====> 票卖完了 <====");
}

}
复制代码这段代码在逻辑上没有问题,然而在并发场景下,可能会存在一个重大的问题。当残余票数为 1 时,有 A,B 两个用户同时点击了购买按钮,A 用户通过了库存大于 0 的校验并开始执行购票逻辑,然而因为一些起因造成 A 用户的购票线程有短暂的阻塞。而在这个阻塞的过程中,用户 B 发动了购买申请,并且也通过了库存大于 0 的校验,直到整个购买流程执行实现并且扣减了库存。那么这个时候残余库存刚好为 0,不会再有用户发动购买申请,这时用户 A 的购买申请阻塞被唤醒,因为在此之前曾经校验过库存大于 0,所以执行完购买流程后,库存还会被扣减一次。那么此时的库存为 -1,这就是常听到的超卖问题。

为了防止这个问题,咱们能够通过加锁了形式,来保障并发的安全性。像 JVM 提供的内置锁 synchronized,JUC 提供的重入锁 ReentrantLock,然而这两种锁只能保障单机环境下并发平安问题,个别在理论工作中很少会部署单节点的我的项目,通常都是多节点集群部署,这两个锁就失去了意义。这个时候就能够借助 redis 来实现分布式锁。setnx 在集群部署的状况下,通常应用 redis 来实现分布式锁。其中 redis 提供了 setnx 命令,标识只有 key 不存在时能力设值胜利,从而达到加锁的成果。上面通过 redis 来革新上述的代码,其形式是购票线程首先获取锁,如果获取锁胜利,那么继续执行购票业务流程,直到所有流程执行实现并扣减库存后,最终在开释锁。如果获取锁失败,那么就给出一个敌对的零碎提醒。void buyTicket() {

// 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock) {int stockNum = byTicketMapper.selectStockNum();
    if(stockNum>0){
        //TODO 买票流程....
        byTicketMapper.reduceStock(); // 扣减库存}else{log.info("=====> 票卖完了 <====");
    }
    // 开释锁
    redisTemplate.delete("lock");
} else {log.info("=====> 零碎忙碌,请稍后!<====");
}

}
复制代码问题 1:死锁问题通过下面的一顿梭哈,你认为这样就能够了吗,其实不然。构想一下,如果线程 A 在获取锁胜利后,在执行购票的逻辑中呈现了异样,那么这个时候就会造成锁得不到开释,其余线程始终获取不到锁,这就造成重大的死锁问题。为了防止死锁问题的呈现,咱们能够对异样进行捕捉,在 finally 中去开释锁,这样不论业务执行胜利或失败,最初都会去开释锁。void buyTicket() {

Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
if (lock) {
    try {int stockNum = byTicketMapper.selectStockNum();
        if (stockNum > 0) {
            //TODO 买票流程....
            byTicketMapper.reduceStock(); // 扣减库存} else {log.info("=====> 票卖完了 <====");
        }
    }finally {redisTemplate.delete("lock");   // 开释锁
    }
} else {log.info("=====> 零碎忙碌,请稍后!<====");
}

}
复制代码你认为这就完结了吗?死锁就不会产生了吗?如果你认为这样就能防止死锁的产生,那你就太不仔细啦。如果在程序刚想像执行开释锁的逻辑时,redis 服务忽然宕机了,那么这时锁开释就失败了。在将 redis 服务重启后,加锁的数据又被复原了,这样又呈现了死锁的景象。为了防止这个问题,能够为锁设置一个过期工夫,这样即便 redis 重启复原数据后,也会很快的过期掉。不过须要留神的是,在设置锁的过期工夫时,肯定要保障原子性操作,不然还是会呈现死锁问题。
// 不是原子操作,会呈现死锁问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent(“lock”, “1”);
// 如果刚要执行该语句时,redis 宕机了。下面的锁无奈开释
redisTemplate.expire(“lock”,Duration.ofSeconds(5L));

// 原子操作
Boolean lock = redisTemplate.opsForValue().setIfAbsent(“lock”, “1”, Duration.ofSeconds(5L));
复制代码问题 2:锁被其余线程开释问题通过下面的又一顿梭哈,死锁的问题能够防止了,这样在高并发的状况下就能平安的执行了吗。如果锁的过期工夫设置了 5 秒,当 A 线程发动购票申请并获取到了锁,然而 A 线程在执行购票流程时破费了 6 秒,此时线程 A 的锁曾经过期。这时线程 B 从新获取了锁并且也开始执行购票流程,然而 A 线程要比 B 线程执行的要快,当 A 线程开释锁时,问题就呈现了。因为 A 线程执行的过程锁曾经过期了,那么在执行开释锁的流程时,最终被开释的是线程 B 的锁,这就导致 B 的锁被 A 线程开释问题。

对于这个景象,能够给每个锁设置一个惟一标识,比方像 UUID,线程 ID。在开释锁时,校验一下这个锁的标识是否为须要删除的锁,如果是,在进行锁的开释。public void buyTicket() {

String uuid = UUID.randomUUID().toString();
// 为锁设置一个惟一标识
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, Duration.ofSeconds(5L));
if (lock) {
    try {int stockNum = byTicketMapper.selectStockNum();
        if (stockNum > 0) {
            //TODO 买票流程....
            byTicketMapper.reduceStock(); // 扣减库存} else {log.info("=====> 票卖完了 <====");
        }
    }finally {String lockValue = redisTemplate.opsForValue().get("lock");
        if(lockValue.equals(uuid)){ // 校验标识,通过则开释锁
            redisTemplate.delete("lock");   
        }
    }
} else {log.info("=====> 零碎忙碌,请稍后!<====");
}

}
复制代码问题 3:锁续期问题应用 setnx 命令做分布式锁时,无奈防止的一个问题就是:线程尚未执行实现,然而锁曾经过期。在解决锁被其余线程误删的代码中,并不是 100% 能解决的,问题点在于上面这段代码。如果线程 A 曾经执行到了 if 语句并且通过了判断,当刚要执行开释锁的逻辑时,线程 A 的锁过期了并且线程 B 从新获取到了锁,那么线程 A 在开释锁时,开释的是 B 的锁。为了齐全可能解决这个问题,能够采纳锁续期的形式,其实现形式是独自开一个线程用来定时监听线程的锁是否还被持有,如果还持有,那么就给这把锁减少一些过期工夫,这样就不会呈现上述问题了。目前市面上曾经为咱们提供了锁主动续期的中间件,比方 redisson String lockValue = redisTemplate.opsForValue().get(“lock”);
if(lockValue.equals(uuid)){// 线程 A 的锁过期

  redisTemplate.delete("lock");   // 线程 A 删除了线程 B 的锁

}
复制代码 Redissonredisson 个别应用最多的场景就是分布式锁了,它不仅保障了并发场景下线程平安的问题,也解决了锁续期的问题。应用形式也比较简单,以 3.5.7 版本为例,首先须要配置 redisson 信息,依据本人的 redis 集群模式自由选择配置。在配置实现后,再来革新下面的购票办法。@Bean

public RedissonClient redissonClient() {Config config = new Config();
    // 单机配置
    config.useSingleServer().setAddress("redis://127.0.0.1:3306").setDatabase(0);
    // 主从配置
    // config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("","");
    // 哨兵配置
    // config.useSentinelServers().addSentinelAddress("").setMasterName("");
    // Cluster 配置
    //config.useClusterServers().addNodeAddress("");
    return Redisson.create(config);
}

复制代码对于 redisson 应用起来也非常简单,通过 getLock 办法获取到 RLock 对象。通过 RLock 的 tryLock 或 lock 办法来进行加锁(底层都是通过 Lua 脚本来实现的)。当获取到锁并且扣减库存后,能够应用 unlock 办法进行锁开释。void buyTicket() {

RLock lock = redissonClient.getLock("lock");
if (lock.tryLock()) {  // 获取锁
    try {int stockNum = byTicketMapper.selectStockNum();
        if (stockNum > 0) {
            //TODO 买票流程....
            byTicketMapper.reduceStock(); // 扣减库存} else {log.info("=====> 票卖完了 <====");
        }
    } finally {lock.unlock(); // 开释锁
    }
} else {log.info("=====> 零碎忙碌,请稍后!<====");
}

}
复制代码 Watch Dog 机制那 redisson 是如何做到锁续期的呢?其实在 redisson 外部有一个看 watch dog 机制 (看门狗机制), 然而看门狗机制并不是在加锁时就能启动的。须要留神的是在加锁时,如果应用 tryLock(long t1,long t2, TimeUnit unit) 或 lock(long t1,long t2, TimeUnit unit)办法并且将 t2 参数值设为了一个不为 - 1 的值,那么看门口将无奈失效。看门狗在启动后会监听主线程还在执行,如果还在执行那么将会通过 Lua 脚本每 10 秒给锁续期 30 秒。watchlog 的延时工夫默认为 30 秒,这个值能够在配置 config 时本人定义。private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {

if (leaseTime != -1L) { // 如果 leaseTime 不是 -1,那么将无奈应用看门狗
    return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    ttlRemainingFuture.addListener(new FutureListener<Boolean>() {public void operationComplete(Future<Boolean> future) throws Exception {if (future.isSuccess()) {Boolean ttlRemaining = (Boolean)future.getNow();
                if (ttlRemaining) {
                    // 看门口机制
                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                }

            }
        }
    });
    return ttlRemainingFuture;
}

}
复制代码 private long lockWatchdogTimeout = 30000L; // 默认 30 秒
复制代码 private void scheduleExpirationRenewal(final long threadId) {

if (!expirationRenewalMap.containsKey(this.getEntryName())) {
    // 每 10 秒执行续期
    Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {
        // 通过 LUA 脚本为锁续期
            RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
            future.addListener(new FutureListener<Boolean>() {public void operationComplete(Future<Boolean> future) throws Exception {RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                    if (!future.isSuccess()) {RedissonLock.log.error("Can't update lock "+ RedissonLock.this.getName() +" expiration", future.cause());
                    } else {if ((Boolean)future.getNow()) {RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }

                    }
                }
            });
        }
    }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 每 10 秒执行一次
    if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {task.cancel();
    }

}

}
复制代码问题 4:主从切换导致锁失落问题尽管 redisson 帮忙咱们解决了锁续期的问题,然而在 redis 集群架构中,因为主从复制具备肯定的延时,那么在极其状况下就会呈现这样一个问题:当一个线程获取锁胜利,并且胜利向主节点保留了锁信息,当主节点还未像从节点同步锁信息时,主节点宕机了,那么当产生故障转移从节点切换为主节点时,线程加的锁就失落了。为了解决这个问题,redis 引入了红锁 RedLock,RedLock 与大多数中间件的选举机制相似,采纳过半的形式来决定操作胜利还是不胜利。RedLock 加锁 RedLock 在工作中,并不承受 redis 的集群架构,无论是主从,哨兵还是 Cluster。每台 redis 服务都是独立的,都是一台独立的 Master 节点。在加锁的过程中,RedLock 会记录开始加锁时的工夫以及加锁胜利后的工夫,这两个时间差就是一台机器加锁胜利所须要的工夫。比方启动了 5 个 redis 服务,线程 A 设置锁的超时工夫为 5 秒,当像第一台 redis 服务加锁胜利后破费了 1 秒,像第二台服务加锁胜利后也破费了一秒。这个时候加到第二台机器时,曾经破费了两秒的工夫,然而加锁数并未过半,还须要加锁一台能力齐全算加锁胜利,这个时候第三台机器加锁胜利又破费了 1 秒。那么总的加锁工夫就是 3 秒,锁的理论过期工夫就为 2 秒。特地须要留神的是,在向 redis 服务建设网络连接时,要设置一个超时工夫,防止 redis 服务宕机时,客户端还在傻傻的期待回应,这里超时工夫官网给到倡议是 5 -50 毫秒之间,当连贯超时时,客户端会持续向下一个节点发动连贯。

加锁失败如果因为某些起因,获取锁失败(加锁没有超半数或者取锁工夫曾经超过了无效工夫),客户端应该在所有的 Redis 实例上进行解锁,即使某些 Redis 实例基本就没有加锁胜利。失败重试在并发场景下,RedLock 会呈现这样一个问题,比方有三个线程同时去获取了同一张票的锁,此时 A 线程曾经胜利给 redis- 1 和 reids- 2 加上了锁,线程 B 曾经胜利给 redis-3,reids- 4 加上了锁,线程 C 胜利的给 reids- 5 加上了锁,这个时候三个线程再去加锁时,没有机器可加了,发现加锁胜利数都未过半,那么就导致客户端始终获取不到锁。

当客户端无奈取到锁时,应该在随机提早肯定工夫,而后进行重试, 避免多个客户端在同时争夺同一资源的锁。开释锁开释锁比较简单,向所有的 Redis 实例发送开释锁命令即可,不必关怀之前有没有从 Redis 实例胜利获取到锁. 在理解了 RedLock 后,最初再来革新购票的代码逻辑。首先须要依据 redis 的实例数来定义对应的 Bean 实例,redis 的实例起码要有三台。@Bean
public RedissonClient redissonClient() {

Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.128:3306").setDatabase(0);
return Redisson.create(config);

}

@Bean
public RedissonClient redissonClient2() {

Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.130:3306").setDatabase(0);
return Redisson.create(config);

}

@Bean
public RedissonClient redissonClient3() {

Config config = new Config();
// 单机配置
config.useSingleServer().setAddress("redis://192.168.36.131:3306").setDatabase(0);
return Redisson.create(config);

}
复制代码在配置实现后,为每台实例都设置同一把锁,最初在调用 RedissonRedLock 提供的 tryLock 和 unlock 进行加锁和解锁。void buyTicket(){

RLock lock = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
RedissonRedLock redLock = new RedissonRedLock(lock,lock2,lock3); // 别离像三台实例加锁
if (redLock.tryLock()) {
    try {int stockNum = byTicketMapper.selectStockNum();
        if (stockNum > 0) {
            //TODO 买票流程....
            byTicketMapper.reduceStock(); // 扣减库存} else {log.info("=====> 票卖完了 <====");
        }
    } finally {redLock.unlock();  // 开释锁
    }
} else {log.info("=====> 零碎忙碌,请稍后!<====");
}

}
复制代码总结在应用 reids 做分布式锁时,并没有设想中的那么简略,高并发场景下容易呈现死锁,锁被其余线程误删,锁续期,锁失落等问题,在理论开发中应该思考到这些问题并依据相应的解决办法来解决这些问题,从而保证系统的安全性。本文中可能会存在一些脱漏或谬误,后续会持续跟进。

退出移动版