死磕-java同步系列之redis分布式锁进化史

1次阅读

共计 11576 个字符,预计需要花费 29 分钟才能阅读完成。

问题

(1)redis 如何实现分布式锁?

(2)redis 分布式锁有哪些优点?

(3)redis 分布式锁有哪些缺点?

(4)redis 实现分布式锁有没有现成的轮子可以使用?

简介

Redis(全称:Remote Dictionary Server 远程字典服务)是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。

本章我们将介绍如何基于 redis 实现分布式锁,并把其实现的进化史从头到尾讲明白,以便大家在面试的时候能讲清楚 redis 分布式锁的来(忽)龙(悠)去(考)脉(官)。

实现锁的条件

基于前面关于锁(分布式锁)的学习,我们知道实现锁的条件有三个:

(1)状态(共享)变量,它是有状态的,这个状态的值标识了是否已经被加锁,在 ReentrantLock 中是通过控制 state 的值实现的,在 ZookeeperLock 中是通过控制子节点来实现的;

(2)队列,它是用来存放排队的线程,在 ReentrantLock 中是通过 AQS 的队列实现的,在 ZookeeperLock 中是通过子节点的有序性实现的;

(3)唤醒,上一个线程释放锁之后唤醒下一个等待的线程,在 ReentrantLock 中结合 AQS 的队列释放时自动唤醒下一个线程,在 ZookeeperLock 中是通过其监听机制来实现的;

那么上面三个条件是不是必要的呢?

其实不然,实现锁的必要条件只有第一个,对共享变量的控制,如果共享变量的值为 null 就给他设置个值(java 中可以使用 CAS 操作进程内共享变量),如果共享变量有值则不断重复检查其是否有值(重试),待锁内逻辑执行完毕再把共享变量的值设置回 null。

说白了,只要有个地方存这个共享变量就行了,而且要保证整个系统(多个进程)内只有这一份即可。

这也是 redis 实现分布式锁的关键【本篇文章由公众号“彤哥读源码”原创】。

redis 分布式锁进化史

进化史一——set

既然上面说了实现分布式锁只需要对共享变量控制到位即可,那么 redis 我们怎么控制这个共享变量呢?

首先,我们知道 redis 的基础命令有 get/set/del,通过这三个命令可以实现分布式锁吗?当然可以。

在获取锁之前先 get lock_user_1 看这个锁存不存在,如果不存在则再 set lock_user_1 value,如果存在则等待一段时间后再重试,最后使用完成了再删除这个锁del lock_user_1 即可。

但是,这种方案有个问题,如果一开始这个锁是不存在的,两个线程去同时 get,这个时候返回的都是 null(nil),然后这两个线程都去 set,这时候就出问题了,两个线程都可以 set 成功,相当于两个线程都获取到同一个锁了。

所以,这种方案不可行!

进化史二——setnx

上面的方案不可行的主要原因是多个线程同时 set 都是可以成功的,所以后来有了 setnx 这个命令,它是 set if not exist 的缩写,也就是如果不存在就 set。

可以看到,当重复对同一个 key 进行 setnx 的时候,只有第一次是可以成功的。

因此,方案二就是先使用 setnx lock_user_1 value 命令,如果返回 1 则表示加锁成功,如果返回 0 则表示其它线程先执行成功了,那就等待一段时间后重试,最后一样使用 del lock_user_1 释放锁。

但是,这种方案也有个问题,如果获取锁的这个客户端断线了怎么办?这个锁不是一直都不会释放吗?是的,是这样的。

所以,这种方案也不可行!

进化史三——setnx + setex

上面的方案不可行的主要原因是获取锁之后客户端断线了无法释放锁的问题,那么,我在 setnx 之后立马再执行 setex 可以吗?

答案是可以的,2.6.12 之前的版本使用 redis 实现分布式锁大家都是这么玩的。

因此,方案三就是先使用 setnx lock_user_1 value 命令拿到锁,再立即使用 setex lock_user_1 30 value 设置过期时间,最后使用 del lock_user_1 释放锁。

在 setnx 获取到锁之后再执行 setex 设置过期时间,这样就很大概率地解决了获取锁之后客户端断线不会释放锁的问题。

但是,这种方案依然有问题,如果 setnx 之后 setex 之前这个客户端就断线了呢?嗯~,似乎无解,不过这种概率实在是非常小,所以 2.6.12 之前的版本大家也都这么用,几乎没出现过什么问题。

所以,这种方案基本可用,只是不太好!

进化史四——set nx ex

上面的方案不太好的主要原因是 setnx/setex 是两条独立的命令,无法解决前者成功之后客户端断线的问题,那么,把两条命令合在一起不就行了吗?

是的,redis 官方也意识到这个问题了,所以 2.6.12 版本给 set 命令加了一些参数:

SET key value [EX seconds] [PX milliseconds] [NX|XX]

EX,过期时间,单位秒

PX,过期时间,单位毫秒

NX,not exist,如果不存在才设置成功

XX,exist exist?如果存在才设置成功

通过这个命令我们就再也不怕客户端无故断线了【本篇文章由公众号“彤哥读源码”原创】。

因此,方案四就是先使用 set lock_user_1 value nx ex 30 获取锁,获取锁之后使用,使用完成了最后 del lock_user_1 释放锁。

然而,这种方案就没有问题吗?

当然有问题,其实这里的释放锁只要简单地执行 del lock_user_1 即可,并不会检查这个锁是不是当前客户端获取到的。

所以,这种方案还不是很完美。

进化史五——random value + lua script

上面的方案不完美的主要原因是释放锁这里控制的还不是很到位,那么有没有其它方法可以控制释放锁的线程和加锁的线程一定是同一个客户端呢?

redis 官方给出的方案是这样的:

 // 加锁
 SET resource_name my_random_value NX PX 30000
 
 // 释放锁
 if redis.call("get",KEYS[1]) == ARGV[1] then
     return redis.call("del",KEYS[1])
 else
     return 0
 end

加锁的时候,设置随机值,保证这个随机值只有当前客户端自己知道。

释放锁的时候,执行一段 lua 脚本,把这段 lua 脚本当成一个完整的命令,先检查这个锁对应的值是不是上面设置的随机值,如果是再执行 del 释放锁,否则直接返回释放锁失败。

我们知道,redis 是单线程的,所以这段 lua 脚本中的 get 和 del 不会存在并发问题,但是不能在 java 中先 get 再 del,这样会当成两个命令,会有并发问题,lua 脚本相当于是一个命令一起传输给 redis 的。

这种方案算是比较完美了,但是还有一点小缺陷,就是这个过期时间设置成多少合适呢?

设置的过小,有可能上一个线程还没执行完锁内逻辑,锁就自动释放了,导致另一个线程可以获取锁了,就出现并发问题了;

设置的过大,就要考虑客户端断线了,这个锁要等待很长一段时间。

所以,这里又衍生出一个新的问题,过期时间我设置小一点,但是快到期了它能自动续期就好了。

进化史六——redisson(redis2.8+)

上面方案的缺陷是过期时间不好把握,虽然也可以自己启一个监听线程来处理续期,但是代码实在不太好写,好在现成的轮子 redisson 已经帮我们把这个逻辑都实现好了,我们拿过来直接用就可以了。

而且,redisson 充分考虑了 redis 演化过程中留下的各种问题,单机模式、哨兵模式、集群模式,它统统都处理好了,不管是从单机进化到集群还是从哨兵进化到集群,都只需要简单地修改下配置就可以了,不用改动任何代码,可以说是非(业)常(界)方(良)便(心)。

redisson 实现的分布式锁内部使用的是 Redlock 算法,这是官方推荐的一种算法。

另外,redisson 还提供了很多分布式对象(分布式的原子类)、分布式集合(分布式的 Map/List/Set/Queue 等)、分布式同步器(分布式的 CountDownLatch/Semaphore 等)、分布式锁(分布式的公平锁 / 非公平锁 / 读写锁等),有兴趣的可以去看看,下面贴出链接:

Redlock 介绍:https://redis.io/topics/distlock

redisson 介绍:https://github.com/redisson/r…

代码实现

因为前面五种方案都已经过时,所以彤哥这里偷个懒,就不去一一实现的,我们直接看最后一种 redisson 的实现方式。

pom.xml 文件

添加 spring redis 及 redisson 的依赖,我这里使用的是 springboot 2.1.6 版本,springboot 1.x 版本的自己注意下,查看上面的 github 可以找到方法。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-21</artifactId>
    <version>3.11.0</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.0</version>
</dependency>

application.yml 文件

配置 redis 的连接信息,彤哥这里给出了三种方式。

spring:
  redis:
    # 单机模式
    #host: 192.168.1.102
    #port: 6379
    # password: <your passowrd>
    timeout: 6000ms  # 连接超时时长(毫秒)# 哨兵模式【本篇文章由公众号“彤哥读源码”原创】#    sentinel:
#      master: <your master>
#      nodes: 192.168.1.101:6379,192.168.1.102:6379,192.168.1.103:6379
    # 集群模式(三主三从伪集群)cluster:
      nodes:
        - 192.168.1.102:30001
        - 192.168.1.102:30002
        - 192.168.1.102:30003
        - 192.168.1.102:30004
        - 192.168.1.102:30005
        - 192.168.1.102:30006

Locker 接口

定义 Locker 接口。

public interface Locker {void lock(String key, Runnable command);
}

RedisLocker 实现类

直接使用 RedissonClient 获取锁,注意这里不需要再单独配置 RedissonClient 这个 bean,redisson 框架会根据配置自动生成 RedissonClient 的实例,我们后面说它是怎么实现的。

@Component
public class RedisLocker implements Locker {

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void lock(String key, Runnable command) {RLock lock = redissonClient.getLock(key);
        try {
            //【本篇文章由公众号“彤哥读源码”原创】lock.lock();
            command.run();} finally {lock.unlock();
        }
    }
}

测试类

启动 1000 个线程,每个线程内部打印一句话,然后睡眠 1 秒。


@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RedisLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testRedisLocker() throws IOException {for (int i = 0; i < 1000; i++) {new Thread(()->{locker.lock("lock", ()-> {
                    // 可重入锁测试
                    locker.lock("lock", ()-> {System.out.println(String.format("time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        try {Thread.sleep(1000);
                        } catch (InterruptedException e) {e.printStackTrace();
                        }
                    });
                });
            }, "Thread-"+i).start();}

        System.in.read();}
}

运行结果:

可以看到稳定在 1000ms 左右打印一句话,说明这个锁是可用的,而且是可重入的。

time: 1570100167046, threadName: Thread-756
time: 1570100168067, threadName: Thread-670
time: 1570100169080, threadName: Thread-949
time: 1570100170093, threadName: Thread-721
time: 1570100171106, threadName: Thread-937
time: 1570100172124, threadName: Thread-796
time: 1570100173134, threadName: Thread-944
time: 1570100174142, threadName: Thread-974
time: 1570100175167, threadName: Thread-462
time: 1570100176180, threadName: Thread-407
time: 1570100177194, threadName: Thread-983
time: 1570100178206, threadName: Thread-982
...

RedissonAutoConfiguration

刚才说 RedissonClient 不需要配置,其实它是在 RedissonAutoConfiguration 中自动配置的,我们简单看下它的源码,主要看 redisson()这个方法:


@Configuration
@ConditionalOnClass({Redisson.class, RedisOperations.class})
@AutoConfigureBefore(RedisAutoConfiguration.class)
@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
public class RedissonAutoConfiguration {

    @Autowired
    private RedissonProperties redissonProperties;
    
    @Autowired
    private RedisProperties redisProperties;
    
    @Autowired
    private ApplicationContext ctx;
    
    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<Object, Object> template = new RedisTemplate<Object, Object>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(StringRedisTemplate.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(RedisConnectionFactory.class)
    public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {return new RedissonConnectionFactory(redisson);
    }
    
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(RedissonClient.class)
    public RedissonClient redisson() throws IOException {
        Config config = null;
        Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
        Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
        Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
        int timeout;
        if(null == timeoutValue){
            // 超时未设置则为 0
            timeout = 0;
        }else if (!(timeoutValue instanceof Integer)) {
            // 转毫秒
            Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis");
            timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue();} else {timeout = (Integer)timeoutValue;
        }
        
        // 看下是否给 redisson 单独写了一个配置文件
        if (redissonProperties.getConfig() != null) {
            try {InputStream is = getConfigStream();
                config = Config.fromJSON(is);
            } catch (IOException e) {
                // trying next format
                try {InputStream is = getConfigStream();
                    config = Config.fromYAML(is);
                } catch (IOException e1) {throw new IllegalArgumentException("Can't parse config", e1);
                }
            }
        } else if (redisProperties.getSentinel() != null) {
            // 如果是哨兵模式
            Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
            Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());
            
            String[] nodes;
            // 看 sentinel.nodes 这个节点是列表配置还是逗号隔开的配置
            if (nodesValue instanceof String) {nodes = convert(Arrays.asList(((String)nodesValue).split(",")));
            } else {nodes = convert((List<String>)nodesValue);
            }
            
            // 生成哨兵模式的配置
            config = new Config();
            config.useSentinelServers()
                .setMasterName(redisProperties.getSentinel().getMaster())
                .addSentinelAddress(nodes)
                .setDatabase(redisProperties.getDatabase())
                .setConnectTimeout(timeout)
                .setPassword(redisProperties.getPassword());
        } else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
            // 如果是集群模式
            Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties);
            Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes");
            // 集群模式的 cluster.nodes 是列表配置
            List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject);
            
            String[] nodes = convert(nodesObject);
            
            // 生成集群模式的配置
            config = new Config();
            config.useClusterServers()
                .addNodeAddress(nodes)
                .setConnectTimeout(timeout)
                .setPassword(redisProperties.getPassword());
        } else {
            // 单机模式的配置
            config = new Config();
            String prefix = "redis://";
            Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
            // 判断是否走 ssl
            if (method != null && (Boolean)ReflectionUtils.invokeMethod(method, redisProperties)) {prefix = "rediss://";}
            
            // 生成单机模式的配置
            config.useSingleServer()
                .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
                .setConnectTimeout(timeout)
                .setDatabase(redisProperties.getDatabase())
                .setPassword(redisProperties.getPassword());
        }
        
        return Redisson.create(config);
    }

    private String[] convert(List<String> nodesObject) {
        // 将哨兵或集群模式的 nodes 转换成标准配置
        List<String> nodes = new ArrayList<String>(nodesObject.size());
        for (String node : nodesObject) {if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {nodes.add("redis://" + node);
            } else {nodes.add(node);
            }
        }
        return nodes.toArray(new String[nodes.size()]);
    }

    private InputStream getConfigStream() throws IOException {
        // 读取 redisson 配置文件
        Resource resource = ctx.getResource(redissonProperties.getConfig());
        InputStream is = resource.getInputStream();
        return is;
    }

    
}

网上查到的资料中很多配置都是多余的(可能是版本问题),看下源码很清楚,这也是看源码的一个好处。

总结

(1)redis 由于历史原因导致有三种模式:单机、哨兵、集群;

(2)redis 实现分布式锁的进化史:set -> setnx -> setnx + setex -> set nx ex(或 px) -> set nx ex(或 px) + lua script -> redisson;

(3)redis 分布式锁有现成的轮子 redisson 可以使用;

(4)redisson 还提供了很多有用的组件,比如分布式集合、分布式同步器、分布式对象;

彩蛋

redis 分布式锁有哪些优点?

答:1)大部分系统都依赖于 redis 做缓存,不需要额外依赖其它组件(相对于 zookeeper 来说);

2)redis 可以集群部署,相对于 mysql 的单点更可靠;

3)不会占用 mysql 的连接数,不会增加 mysql 的压力;

4)redis 社区相对活跃,redisson 的实现更是稳定可靠;

5)利用过期机制解决客户端断线的问题,虽然不太及时;

6)有现成的轮子 redisson 可以使用,锁的种类比较齐全;

redis 分布式锁有哪些缺点?

答:1)集群模式下会在所有 master 节点执行加锁命令,大部分(2N+1)成功了则获得锁,节点越多,加锁的过程越慢;

2)高并发情况下,未获得锁的线程会睡眠重试,如果同一把锁竞争非常激烈,会占用非常多的系统资源;

3)历史原因导致的坑挺多的,自己很难实现出来健壮的 redis 分布式锁;

总之,redis 分布式锁的优点是大于缺点的,而且社区活跃,这也是我们大部分系统使用 redis 作为分布式锁的原因。

推荐阅读

1、死磕 java 同步系列之开篇

2、死磕 java 魔法类之 Unsafe 解析

3、死磕 java 同步系列之 JMM(Java Memory Model)

4、死磕 java 同步系列之 volatile 解析

5、死磕 java 同步系列之 synchronized 解析

6、死磕 java 同步系列之自己动手写一个锁 Lock

7、死磕 java 同步系列之 AQS 起篇

8、死磕 java 同步系列之 ReentrantLock 源码解析(一)——公平锁、非公平锁

9、死磕 java 同步系列之 ReentrantLock 源码解析(二)——条件锁

10、死磕 java 同步系列之 ReentrantLock VS synchronized

11、死磕 java 同步系列之 ReentrantReadWriteLock 源码解析

12、死磕 java 同步系列之 Semaphore 源码解析

13、死磕 java 同步系列之 CountDownLatch 源码解析

14、死磕 java 同步系列之 AQS 终篇

15、死磕 java 同步系列之 StampedLock 源码解析

16、死磕 java 同步系列之 CyclicBarrier 源码解析

17、死磕 java 同步系列之 Phaser 源码解析

18、死磕 java 同步系列之 mysql 分布式锁

19、死磕 java 同步系列之 zookeeper 分布式锁


欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

正文完
 0