关于redis:深入理解redisRedis分布式锁

1.锁的品种

2.一个健壮性高的分布式锁应该具备的特质

3.单个redis分布式锁的演变

4.多redis分布式锁

5.总结

1.锁的品种
咱们在日常的开发流动中,个别把锁分为两类:
1)同一个JVM里的锁,比方synchronized和Lock,ReentrantLock等等
2)跨JVM的分布式锁,因为服务是集群部署的,单机版的锁不再起作用,资源在不同的服务器之间共享。

2.一个健壮性高的分布式锁应该具备的特质
1)独占性 任何时刻只能有一个线程持有锁
2)高可用 在redis集群环境下,不能因为某个节点挂了而呈现锁生效的状况
3)防死锁 不能有死锁状况,要有超时管制的性能
4)不乱抢 不能unlock他人的锁,本人的锁只能本人开释
5)重入性 同一个节点的同一个线程取得锁之后,能够再次取得这个锁

3.单个redis分布式锁的演变

版本1:单机版的锁

咱们先来看这样无锁的代码:

@RestController
public class GoodController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${server.port}")
    private String serverPort;

    @GetMapping("/buy_goods")
    public String buy_Goods()
    {
        String result = stringRedisTemplate.opsForValue().get("goods:001");
        int goodsNumber = result == null ? 0 : Integer.parseInt(result);

        if(goodsNumber > 0)
        {
            int realNumber = goodsNumber - 1;
            stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
            System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
            return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
        }else{
            System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
        }

        return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
    }
}

以上的程序在进行商品销售的时候,并没有加锁,在并发下会造成超卖的景象。

应用jMeter进行压力测试:

因为在单机版的状况下,咱们能够应用synchronize或者lock来进行解决,上代码:

   @GetMapping("/buy_goods")
    public String buy_Goods() {
        synchronized (this) {
            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if (goodsNumber > 0) {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件" + "\t 服务器端口:" + serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件" + "\t 服务器端口:" + serverPort;
            } else {
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾" + "\t 服务器端口:" + serverPort);
            }

            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾" + "\t 服务器端口:" + serverPort;
        }
    }

运行后果:

然而这只是对单机版的程序无效,咱们启动两个微服务,再用nginx配置一下负载平衡,照样会产生超卖的景象:

服务器1:

服务器2:

看,第189件库存被卖了两次。

问题:分布式部署后,单机锁还是呈现超卖景象,这个时候就须要分布式锁!

版本2:redis的分布式锁

咱们应用redis来进行加锁,避免超卖景象

    @GetMapping("/buy_goods/v2")
    public String buy_GoodsV2() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
        //应用redis进行加锁 
        Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
        if(!flagLock)
        {
            return "争夺锁失败,o(╥﹏╥)o";
        }

        String result = stringRedisTemplate.opsForValue().get("goods:001");
        int goodsNumber = result == null ? 0 : Integer.parseInt(result);

        if(goodsNumber > 0)
        {
            int realNumber = goodsNumber - 1;
            stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
            stringRedisTemplate.delete(key);
            System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
            return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
        }else{
            System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
        }

        return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
    }

此时,咱们运行一下代码:

服务1:

服务2:

问题:为什么卖了一件就卖不动了呢?因为咱们没有在卖完之后,没有进行对分布式锁的key进行解锁操作。

版本3:在finally中,解除该锁

@GetMapping("/buy_goods/v3")
    public String buy_GoodsV3() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {
            Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
            if(!flagLock)
            {
                return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {
            stringRedisTemplate.delete(key);
        }
    }

运行后果;
服务1:

服务2:

问题:如果服务器宕机了,代码层面基本没有走到finally这一块,就没有方法保障解锁,这个key没有被删除咱们须要给key减少一个过期工夫!

版本4:给key减少过期工夫

 @GetMapping("/buy_goods/v4")
    public String buy_GoodsV4() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {
            Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
            //减少过期工夫
            stringRedisTemplate.expire(key,10L, TimeUnit.SECONDS);
            if(!flagLock)
            {
                return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {
            stringRedisTemplate.delete(key);
        }
    }

问题:设置key和设置过期工夫不是原子性的,可能在这个期间,服务器宕机也是可能的。

版本5:将设置key和设置key的过期工夫合并成一行,作为一个原子性操作

 @GetMapping("/buy_goods/v5")
    public String buy_GoodsV5() {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {
            //设置key和key的过期工夫合并为一行,是原子操作,底层为setnx命令
            Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);

            if(!flagLock)
            {
                return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {
            stringRedisTemplate.delete(key);
        }
    }

问题:delete key的时候,可能咱们这个锁曾经过期了,删的是下一个线程的锁。

版本6:删除key的时候,只能删除本人的,不能删除他人的,加一层判断

  @GetMapping("/buy_goods/v6")
    public String buy_GoodsV6(){
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {
            Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);

            if(!flagLock)
            {
                return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {
            //删除key的时候做一个判断
            if (stringRedisTemplate.opsForValue().get(key).equals(value)) {
                stringRedisTemplate.delete(key);
            }
        }
    }

问题:finally块的判断+del删除操作不是原子性的,可能判断完之后,锁就过期了,又删除了他人的锁。

版本7:用Lua脚本,将保障判断和删除锁的原子性

    @GetMapping("/buy_goods/v7")
    public String buy_GoodsV7() throws Exception {
        String key = "redisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try {
            Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);

            if(!flagLock)
            {
                return "抢锁失败,o(╥﹏╥)o";
            }

            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        } finally {
            Jedis jedis = RedisUtils.getJedis();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                    "then " +
                    "return redis.call('del', KEYS[1]) " +
                    "else " +
                    "   return 0 " +
                    "end";

            try {
                Object result = jedis.eval(script, Collections.singletonList(key), Collections.singletonList(value));
                if ("1".equals(result.toString())) {
                    System.out.println("------del REDIS_LOCK_KEY success");
                }else{
                    System.out.println("------del REDIS_LOCK_KEY error");
                }
            } finally {
                if(null != jedis) {
                    jedis.close();
                }
            }
        }
    }

到这里,咱们根本的一个redis锁就造成了,个别公司写到这里差不太多了。

问题:此时咱们要确保,业务逻辑的运行工夫,要比咱们加锁的key过期工夫要短如果业务逻辑运行工夫比咱们们的锁过期工夫更长,又会呈现锁隐没景象。

版本8:应用redission,不仅能解决后面所有问题,redission自带的watchDog,可能定时刷新锁的过期工夫。

    @Autowired
    private Redisson redisson;


    @GetMapping("/buy_goods/v8")
    public String buy_GoodsV8()
    {
        String key = "redisLock";

        RLock redissonLock = redisson.getLock(key);
        redissonLock.lock();

        try
        {
            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if(goodsNumber > 0)
            {
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort);
                return "你曾经胜利秒杀商品,此时还残余:" + realNumber + "件"+"\t 服务器端口:"+serverPort;
            }else{
                System.out.println("商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort);
            }
            return "商品曾经售罄/流动完结/调用超时,欢送下次光顾"+"\t 服务器端口:"+serverPort;
        }finally {
            if(redissonLock.isLocked() && redissonLock.isHeldByCurrentThread())
            {
                redissonLock.unlock();
            }
        }
    }

锁刷新要害逻辑:

![image.png](/img/bVcXV6s)

以上咱们便实现了单机版redis锁的编写。

问题:当初咱们都是用主从构造的redis,当主节点的数据还没来得及同步到从节点,redis主节点宕机了,仍然会造成锁失落。

4.多redis分布式锁

咱们先将下面形容的问题再反复一次:

当用户调用redis的主节点,而且加锁胜利的时候,主节点还没来得及同步数据到从节点,主节点就挂了,导致锁失落,前面的线程就又开始加锁,就会造成脏数据。

解决方案:Redlock算法

锁由多个redis(都是主节点)一起保护,如果有了其中一个redis产生故障,还有其它redis能够兜底,锁依然是存在的。RedLock算法是实现高牢靠分布式锁的一种无效的解决方案,能够在理论开发中应用。

@Configuration
public class RedisConfig extends CachingConfigurerSupport {


    /**
     * @param lettuceConnectionFactory
     * @return redis序列化的工具配置类,上面这个请肯定开启配置
     * 127.0.0.1:6379> keys *
     * 1) "ord:102"  序列化过
     * 2) "\xac\xed\x00\x05t\x00\aord:102"   家养,没有序列化过
     */
    @Bean
    public RedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        //设置key序列化形式string
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        //设置value的序列化形式json
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }


    @Bean
    public Redisson redisson()
    {
        Config config = new Config();

        config.useSingleServer().setAddress("redis://192.168.111.140:6379").setDatabase(0);

        return (Redisson) Redisson.create(config);
    }


    @Bean
    public Redisson redissonClient1()
    {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.140:6380").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

    @Bean
    public Redisson redissonClient2()
    {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.140:6381").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

    @Bean
    public Redisson redissonClient3()
    {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.111.140:6382").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

}
public class RedLockController {


    public static final String CACHE_KEY_REDLOCK = "REDLOCK";

    @Autowired
    RedissonClient redissonClient1;

    @Autowired
    RedissonClient redissonClient2;

    @Autowired
    RedissonClient redissonClient3;

    @GetMapping(value = "/redlock")
    public void getlock() {
        //CACHE_KEY_REDLOCK为redis 分布式锁的key
        RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
        RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
        RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
        //三个锁汇聚成redLock
        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
        boolean isLock;
        try {
            //waitTime 锁的等待时间解决,失常状况下 等5s
            //leaseTime就是redis key的过期工夫,失常状况下等5分钟。
            isLock = redLock.tryLock(5, 300, TimeUnit.SECONDS);
            if (isLock) {
                //TODO if get lock success, do something;
                //暂停20秒钟线程
                try {
                    TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 无论如何, 最初都要解锁
            redLock.unlock();
            System.out.println(Thread.currentThread().getName() + "\t" + "redLock.unlock()");
        }
    }

}

5.总结

这次咱们讲了分布式锁的演变

无锁->synchronized单机锁->单机redis分布式锁->多机redis分布式锁。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理