Redis实现分布式锁的原理
后面讲了Redis在理论业务场景中的利用,那么上面再来理解一下Redisson功能性场景的利用,也就是大家常常应用的分布式锁的实现场景。
对于分布式锁的概念,本文就不做形容。
•引入redisson依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency>
•编写简略的测试代码
public class RedissonTest { private static RedissonClient redissonClient; static { Config config=new Config(); config.useSingleServer().setAddress("redis://192.168.221.128:6379"); redissonClient=Redisson.create(config); } public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateOrder"); //最多期待100秒、上锁10s当前主动解锁 if(rLock.tryLock(100,10,TimeUnit.SECONDS)){ System.out.println("获取锁胜利"); } Thread.sleep(2000); rLock.unlock(); redissonClient.shutdown(); } }
Redisson分布式锁的实现原理
你们会发现,通过redisson,非常简单就能够实现咱们所须要的性能,当然这只是redisson的冰山一角,redisson最弱小的中央就是提供了分布式个性的常用工具类。使得本来作为协调单机多线程并发程序的并发程序的工具包取得了协调分布式多级多线程并发零碎的能力,升高了程序员在分布式环境下解决分布式问题的难度,上面剖析一下RedissonLock的实现原理
RedissonLock.tryLock
@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); //通过tryAcquire办法尝试获取锁 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { //示意胜利获取到锁,间接返回 return true; } //省略局部代码....}
tryAcquire
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //leaseTime就是租约工夫,就是redis key的过期工夫。 if (leaseTime != -1) { //如果设置过期工夫 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else {//如果没设置了过期工夫,则从配置中获取key超时工夫,默认是30s过期 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //当tryLockInnerAsync执行完结后,触发上面回调 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { //阐明出现异常,间接返回 return; } // lock acquired if (ttlRemaining == null) { //示意第一次设置锁键 if (leaseTime != -1) { //示意设置过超时工夫,更新internalLockLeaseTime,并返回 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,启动Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;}
tryLockInnerAsync
通过lua脚本来实现加锁的操作
1.判断lock键是否存在,不存在间接调用hset存储以后线程信息并且设置过期工夫,返回nil,通知客户端间接获取到锁。2.判断lock键是否存在,存在则将重入次数加1,并从新设置过期工夫,返回nil,通知客户端间接获取到锁。3.被其它线程曾经锁定,返回锁有效期的剩余时间,通知客户端须要期待。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
对于Lua脚本,咱们稍后再解释。
unlock开释锁流程
开释锁的流程,脚本看起来会略微简单一点
1.如果lock键不存在,通过publish
指令发送一个音讯示意锁曾经可用。2.如果锁不是被以后线程锁定,则返回nil3.因为反对可重入,在解锁时将重入次数须要减14.如果计算后的重入次数>0,则从新设置过期工夫5.如果计算后的重入次数<=0,则发消息说锁曾经可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
RedissonLock有竞争的状况
有竞争的状况在redis端的lua脚本是雷同的,只是不同的条件执行不同的redis命令。当通过tryAcquire发现锁被其它线程申请时,须要进入期待竞争逻辑中
1.this.await返回false,阐明等待时间曾经超出获取锁最大等待时间,勾销订阅并返回获取锁失败2.this.await返回true,进入循环尝试获取锁。
持续看RedissonLock.tryLock后半局部代码如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//省略局部代码 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); // 订阅监听redis音讯,并且创立RedissonLockEntry RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞期待subscribe的future的后果对象,如果subscribe办法调用超过了time,阐明曾经超过了客户端设置的最大wait time,则间接返回false,勾销订阅,不再持续申请锁了。 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { //勾销订阅 subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); //示意抢占锁失败 return false; //返回false } try { //判断是否超时,如果期待超时,返回获的锁失败 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } //通过while循环再次尝试竞争锁 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //竞争锁,返回锁超时工夫 // lock acquired if (ttl == null) { //如果超时工夫为null,阐明取得锁胜利 return true; } //判断是否超时,如果超时,示意获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 通过信号量(共享锁)阻塞,期待解锁音讯. (缩小申请锁调用的频率) // 如果剩余时间(ttl)小于wait time ,就在 ttl 工夫内,从Entry的信号量获取一个许可(除非被中断或者始终没有可用的许可)。 // 否则就在wait time 工夫范畴内期待能够通过信号量 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新等待时间(最大等待时间-曾经耗费的阻塞工夫) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { //获取锁失败 acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); //勾销订阅 }// return get(tryLockAsync(waitTime, leaseTime, unit)); }
锁过期了怎么办?
一般来说,咱们去取得分布式锁时,为了防止死锁的状况,咱们会对锁设置一个超时工夫,然而有一种状况是,如果在指定工夫内以后线程没有执行完,因为锁超时导致锁被开释,那么其余线程就会拿到这把锁,从而导致一些故障。
为了防止这种状况,Redisson引入了一个Watch Dog机制,这个机制是针对分布式锁来实现锁的主动续约,简略来说,如果以后取得锁的线程没有执行完,那么Redisson会主动给Redis中指标key缩短超时工夫。
默认状况下,看门狗的续期工夫是30s,也能够通过批改Config.lockWatchdogTimeout来另行指定。
@Overridepublic boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1, unit); //leaseTime=-1}
实际上,当咱们通过tryLock办法没有传递超时工夫时,默认会设置一个30s的超时工夫,避免出现死锁的问题。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //当leaseTime为-1时,leaseTime=internalLockLeaseTime,默认是30s,示意以后锁的过期工夫。 //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { //阐明出现异常,间接返回 return; } // lock acquired if (ttlRemaining == null) { //示意第一次设置锁键 if (leaseTime != -1) { //示意设置过超时工夫,更新internalLockLeaseTime,并返回 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,启动Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;}
因为默认设置了一个30s的过期工夫,为了避免过期之后以后线程还未执行完,所以通过定时工作对过期工夫进行续约。
•首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map构造,次要还是判断在这个服务实例中的加锁客户端的锁key是否存在,•如果曾经存在了,就间接返回;次要是思考到RedissonLock是可重入锁。
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else {// 第一次加锁的时候会调用,外部会启动WatchDog entry.addThreadId(threadId); renewExpiration(); }}
定义一个定时工作,该工作中调用renewExpirationAsync
办法进行续约。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //用到了工夫轮机制 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // renewExpirationAsync续约租期 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次距离租期的1/3工夫执行 ee.setTimeout(task);}
执行Lua脚本,对指定的key进行续约。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));}
Lua脚本
Lua是一个高效的轻量级脚本语言(和JavaScript相似),用规范C语言编写并以源代码模式凋谢, 其设计目标是为了嵌入应用程序中,从而为应用程序提供灵便的扩大和定制性能。Lua在葡萄牙语中是“月亮”的意思,它的logo模式卫星,寓意是Lua是一个“卫星语言”,可能不便地嵌入到其余语言中应用;其实在很多常见的框架中,都有嵌入Lua脚本的性能,比方OpenResty、Redis等。
应用Lua脚本的益处:
1.缩小网络开销,在Lua脚本中能够把多个命令放在同一个脚本中运行2.原子操作,redis会将整个脚本作为一个整体执行,两头不会被其余命令插入。换句话说,编写脚本的过程中无需放心会呈现竞态条件3.复用性,客户端发送的脚本会永远存储在redis中,这意味着其余客户端能够复用这一脚原本实现同样的逻辑
Lua的下载和装置
Lua是一个独立的脚本语言,所以它有专门的编译执行工具,上面简略带大家装置一下。
•下载Lua源码包: https://www.lua.org/download....•装置步骤如下
tar -zxvf lua-5.4.3.tar.gz cd lua-5.4.3 make linux make install
如果报错,说找不到readline/readline.h, 能够通过yum命令装置
yum -y install readline-devel ncurses-devel
最初,间接输出lua
命令即可进入lua的控制台。Lua脚本有本人的语法、变量、逻辑运算符、函数等,这块我就不在这里做过多的阐明,用过JavaScript的同学,应该只须要花几个小时就能够全副学完,简略演示两个案例如下。
array = {"Lua", "mic"}for i= 0, 2 do print(array[i])endarray = {"mic", "redis"}for key,value in ipairs(array)do print(key, value)end
Redis与Lua
Redis中集成了Lua的编译和执行器,所以咱们能够在Redis中定义Lua脚本去执行。同时,在Lua脚本中,能够间接调用Redis的命令,来操作Redis中的数据。
redis.call(‘set’,'hello','world')local value=redis.call(‘get’,’hello’)
redis.call 函数的返回值就是redis命令的执行后果,后面咱们介绍过redis的5中类型的数据返回的值的类型也都不一样,redis.call函数会将这5种类型的返回值转化对应的Lua的数据类型
在很多状况下咱们都须要脚本能够有返回值,毕竟这个脚本也是一个咱们所编写的命令集,咱们能够像调用其余redis内置命令一样调用咱们本人写的脚本,所以同样redis会主动将脚本返回值的Lua数据类型转化为Redis的返回值类型。 在脚本中能够应用return 语句将值返回给redis客户端,通过return语句来执行,如果没有执行return,默认返回为nil。
Redis中执行Lua脚本相干的命令
编写完脚本后最重要的就是在程序中执行脚本。Redis提供了EVAL命令能够使开发者像调用其余Redis内置命令一样调用脚本。
EVAL命令-执行脚本
[EVAL] [脚本内容] [key参数的数量] [key …] [arg …]
能够通过key和arg这两个参数向脚本中传递数据,他们的值能够在脚本中别离应用KEYS和ARGV 这两个类型的全局变量拜访。
比方咱们通过脚本实现一个set命令,通过在redis客户端中调用,那么执行的语句是:
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
上述脚本相当于应用Lua脚本调用了Redis的set
命令,存储了一个key=lua,value=hello到Redis中。
EVALSHA命令
思考到咱们通过eval执行lua脚本,脚本比拟长的状况下,每次调用脚本都须要把整个脚本传给redis,比拟占用带宽。为了解决这个问题,redis提供了EVALSHA命令容许开发者通过脚本内容的SHA1摘要来执行脚本。该命令的用法和EVAL一样,只不过是将脚本内容替换成脚本内容的SHA1摘要
1.Redis在执行EVAL命令时会计算脚本的SHA1摘要并记录在脚本缓存中2.执行EVALSHA命令时Redis会依据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOSCRIPT No matching script,Please use EVAL”
# 将脚本退出缓存并生成sha1命令script load "return redis.call('get','lua')"# ["13bd040587b891aedc00a72458cbf8588a27df90"]# 传递sha1的值来执行该命令evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
Redisson执行Lua脚本
通过lua脚本来实现一个拜访频率限度性能。
思路,定义一个key,key中蕴含ip地址。 value为指定工夫内的拜访次数,比如说是10秒内只能拜访3次。
•定义Lua脚本。
local times=redis.call('incr',KEYS[1]) -- 如果是第一次进来,设置一个过期工夫 if times == 1 then redis.call('expire',KEYS[1],ARGV[1]) end -- 如果在指定工夫内拜访次数大于指定次数,则返回0,示意拜访被限度 if times > tonumber(ARGV[2]) then return 0 end -- 返回1,容许被拜访 return 1
•定义controller,提供拜访测试方法
@RestController public class RedissonController { @Autowired RedissonClient redissonClient; private final String LIMIT_LUA= "local times=redis.call('incr',KEYS[1])\n" + "if times == 1 then\n" + " redis.call('expire',KEYS[1],ARGV[1])\n" + "end\n" + "if times > tonumber(ARGV[2]) then\n" + " return 0\n" + "end\n" + "return 1"; @GetMapping("/lua/{id}") public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException { List<Object> keys= Arrays.asList("LIMIT:"+id); RFuture<Object> future=redissonClient.getScript(). evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3); return future.get().toString(); } }
须要留神,上述脚本执行的时候会有问题,因为redis默认的序列化形式导致value的值在传递到脚本中时,转成了对象类型,须要批改redisson.yml
文件,减少codec的序列化形式。
•application.yml
spring: redis: redisson: file: classpath:redisson.yml
•redisson.yml
singleServerConfig: address: redis://192.168.221.128:6379 codec: !<org.redisson.codec.JsonJacksonCodec> {}
Lua脚本的原子性
redis的脚本执行是原子的,即脚本执行期间Redis不会执行其余命令。所有的命令必须期待脚本执行完当前能力执行。为了避免某个脚本执行工夫过程导致Redis无奈提供服务。Redis提供了lua-time-limit参数限度脚本的最长运行工夫。默认是5秒钟。
非事务性操作
当脚本运行工夫超过这个限度后,Redis将开始承受其余命令但不会执行(以确保脚本的原子性),而是返回BUSY的谬误,上面演示一下这种状况。
关上两个客户端窗口,在第一个窗口中执行lua脚本的死循环
eval "while true do end" 0
在第二个窗口中运行get lua
,会失去如下的异样。
(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
咱们会发现执行后果是Busy, 接着咱们通过script kill 的命令终止以后执行的脚本,第二个窗口的显示又恢复正常了。
存在事务性操作
如果以后执行的Lua脚本对Redis的数据进行了批改(SET、DEL等),那么通过SCRIPT KILL 命令是不能终止脚本运行的,因为要保障脚本运行的原子性,如果脚本执行了一部分终止,那就违反了脚本原子性的要求。最终要保障脚本要么都执行,要么都不执行
同样关上两个窗口,第一个窗口运行如下命令
eval "redis.call('set','name','mic') while true do end" 0
在第二个窗口运行
get lua
后果一样,依然是busy,然而这个时候通过script kill命令,会发现报错,没方法kill。
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到这种状况,只能通过shutdown nosave命令来强行终止redis。
shutdown nosave和shutdown的区别在于 shutdown nosave不会进行长久化操作,意味着产生在上一次快照后的数据库批改都会失落。
Redisson的Lua脚本
理解了lua之后,咱们再回过头来看看Redisson的Lua脚本,就不难理解了。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
Redis中的Pub/Sub机制
上面是Redisson中开释锁的代码,在代码中咱们发现一个publish的指令redis.call('publish', KEYS[2], ARGV[1])
,这个指令是干啥的呢?
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
Redis提供了一组命令能够让开发者实现“公布/订阅”模式(publish/subscribe) . 该模式同样能够实现过程间的消息传递,它的实现原理是:
•公布/订阅模式蕴含两种角色,别离是发布者和订阅者。订阅者能够订阅一个或多个频道,而发布者能够向指定的频道发送音讯,所有订阅此频道的订阅者都会收到该音讯•发布者公布音讯的命令是PUBLISH, 用法是
PUBLISH channel message
比方向channel.1发一条音讯:hello
PUBLISH channel.1 “hello”
这样就实现了音讯的发送,该命令的返回值示意接管到这条音讯的订阅者数量。因为在执行这条命令的时候还没有订阅者订阅该频道,所以返回为0. 另外值得注意的是音讯发送进来不会长久化,如果发送之前没有订阅者,那么后续再有订阅者订阅该频道,之前的音讯就收不到了
订阅者订阅音讯的命令是:
SUBSCRIBE channel [channel …]
该命令同时能够订阅多个频道,比方订阅channel.1的频道:SUBSCRIBE channel.1,执行SUBSCRIBE命令后客户端会进入订阅状态。
个别状况下,咱们不会用pub/sub来做音讯发送机制,毕竟有这么多MQ技术在。
举荐浏览:
阿里 P8 谈:播种不止 SOL 优化抓住 SQL 的实质,带你领略 SQL 的世界!