关于后端:基于Redis实现的分布式锁

3次阅读

共计 10577 个字符,预计需要花费 27 分钟才能阅读完成。

\## 基于 Redis 实现的分布式锁
Spring Cloud 分布式环境下,同一个服务都是部署在不同的机器上,这种状况无奈像单体架构下数据一致性问题采纳加锁就实现数据一致性问题,在高并发状况下,对于分布式架构显然是不适合的,针对这种状况咱们就须要用到分布式锁了。

\## 哪些场景须要用分布式锁

场景一: 比拟敏感的数据比方金额批改,同一时间只能有一个人操作,设想下 2 集体同时批改金额,一个加金额一个减金额,为了避免同时操作造成数据不统一,须要锁,如果是数据库须要的就是行锁或表锁,如果是在集群里,多个客户端同时批改一个共享的数据就须要分布式锁。

场景二: 比方多台机器都能够定时执行某个工作,如果限度工作每次只能被一台机器执行,不能反复执行,就能够用分布式锁来做标记。

场景三: 比方秒杀场景,要求并发量很高,那么同一件商品只能被一个用户抢到,那么就能够应用分布式锁实现。

\## 分布式锁实现形式:

– 1、基于数据库实现分布式锁 
– 2、基于缓存(redis,memcached,tair)实现分布式锁
– 3、基于 Zookeeper 实现分布式锁
 
为什么不应用数据库?

数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉疾速切换到备库上。

没有生效工夫?只有做一个定时工作,每隔肯定工夫把数据库中的超时数据清理一遍。

非阻塞的?搞一个 while 循环,直到 insert 胜利再返回胜利。

非重入的?在数据库表中加个字段,记录以后取得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果以后机器的主机信息和线程信息在数据库能够查到的话,间接把锁调配给他就能够了。

大量申请下数据库往往是零碎的瓶颈,大量连贯,而后 sql 查问,简直所有工夫都节约到这些下面,所以往往状况下能内存操作就在内存操作,应用基于内存操作的 Redis 实现分布式锁,也能够依据需要抉择 ZooKeeper 来实现。

通过 Redis 的 Redlock 和 ZooKeeper 来加锁,性能有了比拟大的晋升,个别状况咱们依据理论场景抉择应用。

\##  分布式锁应该满足要求

– 互斥性 能够保障在分布式部署的利用集群中,同一个办法在同一时间只能被一台机器上的一个线程执行。
– 这把锁要是一把可重入锁(防止死锁)
– 不会产生死锁:有一个客户端在持有锁的过程中解体而没有解锁,也能保障其余客户端可能加锁
– 这把锁最好是一把阻塞锁(依据业务需要思考要不要这条)
– 有高可用的获取锁和开释锁性能
– 获取锁和开释锁的性能要好

\## Redis 实现分布式锁

Redis 实现分布式锁利用 \`SETNX\` 和 \`SETEX\`

根本命令次要有:

– SETNX(SET If Not Exists):当且仅当 Key 不存在时,则能够设置,否则不做任何动作。

当且仅当 key 不存在,将 key 的值设为 value,并返回 1;若给定的 key 曾经存在,则 SETNX 不做任何动作,并返回 0。

– SETEX:基于 SETNX 性能外, 还能够设置超时工夫,避免死锁。

分布式锁

分布式锁其实大白话,实质上要实现的指标 (客户端) 在 redis 中占一个地位,等到这个客户试用,别的人进来就必须得等着,等我试用完了,走了,你再来。感觉跟多线程锁一样,意思大抵是一样的,多线程是针对单机的,在同一个 Jvm 中,然而分布式石锁,是跨机器的,多个过程不同机器上发来得申请,去对同一个数据进行操作。

比方,分布式架构下的秒杀零碎,几万人对 10 个商品进行抢购,10 个商品存在 redis 中,就是示意 10 个地位,第一个人进来了,商品就剩 9 个了,第二个人进来就剩 8 个,在第一个人进来的时候,其他人必须等到 10 个商品数量胜利减去 1 之后你能力进来。

这个过程中第一个人进来的时候还没操作减 1 而后异样了,没有开释锁,而后前面人始终期待着,这就是死锁。真对这种状况能够设置超时工夫,如果超过 10s 中还是没进去,就让他超时生效。

redis 中提供了 \`setnx(set if not exists)\` 指令

\> setnx lock:codehole true  -- 锁定  
OK  
... do something xxxx...  数量减 1  
\> del lock:codehole           -- 开释锁  
(integer) 1  -- 胜利  

如果在减 1 期间产生异样 del 指令没有被调用 而后就始终等着,锁永远不会开释。

redis Redis 2.8 版本中提供了 setex(set if not exists) 指令
 setnx 和 expire 两个指令形成一个原子操作
给锁加上一个过期工夫

\> setex lock:codehole true  
OK  
\> expire lock:codehole 5  
... do something xxxx ...  
\> del lock:codehole  
(integer) 1  

\`SETEX 实现原理 \`

通过 SETNX 设置 Key-Value 来取得锁,随即进入死循环,每次循环判断,如果存在 Key 则持续循环,如果不存在 Key,则跳出循环,当前任务执行实现后,删除 Key 以开释锁。

实现步骤

pom.xml 导入 Redis 依赖

     <!\-\- redis-->  
        <dependency>  
            <groupId>org.springframework.boot</groupId>  
            <artifactId>spring-boot-starter-data-redis</artifactId>  
        </dependency>

        <dependency>  
            <groupId>org.projectlombok</groupId>  
            <artifactId>lombok</artifactId>  
            <version>1.16.10</version>  
            <scope>provided</scope>  
        </dependency>

  

增加配置文件 application.yml:

server:  
  port: 8080

  
spring:  
  profiles: dev  
  data:  
  redis:  
    # Redis 数据库索引(默认为 0)database: 0  
    # Redis 服务器地址  
    host: 127.0.0.1  
    # Redis 服务器连贯端口  
    port: 6379  
    # Redis 服务器连贯明码(默认为空)password:  

全局锁类

[@Data](https://my.oschina.net/difrik)  
public class Lock {  
    /**  
     \* key 名  
     */  
    private String name;  
    /**  
     \* value 值  
     */  
    private String value;

    public Lock(String name, String value) {  
        this.name = name;  
        this.value = value;  
    }

}

  

分布式锁类


[@Slf4j](https://my.oschina.net/slf4j)  
[@Component](https://my.oschina.net/u/3907912)  
public class DistributedLockConfig {

  
    /**  
     \* 单个业务持有锁的工夫 30s,避免死锁  
     */  
    private final static long LOCK_EXPIRE = 30 * 1000L;  
    /**  
     \* 默认 30ms 尝试一次  
     */  
    private final static long LOCK\_TRY\_INTERVAL = 30L;  
    /**  
     \* 默认尝试 20s  
     */  
    private final static long LOCK\_TRY\_TIMEOUT = 20 * 1000L;

    private RedisTemplate template;

    public void setTemplate(RedisTemplate template) {this.template = template;}

    /**  
     \* 尝试获取全局锁  
     *  
     \* [@param](https://my.oschina.net/u/2303379) lock 锁的名称  
     \* [@return](https://my.oschina.net/u/556800) true 获取胜利,false 获取失败  
     */  
    public boolean tryLock(Lock lock) {return getLock(lock, LOCK\_TRY\_TIMEOUT, LOCK\_TRY\_INTERVAL, LOCK_EXPIRE);  
    }

    /**  
     \* 尝试获取全局锁  
     \* SETEX:能够设置超时工夫  
     *  
     \* @param lock    锁的名称  
     \* @param timeout 获取超时工夫 单位 ms  
     \* @return true 获取胜利,false 获取失败  
     */  
    public boolean tryLock(Lock lock, long timeout) {return getLock(lock, timeout, LOCK\_TRY\_INTERVAL, LOCK_EXPIRE);  
    }

    /**  
     \* 尝试获取全局锁  
     *  
     \* @param lock        锁的名称  
     \* @param timeout     获取锁的超时工夫  
     \* @param tryInterval 多少毫秒尝试获取一次  
     \* @return true 获取胜利,false 获取失败  
     */  
    public boolean tryLock(Lock lock, long timeout, long tryInterval) {return getLock(lock, timeout, tryInterval, LOCK_EXPIRE);  
    }

    /**  
     \* 尝试获取全局锁  
     *  
     \* @param lock           锁的名称  
     \* @param timeout        获取锁的超时工夫  
     \* @param tryInterval    多少毫秒尝试获取一次  
     \* @param lockExpireTime 锁的过期  
     \* @return true 获取胜利,false 获取失败  
     */  
    public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {return getLock(lock, timeout, tryInterval, lockExpireTime);  
    }

  
    /**  
     \* 操作 redis 获取全局锁  
     *  
     \* @param lock           锁的名称  
     \* @param timeout        获取的超时工夫  
     \* @param tryInterval    多少 ms 尝试一次  
     \* @param lockExpireTime 获取胜利后锁的过期工夫  
     \* @return true 获取胜利,false 获取失败  
     */  
    public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {

        try {if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) {return false;}  
            long startTime = System.currentTimeMillis();  
            do {if (!template.hasKey(lock.getName())) {ValueOperations<String, String> ops = template.opsForValue();  
                    ops.set(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS);  
                    return true;  
                } else {  
                    // 存在锁  
                    log.debug("lock is exist!!!");  
                }

                // 尝试超过了设定值之后间接跳出循环  
                if (System.currentTimeMillis() - startTime > timeout) {return false;}

                // 每隔多长时间尝试获取  
                Thread.sleep(tryInterval);  
            }  
            while (template.hasKey(lock.getName()));  
        } catch (InterruptedException e) {log.error(e.getMessage());  
            return false;  
        }  
        return false;  
    }

    /**  
     \* 获取锁  
     \* SETNX(SET If Not Exists):当且仅当 Key 不存在时,则能够设置,否则不做任何动作。*/  
    public Boolean getLockNoTime(Lock lock) {if (!StringUtils.isEmpty(lock.getName())) {return false;}

        // setIfAbsent 底层封装命令 是 setNX()  
        boolean falg = template.opsForValue().setIfAbsent(lock.getName(), lock.getValue());

        return false;  
    }

    /**  
     \* 开释锁  
     */  
    public void releaseLock(Lock lock) {if (!StringUtils.isEmpty(lock.getName())) {template.delete(lock.getName());  
        }  
    }

}

测试方法

    @RequestMapping("test")  
    public String index() {distributedLockConfig.setTemplate(redisTemplate);  
        Lock lock = new Lock("test", "test");  
        if (distributedLockConfig.tryLock(lock)) {  
            try {  
                // 为了演示锁的成果,这里睡眠 5000 毫秒  
                System.out.println("执行办法");  
                Thread.sleep(5000);  
            } catch (Exception e) {e.printStackTrace();  
            }  
            distributedLockConfig.releaseLock(lock);  
        }  
        return "hello world!";  
    }

开启两个浏览器窗口,执行办法,咱们能够看到两个浏览器在期待执行,当一个返回  hello world! 之后,如果没超时执行另一个也会返回 hello world! 两个办法彼此先后返回,阐明分布式锁执行胜利。

然而存在一个问题:

这段办法是先去查问 key 是否存在 redis 中,如果存在走循环,而后依据间隔时间去期待尝试获取,如果不存在则进行获取锁,如果等待时间超过超时工夫返回 false。

– 1 这种形式性能问题很差,每次获取锁都要进行期待,很是浪费资源,
– 2 如果在判断锁是否存在这儿 2 个或者 2 个以上的线程都查到 redis 中存在 key,同一时刻就无奈保障一个客户端持有锁,不具备排他性。

如果在集群环境下也会存在问题

如果在哨兵模式中 主节点获取到锁之后, 数据没有同步到从节点主节点挂掉了,这样数据完整性不能保障,另一个客户端申请过去,就会一把锁被两个客户端持有,会导致数据一致性出问题。

![在这里插入图片形容](https://img-blog.csdnimg.cn/20200502173534818.png?x-oss-process=image/watermark,type\_ZmFuZ3poZW5naGVpdGk,shadow\_10,text\_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zODM2MTM0Nw==,size\_16,color\_FFFFFF,t\_70)

对此 Redis 中还提供了另外一种实现分布式锁的办法 Redlock

\## 利用 Redlock

Redlock 是 redis 官网提出的实现分布式锁管理器的算法。这个算法会比个别的一般办法更加安全可靠。

为什么抉择红锁?
在集群中须要半数以上的节点批准能力取得锁,保障了数据的完整性,不会因为主节点数据存在,主节点挂了之后没有同步到从节点,导致数据失落。

Redlock 算法

应用场景

对于 Redis 集群模式尽量采纳这种分布式锁,保障高可用,数据一致性,就应用 Redlock 分布式锁。

pom.xml 减少依赖

<dependency>  
    <groupId>org.redisson</groupId>  
        <artifactId>redisson</artifactId>  
        <version>3.7.0</version>  
</dependency>  

获取锁后须要解决的逻辑

/**  
 \* 获取锁后须要解决的逻辑  
 */  
public interface AquiredLockWorker<T> {T invokeAfterLockAquire() throws Exception;  
}  

获取锁治理类

/**  
 \* 获取锁治理类  
 */  
public interface DistributedLocker {

    /**  
     \* 获取锁  
     \* @param resourceName  锁的名称  
     \* @param worker 获取锁后的解决类  
     \* @param <T>  
     \* @return 解决完具体的业务逻辑要返回的数据  
     \* @throws UnableToAquireLockException  
     \* @throws Exception  
     */  
    <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception;

    <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception;

}  

异样类

/**  
 \* 异样类  
 */  
public class UnableToAquireLockException extends RuntimeException {public UnableToAquireLockException() { }

    public UnableToAquireLockException(String message) {super(message);  
    }

    public UnableToAquireLockException(String message, Throwable cause) {super(message, cause);  
    }  
}  

获取 RedissonClient 连贯类

/**  
 \* 获取 RedissonClient 连贯类  
 */  
@Component  
public class RedissonConnector {  
    RedissonClient redisson;  
    @PostConstruct  
    public void init(){redisson = Redisson.create();  
    }

    public RedissonClient getClient(){return redisson;}

}  

分布式锁实现

@Component  
public class RedisLocker  implements DistributedLocker{

    private final static String LOCKER_PREFIX = "lock:";

    @Autowired  
    RedissonConnector redissonConnector;  
    @Override  
    public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception {return lock(resourceName, worker, 100);  
    }

    @Override  
    public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception {RedissonClient redisson= redissonConnector.getClient();  
        RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName);  
        // Wait for 100 seconds seconds and automatically unlock it after lockTime seconds  
        boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);  
        if (success) {  
            try {return worker.invokeAfterLockAquire();  
            } finally {lock.unlock();  
            }  
        }  
        throw new UnableToAquireLockException();}  
}

测试方法

   ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);  
        for (int i = 0; i < 50; i++) {scheduledExecutorService.execute(new Worker());  
        }  
        scheduledExecutorService.shutdown();

  
 // 工作  
    class Worker implements Runnable {public Worker() { }  
        @Override  
        public void run() {  
            try {redisLocker.lock("tizz1100", new AquiredLockWorker<Object>() {  
                    @Override  
                    public Object invokeAfterLockAquire() {doTask();  
                        return null;  
                    }  
                });  
            } catch (Exception e) {}}

        void doTask() {System.out.println(Thread.currentThread().getName() + "----------" + LocalDateTime.now());  
            System.out.println(Thread.currentThread().getName() + "start");  
            Random random = new Random();  
            int _int = random.nextInt(200);  
            System.out.println(Thread.currentThread().getName() + "sleep" + _int + "millis");  
            try {Thread.sleep(_int);  
            } catch (InterruptedException e) {e.printStackTrace();  
            }  
            System.out.println(Thread.currentThread().getName() + "end");  
           
        }

    }  

参考资料:

https://blog.csdn.net/yue\_2018/article/details/89784454

https://blog.csdn.net/weixin\_34410662/article/details/85600084?utm\_medium=distribute.pc\_relevant.none-task-blog-OPENSEARCH-2&depth\_1-utm\_source=distribute.pc\_relevant.none-task-blog-OPENSEARCH-2

[代码地址](https://github.com/pomestyle/SpringBoot/tree/master/Springboot-Redis-SETEX)

正文完
 0