假如有这样一个场景,在一个购票软件上买一张票,然而此时残余票数只有一张或几张,这个时候有几十个人都在同时应用这个软件购票。在不思考任何影响下,失常的逻辑是首先判断以后是否还有残余的票,如果有,那么就进行购买并扣减库存数,否则就会提醒票数有余,购买失败。伪代码如下:void buyTicket() {
int stockNum = byTicketMapper.selectStockNum();if(stockNum>0){ //TODO 买票流程.... byTicketMapper.reduceStock(); // 扣减库存}else{ log.info("=====>票卖完了<====");}
}
复制代码这段代码在逻辑上没有问题,然而在并发场景下,可能会存在一个重大的问题。当残余票数为1时,有A,B两个用户同时点击了购买按钮,A用户通过了库存大于0的校验并开始执行购票逻辑,然而因为一些起因造成A用户的购票线程有短暂的阻塞。而在这个阻塞的过程中,用户B发动了购买申请,并且也通过了库存大于0的校验,直到整个购买流程执行实现并且扣减了库存。那么这个时候残余库存刚好为0,不会再有用户发动购买申请,这时用户A的购买申请阻塞被唤醒,因为在此之前曾经校验过库存大于0,所以执行完购买流程后,库存还会被扣减一次。那么此时的库存为-1,这就是常听到的超卖问题。
为了防止这个问题,咱们能够通过加锁了形式,来保障并发的安全性。像JVM提供的内置锁synchronized,JUC提供的重入锁ReentrantLock,然而这两种锁只能保障单机环境下并发平安问题,个别在理论工作中很少会部署单节点的我的项目,通常都是多节点集群部署,这两个锁就失去了意义。这个时候就能够借助redis来实现分布式锁。setnx在集群部署的状况下,通常应用redis来实现分布式锁。其中redis提供了setnx命令,标识只有key不存在时能力设值胜利,从而达到加锁的成果。上面通过redis来革新上述的代码,其形式是购票线程首先获取锁,如果获取锁胜利,那么继续执行购票业务流程,直到所有流程执行实现并扣减库存后,最终在开释锁。如果获取锁失败,那么就给出一个敌对的零碎提醒。void buyTicket() {
// 获取锁Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");if (lock) { int stockNum = byTicketMapper.selectStockNum(); if(stockNum>0){ //TODO 买票流程.... byTicketMapper.reduceStock(); // 扣减库存 }else{ log.info("=====>票卖完了<===="); } // 开释锁 redisTemplate.delete("lock");} else { log.info("=====>零碎忙碌,请稍后!<====");}
}
复制代码问题1:死锁问题通过下面的一顿梭哈,你认为这样就能够了吗,其实不然。构想一下,如果线程A在获取锁胜利后,在执行购票的逻辑中呈现了异样,那么这个时候就会造成锁得不到开释,其余线程始终获取不到锁,这就造成重大的死锁问题。为了防止死锁问题的呈现,咱们能够对异样进行捕捉,在finally中去开释锁,这样不论业务执行胜利或失败,最初都会去开释锁。void buyTicket() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");if (lock) { try { int stockNum = byTicketMapper.selectStockNum(); if (stockNum > 0) { //TODO 买票流程.... byTicketMapper.reduceStock(); // 扣减库存 } else { log.info("=====>票卖完了<===="); } }finally { redisTemplate.delete("lock"); // 开释锁 }} else { log.info("=====>零碎忙碌,请稍后!<====");}
}
复制代码你认为这就完结了吗?死锁就不会产生了吗?如果你认为这样就能防止死锁的产生,那你就太不仔细啦。如果在程序刚想像执行开释锁的逻辑时,redis服务忽然宕机了,那么这时锁开释就失败了。在将redis服务重启后,加锁的数据又被复原了,这样又呈现了死锁的景象。为了防止这个问题,能够为锁设置一个过期工夫,这样即便redis重启复原数据后,也会很快的过期掉。不过须要留神的是,在设置锁的过期工夫时,肯定要保障原子性操作,不然还是会呈现死锁问题。
//不是原子操作,会呈现死锁问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
//如果刚要执行该语句时,redis宕机了。下面的锁无奈开释
redisTemplate.expire("lock",Duration.ofSeconds(5L));
//原子操作
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1", Duration.ofSeconds(5L));
复制代码问题2:锁被其余线程开释问题通过下面的又一顿梭哈,死锁的问题能够防止了,这样在高并发的状况下就能平安的执行了吗。如果锁的过期工夫设置了5秒,当A线程发动购票申请并获取到了锁,然而A线程在执行购票流程时破费了6秒,此时线程A的锁曾经过期。这时线程B从新获取了锁并且也开始执行购票流程,然而A线程要比B线程执行的要快,当A线程开释锁时,问题就呈现了。因为A线程执行的过程锁曾经过期了,那么在执行开释锁的流程时,最终被开释的是线程B的锁,这就导致B的锁被A线程开释问题。
对于这个景象,能够给每个锁设置一个惟一标识,比方像UUID,线程ID。在开释锁时,校验一下这个锁的标识是否为须要删除的锁,如果是,在进行锁的开释。public void buyTicket() {
String uuid = UUID.randomUUID().toString();// 为锁设置一个惟一标识Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, Duration.ofSeconds(5L));if (lock) { try { int stockNum = byTicketMapper.selectStockNum(); if (stockNum > 0) { //TODO 买票流程.... byTicketMapper.reduceStock(); // 扣减库存 } else { log.info("=====>票卖完了<===="); } }finally { String lockValue = redisTemplate.opsForValue().get("lock"); if(lockValue.equals(uuid)){ //校验标识,通过则开释锁 redisTemplate.delete("lock"); } }} else { log.info("=====>零碎忙碌,请稍后!<====");}
}
复制代码问题3:锁续期问题应用setnx命令做分布式锁时,无奈防止的一个问题就是:线程尚未执行实现,然而锁曾经过期。在解决锁被其余线程误删的代码中,并不是100%能解决的,问题点在于上面这段代码。如果线程A曾经执行到了if语句并且通过了判断,当刚要执行开释锁的逻辑时,线程A的锁过期了并且线程B从新获取到了锁,那么线程A在开释锁时,开释的是B的锁。为了齐全可能解决这个问题,能够采纳锁续期的形式,其实现形式是独自开一个线程用来定时监听线程的锁是否还被持有,如果还持有,那么就给这把锁减少一些过期工夫,这样就不会呈现上述问题了。目前市面上曾经为咱们提供了锁主动续期的中间件,比方redisson String lockValue = redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){ // 线程A的锁过期
redisTemplate.delete("lock"); // 线程A删除了线程B的锁
}
复制代码Redissonredisson个别应用最多的场景就是分布式锁了,它不仅保障了并发场景下线程平安的问题,也解决了锁续期的问题。应用形式也比较简单,以3.5.7版本为例,首先须要配置redisson信息,依据本人的redis集群模式自由选择配置。在配置实现后,再来革新下面的购票办法。 @Bean
public RedissonClient redissonClient() { Config config = new Config(); // 单机配置 config.useSingleServer().setAddress("redis://127.0.0.1:3306").setDatabase(0); // 主从配置 // config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("",""); // 哨兵配置 // config.useSentinelServers().addSentinelAddress("").setMasterName(""); // Cluster配置 //config.useClusterServers().addNodeAddress(""); return Redisson.create(config);}
复制代码对于redisson应用起来也非常简单,通过getLock办法获取到RLock对象。通过RLock的tryLock或lock办法来进行加锁(底层都是通过Lua脚本来实现的)。当获取到锁并且扣减库存后,能够应用unlock办法进行锁开释。void buyTicket() {
RLock lock = redissonClient.getLock("lock");if (lock.tryLock()) { // 获取锁 try { int stockNum = byTicketMapper.selectStockNum(); if (stockNum > 0) { //TODO 买票流程.... byTicketMapper.reduceStock(); // 扣减库存 } else { log.info("=====>票卖完了<===="); } } finally { lock.unlock(); //开释锁 }} else { log.info("=====>零碎忙碌,请稍后!<====");}
}
复制代码Watch Dog机制那redisson是如何做到锁续期的呢?其实在redisson外部有一个看watch dog机制(看门狗机制),然而看门狗机制并不是在加锁时就能启动的。须要留神的是在加锁时,如果应用tryLock(long t1,long t2, TimeUnit unit)或lock(long t1,long t2, TimeUnit unit)办法并且将t2参数值设为了一个不为-1的值,那么看门口将无奈失效。看门狗在启动后会监听主线程还在执行,如果还在执行那么将会通过Lua脚本每10秒给锁续期30秒。watchlog的延时工夫默认为30秒,这个值能够在配置config时本人定义。private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) { // 如果leaseTime不是-1,那么将无奈应用看门狗 return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else { RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { if (future.isSuccess()) { Boolean ttlRemaining = (Boolean)future.getNow(); if (ttlRemaining) { // 看门口机制 RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); return ttlRemainingFuture;}
}
复制代码private long lockWatchdogTimeout = 30000L; //默认30秒
复制代码private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) { // 每10秒执行续期 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { // 通过LUA脚本为锁续期 RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)}); future.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName()); if (!future.isSuccess()) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause()); } else { if ((Boolean)future.getNow()) { RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 每10秒执行一次 if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) { task.cancel(); }}
}
复制代码问题4:主从切换导致锁失落问题尽管redisson帮忙咱们解决了锁续期的问题,然而在redis集群架构中,因为主从复制具备肯定的延时,那么在极其状况下就会呈现这样一个问题:当一个线程获取锁胜利,并且胜利向主节点保留了锁信息,当主节点还未像从节点同步锁信息时,主节点宕机了,那么当产生故障转移从节点切换为主节点时,线程加的锁就失落了。为了解决这个问题,redis引入了红锁RedLock,RedLock与大多数中间件的选举机制相似,采纳过半的形式来决定操作胜利还是不胜利。RedLock加锁RedLock在工作中,并不承受redis的集群架构,无论是主从,哨兵还是Cluster。每台redis服务都是独立的,都是一台独立的Master节点。在加锁的过程中,RedLock会记录开始加锁时的工夫以及加锁胜利后的工夫,这两个时间差就是一台机器加锁胜利所须要的工夫。比方启动了5个redis服务,线程A设置锁的超时工夫为5秒,当像第一台redis服务加锁胜利后破费了1秒,像第二台服务加锁胜利后也破费了一秒。这个时候加到第二台机器时,曾经破费了两秒的工夫,然而加锁数并未过半,还须要加锁一台能力齐全算加锁胜利,这个时候第三台机器加锁胜利又破费了1秒。那么总的加锁工夫就是3秒,锁的理论过期工夫就为2秒。特地须要留神的是,在向redis服务建设网络连接时,要设置一个超时工夫,防止redis服务宕机时,客户端还在傻傻的期待回应,这里超时工夫官网给到倡议是5-50毫秒之间,当连贯超时时,客户端会持续向下一个节点发动连贯。
加锁失败如果因为某些起因,获取锁失败(加锁没有超半数或者取锁工夫曾经超过了无效工夫),客户端应该在所有的Redis实例上进行解锁,即使某些Redis实例基本就没有加锁胜利。失败重试在并发场景下,RedLock会呈现这样一个问题,比方有三个线程同时去获取了同一张票的锁,此时A线程曾经胜利给redis-1和reids-2加上了锁,线程B曾经胜利给redis-3,reids-4加上了锁,线程C胜利的给reids-5加上了锁,这个时候三个线程再去加锁时,没有机器可加了,发现加锁胜利数都未过半,那么就导致客户端始终获取不到锁。
当客户端无奈取到锁时,应该在随机提早肯定工夫,而后进行重试,避免多个客户端在同时争夺同一资源的锁。开释锁开释锁比较简单,向所有的Redis实例发送开释锁命令即可,不必关怀之前有没有从Redis实例胜利获取到锁.在理解了RedLock后,最初再来革新购票的代码逻辑。首先须要依据redis的实例数来定义对应的Bean实例,redis的实例起码要有三台。@Bean
public RedissonClient redissonClient() {
Config config = new Config();// 单机配置config.useSingleServer().setAddress("redis://192.168.36.128:3306").setDatabase(0);return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
Config config = new Config();// 单机配置config.useSingleServer().setAddress("redis://192.168.36.130:3306").setDatabase(0);return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
Config config = new Config();// 单机配置config.useSingleServer().setAddress("redis://192.168.36.131:3306").setDatabase(0);return Redisson.create(config);
}
复制代码在配置实现后,为每台实例都设置同一把锁,最初在调用RedissonRedLock提供的tryLock和unlock进行加锁和解锁。void buyTicket(){
RLock lock = redissonClient.getLock("lock");RLock lock2 = redissonClient2.getLock("lock");RLock lock3 = redissonClient3.getLock("lock");RedissonRedLock redLock = new RedissonRedLock(lock,lock2,lock3); // 别离像三台实例加锁if (redLock.tryLock()) { try { int stockNum = byTicketMapper.selectStockNum(); if (stockNum > 0) { //TODO 买票流程.... byTicketMapper.reduceStock(); // 扣减库存 } else { log.info("=====>票卖完了<===="); } } finally { redLock.unlock(); //开释锁 }} else { log.info("=====>零碎忙碌,请稍后!<====");}
}
复制代码总结在应用reids做分布式锁时,并没有设想中的那么简略,高并发场景下容易呈现死锁,锁被其余线程误删,锁续期,锁失落等问题,在理论开发中应该思考到这些问题并依据相应的解决办法来解决这些问题,从而保证系统的安全性。本文中可能会存在一些脱漏或谬误,后续会持续跟进。