乐趣区

关于redis:深入理解redisRedis分布式锁

1. 锁的品种

2. 一个健壮性高的分布式锁应该具备的特质

3. 单个 redis 分布式锁的演变

4. 多 redis 分布式锁

5. 总结

1. 锁的品种
咱们在日常的开发流动中,个别把锁分为两类:
1)同一个 JVM 里的锁 ,比方 synchronized 和 Lock,ReentrantLock 等等
2) 跨 JVM 的分布式锁,因为服务是集群部署的,单机版的锁不再起作用,资源在不同的服务器之间共享。

2. 一个健壮性高的分布式锁应该具备的特质
1) 独占性 任何时刻只能有 一个线程 持有锁
2) 高可用 在 redis 集群环境下, 不能因为某个节点挂了而呈现锁生效的状况
3) 防死锁 不能有死锁状况,要有 超时管制的性能
4) 不乱抢 不能 unlock 他人的锁, 本人的锁只能本人开释
5) 重入性 同一个节点的同一个线程取得锁之后, 能够再次取得这个锁

3. 单个 redis 分布式锁的演变

版本 1:单机版的锁

咱们先来看这样无锁的代码:

@RestController
public class GoodController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${server.port}")
    private String serverPort;

    @GetMapping("/buy_goods")
    public String buy_Goods()
    {String result = stringRedisTemplate.opsForValue().get("goods:001");
        int goodsNumber = result == null ? 0 : Integer.parseInt(result);

        if(goodsNumber > 0)
        {
            int realNumber = goodsNumber - 1;
            stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
            System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
            return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
        }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
        }

        return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
    }
}

以上的程序在进行商品销售的时候,并没有加锁,在并发下会造成超卖的景象。

应用 jMeter 进行压力测试:

因为在单机版的状况下,咱们能够应用 synchronize 或者 lock 来进行解决,上代码:

   @GetMapping("/buy_goods")
    public String buy_Goods() {synchronized (this) {String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if (goodsNumber > 0) {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件" + "\t 服务器端口:" + serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件" + "\t 服务器端口:" + serverPort;
            } else {System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾" + "\t 服务器端口:" + serverPort);
            }

            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾" + "\t 服务器端口:" + serverPort;
        }
    }

运行后果:

然而这只是对单机版的程序无效,咱们启动两个微服务,再用 nginx 配置一下负载平衡,照样会产生超卖的景象:

服务器 1:

服务器 2:

看,第 189 件库存被卖了两次。

问题:分布式部署后,单机锁还是呈现超卖景象,这个时候就须要分布式锁!

版本 2:redis 的分布式锁

咱们应用redis 来进行加锁,避免超卖景象

    @GetMapping("/buy_goods/v2")
    public String buy_GoodsV2() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
        // 应用 redis 进行加锁 
        Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
        if(!flagLock)
        {return "争夺锁失败,o(╥﹏╥)o";
        }

        String result = stringRedisTemplate.opsForValue().get("goods:001");
        int goodsNumber = result == null ? 0 : Integer.parseInt(result);

        if(goodsNumber > 0)
        {
            int realNumber = goodsNumber - 1;
            stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
            stringRedisTemplate.delete(key);
            System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
            return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
        }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
        }

        return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
    }

此时,咱们运行一下代码:

服务 1:

服务 2:

问题:为什么卖了一件就卖不动了呢?因为咱们没有在卖完之后,没有进行对分布式锁的 key 进行解锁操作。

版本 3:在 finally 中,解除该锁

@GetMapping("/buy_goods/v3")
    public String buy_GoodsV3() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
            if(!flagLock)
            {return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {stringRedisTemplate.delete(key);
        }
    }

运行后果;
服务 1:

服务 2:

问题:如果 服务器宕机 了,代码层面基本没有走到 finally 这一块,就没有方法保障解锁,这个 key 没有被删除 咱们须要给 key 减少一个过期工夫!

版本 4:给 key 减少过期工夫

 @GetMapping("/buy_goods/v4")
    public String buy_GoodsV4() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
            // 减少过期工夫
            stringRedisTemplate.expire(key,10L, TimeUnit.SECONDS);
            if(!flagLock)
            {return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {stringRedisTemplate.delete(key);
        }
    }

问题:设置 key 和设置过期工夫不是原子性的,可能在这个期间,服务器宕机也是可能的。

版本 5:将设置 key 和设置 key 的过期工夫合并成一行,作为一个原子性操作

 @GetMapping("/buy_goods/v5")
    public String buy_GoodsV5() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {
            // 设置 key 和 key 的过期工夫合并为一行,是原子操作,底层为 setnx 命令
            Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);

            if(!flagLock)
            {return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {stringRedisTemplate.delete(key);
        }
    }

问题:delete key 的时候,可能咱们这个锁曾经过期了,删的是下一个线程的锁。

版本 6:删除 key 的时候,只能删除本人的,不能删除他人的,加一层判断

  @GetMapping("/buy_goods/v6")
    public String buy_GoodsV6(){
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);

            if(!flagLock)
            {return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {
            // 删除 key 的时候做一个判断
            if (stringRedisTemplate.opsForValue().get(key).equals(value)) {stringRedisTemplate.delete(key);
            }
        }
    }

问题:finally 块的判断 +del 删除操作不是原子性的, 可能判断完之后,锁就过期了,又删除了他人的锁。

版本 7:用 Lua 脚本,将保障判断和删除锁的原子性

    @GetMapping("/buy_goods/v7")
    public String buy_GoodsV7() throws Exception {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);

            if(!flagLock)
            {return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {Jedis jedis = RedisUtils.getJedis();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1]" +
                    "then" +
                    "return redis.call('del', KEYS[1])" +
                    "else" +
                    "return 0" +
                    "end";

            try {Object result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(value));
                if ("1".equals(result.toString())) {System.out.println("------del REDIS_LOCK_KEY success");
                }else{System.out.println("------del REDIS_LOCK_KEY error");
                }
            } finally {if(null != jedis) {jedis.close();
                }
            }
        }
    }

到这里,咱们根本的一个 redis 锁就造成了,个别公司写到这里差不太多了。

问题:此时咱们要确保,业务逻辑的运行工夫,要比咱们加锁的 key 过期工夫要短 如果业务逻辑运行工夫比咱们们的锁过期工夫更长,又会呈现锁隐没景象。

版本 8:应用 redission,不仅能解决后面所有问题,redission 自带的 watchDog,可能 定时刷新锁的过期工夫。

    @Autowired
    private Redisson redisson;


    @GetMapping("/buy_goods/v8")
    public String buy_GoodsV8()
    {
        String key = "redisLock";

        RLock redissonLock = redisson.getLock(key);
        redissonLock.lock();

        try
        {String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{System.out.println("商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄 / 流动完结 / 调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        }finally {if(redissonLock.isLocked() && redissonLock.isHeldByCurrentThread())
            {redissonLock.unlock();
            }
        }
    }

锁刷新要害逻辑:

![image.png](/img/bVcXV6s)

以上咱们便实现了单机版 redis 锁的编写。

问题:当初咱们都是用主从构造的 redis,当主节点的数据还没来得及同步到从节点,redis 主节点宕机了,仍然会造成锁失落。

4. 多 redis 分布式锁

咱们先将下面形容的问题再反复一次:

当用户调用 redis 的主节点,而且加锁胜利的时候,主节点还没来得及同步数据到从节点,主节点就挂了,导致锁失落,前面的线程就又开始加锁,就会造成脏数据。

解决方案:Redlock 算法

锁由多个 redis(都是主节点)一起保护,如果有了其中一个 redis 产生故障,还有其它 redis 能够兜底,锁依然是存在的。RedLock 算法是实现高牢靠分布式锁的一种无效的解决方案,能够在理论开发中应用。

@Configuration
public class RedisConfig extends CachingConfigurerSupport {


    /**
     * @param lettuceConnectionFactory
     * @return redis 序列化的工具配置类,上面这个请肯定开启配置
     * 127.0.0.1:6379> keys *
     * 1) "ord:102"  序列化过
     * 2) "\xac\xed\x00\x05t\x00\aord:102"   家养,没有序列化过
     */
    @Bean
    public RedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        // 设置 key 序列化形式 string
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // 设置 value 的序列化形式 json
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }


    @Bean
    public Redisson redisson()
    {Config config = new Config();

        config.useSingleServer().setAddress("redis://192.168.111.140:6379").setDatabase(0);

        return (Redisson) Redisson.create(config);
    }


    @Bean
    public Redisson redissonClient1()
    {Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.140:6380").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

    @Bean
    public Redisson redissonClient2()
    {Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.140:6381").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

    @Bean
    public Redisson redissonClient3()
    {Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.140:6382").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

}
public class RedLockController {


    public static final String CACHE_KEY_REDLOCK = "REDLOCK";

    @Autowired
    RedissonClient redissonClient1;

    @Autowired
    RedissonClient redissonClient2;

    @Autowired
    RedissonClient redissonClient3;

    @GetMapping(value = "/redlock")
    public void getlock() {
        //CACHE_KEY_REDLOCK 为 redis 分布式锁的 key
        RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
        RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
        RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
        // 三个锁汇聚成 redLock
        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
        boolean isLock;
        try {
            //waitTime 锁的等待时间解决, 失常状况下 等 5s
            //leaseTime 就是 redis key 的过期工夫, 失常状况下等 5 分钟。isLock = redLock.tryLock(5, 300, TimeUnit.SECONDS);
            if (isLock) {
                //TODO if get lock success, do something;
                // 暂停 20 秒钟线程
                try {TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        } catch (Exception e) {e.printStackTrace();
        } finally {
            // 无论如何, 最初都要解锁
            redLock.unlock();
            System.out.println(Thread.currentThread().getName() + "\t" + "redLock.unlock()");
        }
    }

}

5. 总结

这次咱们讲了分布式锁的演变

无锁 ->synchronized 单机锁 -> 单机 redis 分布式锁 -> 多机 redis 分布式锁。

退出移动版