关于redis:阿里P6面试官Redis如何实现分布式锁锁过期了怎么办

28次阅读

共计 14654 个字符,预计需要花费 37 分钟才能阅读完成。

Redis 实现分布式锁的原理

后面讲了 Redis 在理论业务场景中的利用,那么上面再来理解一下 Redisson 功能性场景的利用,也就是大家常常应用的分布式锁的实现场景。

对于分布式锁的概念,本文就不做形容。

•引入 redisson 依赖

<dependency>      <groupId>org.redisson</groupId>      <artifactId>redisson</artifactId>      <version>3.16.0</version>  </dependency>

•编写简略的测试代码

public class RedissonTest {private static RedissonClient redissonClient;      static {          Config config=new Config();          config.useSingleServer().setAddress("redis://192.168.221.128:6379");          redissonClient=Redisson.create(config);      }      public static void main(String[] args) throws InterruptedException {RLock rLock=redissonClient.getLock("updateOrder");          // 最多期待 100 秒、上锁 10s 当前主动解锁          if(rLock.tryLock(100,10,TimeUnit.SECONDS)){System.out.println("获取锁胜利");          }          Thread.sleep(2000);          rLock.unlock();          redissonClient.shutdown();      }  }

Redisson 分布式锁的实现原理

你们会发现,通过 redisson,非常简单就能够实现咱们所须要的性能,当然这只是 redisson 的冰山一角,redisson 最弱小的中央就是提供了分布式个性的常用工具类。使得本来作为协调单机多线程并发程序的并发程序的工具包取得了协调分布式多级多线程并发零碎的能力,升高了程序员在分布式环境下解决分布式问题的难度,上面剖析一下 RedissonLock 的实现原理

RedissonLock.tryLock

@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);    long current = System.currentTimeMillis();    long threadId = Thread.currentThread().getId();    // 通过 tryAcquire 办法尝试获取锁    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);    // lock acquired    if (ttl == null) {// 示意胜利获取到锁,间接返回        return true;}    // 省略局部代码....}

tryAcquire

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;    //leaseTime 就是租约工夫,就是 redis key 的过期工夫。if (leaseTime != -1) {// 如果设置过期工夫        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    } else {// 如果没设置了过期工夫,则从配置中获取 key 超时工夫, 默认是 30s 过期        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);    }    // 当 tryLockInnerAsync 执行完结后,触发上面回调    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {// 阐明出现异常,间接返回            return;}        // lock acquired        if (ttlRemaining == null) {// 示意第一次设置锁键            if (leaseTime != -1) {// 示意设置过超时工夫,更新 internalLockLeaseTime,并返回                internalLockLeaseTime = unit.toMillis(leaseTime);            } else {//leaseTime=-1, 启动 Watch Dog                scheduleExpirationRenewal(threadId);            }        }    });    return ttlRemainingFuture;}

tryLockInnerAsync

通过 lua 脚本来实现加锁的操作

1. 判断 lock 键是否存在,不存在间接调用 hset 存储以后线程信息并且设置过期工夫, 返回 nil,通知客户端间接获取到锁。2. 判断 lock 键是否存在,存在则将重入次数加 1,并从新设置过期工夫,返回 nil,通知客户端间接获取到锁。3. 被其它线程曾经锁定,返回锁有效期的剩余时间,通知客户端须要期待。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                          "if (redis.call('exists', KEYS[1]) == 0) then" +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +                          "return nil;" +                          "end;" +                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +                          "return nil;" +                          "end;" +                          "return redis.call('pttl', KEYS[1]);",                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

对于 Lua 脚本,咱们稍后再解释。

unlock 开释锁流程

开释锁的流程,脚本看起来会略微简单一点

1. 如果 lock 键不存在,通过 publish 指令发送一个音讯示意锁曾经可用。2. 如果锁不是被以后线程锁定,则返回 nil3. 因为反对可重入,在解锁时将重入次数须要减 14. 如果计算后的重入次数 >0,则从新设置过期工夫 5. 如果计算后的重入次数 <=0,则发消息说锁曾经可用

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +                          "return nil;" +                          "end;" +                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +                          "if (counter > 0) then" +                          "redis.call('pexpire', KEYS[1], ARGV[2]);" +                          "return 0;" +                          "else" +                          "redis.call('del', KEYS[1]);" +                          "redis.call('publish', KEYS[2], ARGV[1]);" +                          "return 1;" +                          "end;" +                          "return nil;",                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}

RedissonLock 有竞争的状况

有竞争的状况在 redis 端的 lua 脚本是雷同的,只是不同的条件执行不同的 redis 命令。当通过 tryAcquire 发现锁被其它线程申请时,须要进入期待竞争逻辑中

1.this.await 返回 false,阐明等待时间曾经超出获取锁最大等待时间,勾销订阅并返回获取锁失败 2.this.await 返回 true,进入循环尝试获取锁。

持续看 RedissonLock.tryLock 后半局部代码如下:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 省略局部代码        time -= System.currentTimeMillis() - current;        if (time <= 0) {acquireFailed(waitTime, unit, threadId);            return false;        }        current = System.currentTimeMillis();       // 订阅监听 redis 音讯,并且创立 RedissonLockEntry        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);      // 阻塞期待 subscribe 的 future 的后果对象,如果 subscribe 办法调用超过了 time,阐明曾经超过了客户端设置的最大 wait time,则间接返回 false,勾销订阅,不再持续申请锁了。if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {// 勾销订阅                subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);                    }                });            }            acquireFailed(waitTime, unit, threadId); // 示意抢占锁失败            return false; // 返回 false        }        try {// 判断是否超时,如果期待超时,返回获的锁失败            time -= System.currentTimeMillis() - current;            if (time <= 0) {acquireFailed(waitTime, unit, threadId);                return false;            }            // 通过 while 循环再次尝试竞争锁            while (true) {long currentTime = System.currentTimeMillis();                ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 竞争锁,返回锁超时工夫                // lock acquired                if (ttl == null) {// 如果超时工夫为 null,阐明取得锁胜利                    return true;}                // 判断是否超时,如果超时,示意获取锁失败                time -= System.currentTimeMillis() - currentTime;                if (time <= 0) {acquireFailed(waitTime, unit, threadId);                    return false;                }                // 通过信号量 (共享锁) 阻塞, 期待解锁音讯.  (缩小申请锁调用的频率)                // 如果剩余时间 (ttl) 小于 wait time , 就在 ttl 工夫内,从 Entry 的信号量获取一个许可(除非被中断或者始终没有可用的许可)。// 否则就在 wait time 工夫范畴内期待能够通过信号量                currentTime = System.currentTimeMillis();                if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                } else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);                }                // 更新等待时间(最大等待时间 - 曾经耗费的阻塞工夫)                time -= System.currentTimeMillis() - currentTime;                if (time <= 0) {// 获取锁失败                    acquireFailed(waitTime, unit, threadId);                    return false;                }            }        } finally {unsubscribe(subscribeFuture, threadId); // 勾销订阅        }//        return get(tryLockAsync(waitTime, leaseTime, unit));    }

锁过期了怎么办?

一般来说,咱们去取得分布式锁时,为了防止死锁的状况,咱们会对锁设置一个超时工夫,然而有一种状况是,如果在指定工夫内以后线程没有执行完,因为锁超时导致锁被开释,那么其余线程就会拿到这把锁,从而导致一些故障。

为了防止这种状况,Redisson 引入了一个 Watch Dog 机制,这个机制是针对分布式锁来实现锁的主动续约,简略来说,如果以后取得锁的线程没有执行完,那么 Redisson 会主动给 Redis 中指标 key 缩短超时工夫。

默认状况下,看门狗的续期工夫是 30s,也能够通过批改 Config.lockWatchdogTimeout 来另行指定。

@Overridepublic boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return tryLock(waitTime, -1, unit);  //leaseTime=-1}

实际上,当咱们通过 tryLock 办法没有传递超时工夫时,默认会设置一个 30s 的超时工夫,避免出现死锁的问题。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;    if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    } else {// 当 leaseTime 为 - 1 时,leaseTime=internalLockLeaseTime,默认是 30s,示意以后锁的过期工夫。//this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);    }    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {// 阐明出现异常,间接返回            return;}        // lock acquired        if (ttlRemaining == null) {// 示意第一次设置锁键            if (leaseTime != -1) {// 示意设置过超时工夫,更新 internalLockLeaseTime,并返回                internalLockLeaseTime = unit.toMillis(leaseTime);            } else {//leaseTime=-1, 启动 Watch Dog                scheduleExpirationRenewal(threadId);            }        }    });    return ttlRemainingFuture;}

因为默认设置了一个 30s 的过期工夫,为了避免过期之后以后线程还未执行完,所以通过定时工作对过期工夫进行续约。

•首先,会先判断在 expirationRenewalMap 中是否存在了 entryName,这是个 map 构造,次要还是判断在这个服务实例中的加锁客户端的锁 key 是否存在,•如果曾经存在了,就间接返回;次要是思考到 RedissonLock 是可重入锁。

protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);    if (oldEntry != null) {oldEntry.addThreadId(threadId);    } else {// 第一次加锁的时候会调用,外部会启动 WatchDog        entry.addThreadId(threadId);        renewExpiration();}}

定义一个定时工作,该工作中调用 renewExpirationAsync 办法进行续约。

private void renewExpiration() {    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());    if (ee == null) {return;}    // 用到了工夫轮机制    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Override        public void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());            if (ent == null) {return;}            Long threadId = ent.getFirstThreadId();            if (threadId == null) {return;}            // renewExpirationAsync 续约租期            RFuture<Boolean> future = renewExpirationAsync(threadId);            future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock "+ getRawName() +" expiration", e);                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());                    return;                }                if (res) {// reschedule itself                    renewExpiration();                }            });        }    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 每次距离租期的 1 / 3 工夫执行    ee.setTimeout(task);}

执行 Lua 脚本,对指定的 key 进行续约。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +                          "return 1;" +                          "end;" +                          "return 0;",                          Collections.singletonList(getRawName()),                          internalLockLeaseTime, getLockName(threadId));}

Lua 脚本

Lua 是一个高效的轻量级脚本语言(和 JavaScript 相似),用规范 C 语言编写并以源代码模式凋谢,其设计目标是为了嵌入应用程序中,从而为应用程序提供灵便的扩大和定制性能。Lua 在葡萄牙语中是“月亮”的意思,它的 logo 模式卫星,寓意是 Lua 是一个“卫星语言”,可能不便地嵌入到其余语言中应用;其实在很多常见的框架中,都有嵌入 Lua 脚本的性能,比方 OpenResty、Redis 等。

应用 Lua 脚本的益处:

1. 缩小网络开销,在 Lua 脚本中能够把多个命令放在同一个脚本中运行 2. 原子操作,redis 会将整个脚本作为一个整体执行,两头不会被其余命令插入。换句话说,编写脚本的过程中无需放心会呈现竞态条件 3. 复用性,客户端发送的脚本会永远存储在 redis 中,这意味着其余客户端能够复用这一脚原本实现同样的逻辑

Lua 的下载和装置

Lua 是一个独立的脚本语言,所以它有专门的编译执行工具,上面简略带大家装置一下。

•下载 Lua 源码包:https://www.lua.org/download….•装置步骤如下

tar -zxvf lua-5.4.3.tar.gz  cd lua-5.4.3  make linux  make install

如果报错,说找不到 readline/readline.h, 能够通过 yum 命令装置

yum -y install readline-devel ncurses-devel

最初,间接输出 lua 命令即可进入 lua 的控制台。Lua 脚本有本人的语法、变量、逻辑运算符、函数等,这块我就不在这里做过多的阐明,用过 JavaScript 的同学,应该只须要花几个小时就能够全副学完,简略演示两个案例如下。

array = {"Lua", "mic"}for i= 0, 2 do   print(array[i])end
array = {"mic", "redis"}for key,value in ipairs(array)do   print(key, value)end

Redis 与 Lua

Redis 中集成了 Lua 的编译和执行器,所以咱们能够在 Redis 中定义 Lua 脚本去执行。同时,在 Lua 脚本中,能够间接调用 Redis 的命令,来操作 Redis 中的数据。

redis.call(‘set’,'hello','world')local value=redis.call(‘get’,’hello’)

redis.call 函数的返回值就是 redis 命令的执行后果,后面咱们介绍过 redis 的 5 中类型的数据返回的值的类型也都不一样,redis.call 函数会将这 5 种类型的返回值转化对应的 Lua 的数据类型

在很多状况下咱们都须要脚本能够有返回值,毕竟这个脚本也是一个咱们所编写的命令集,咱们能够像调用其余 redis 内置命令一样调用咱们本人写的脚本,所以同样 redis 会主动将脚本返回值的 Lua 数据类型转化为 Redis 的返回值类型。在脚本中能够应用 return 语句将值返回给 redis 客户端,通过 return 语句来执行,如果没有执行 return,默认返回为 nil。

Redis 中执行 Lua 脚本相干的命令

编写完脚本后最重要的就是在程序中执行脚本。Redis 提供了 EVAL 命令能够使开发者像调用其余 Redis 内置命令一样调用脚本。

EVAL 命令 - 执行脚本

[EVAL] [脚本内容] [key 参数的数量] [key …] [arg …]

能够通过 key 和 arg 这两个参数向脚本中传递数据,他们的值能够在脚本中别离应用 KEYSARGV 这两个类型的全局变量拜访。

比方咱们通过脚本实现一个 set 命令,通过在 redis 客户端中调用,那么执行的语句是:

eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello

上述脚本相当于应用 Lua 脚本调用了 Redis 的 set 命令,存储了一个 key=lua,value=hello 到 Redis 中。

EVALSHA 命令

思考到咱们通过 eval 执行 lua 脚本,脚本比拟长的状况下,每次调用脚本都须要把整个脚本传给 redis,比拟占用带宽。为了解决这个问题,redis 提供了 EVALSHA 命令容许开发者通过脚本内容的 SHA1 摘要来执行脚本。该命令的用法和 EVAL 一样,只不过是将脚本内容替换成脚本内容的 SHA1 摘要

1.Redis 在执行 EVAL 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中 2. 执行 EVALSHA 命令时 Redis 会依据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOSCRIPT No matching script,Please use EVAL”

# 将脚本退出缓存并生成 sha1 命令 script load "return redis.call('get','lua')"# ["13bd040587b891aedc00a72458cbf8588a27df90"]# 传递 sha1 的值来执行该命令 evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0

Redisson 执行 Lua 脚本

通过 lua 脚本来实现一个拜访频率限度性能。

思路,定义一个 key,key 中蕴含 ip 地址。value 为指定工夫内的拜访次数,比如说是 10 秒内只能拜访 3 次。

•定义 Lua 脚本。

local times=redis.call('incr',KEYS[1])  -- 如果是第一次进来,设置一个过期工夫  if times == 1 then     redis.call('expire',KEYS[1],ARGV[1])  end  -- 如果在指定工夫内拜访次数大于指定次数,则返回 0,示意拜访被限度  if times > tonumber(ARGV[2]) then     return 0  end  -- 返回 1,容许被拜访  return 1

•定义 controller,提供拜访测试方法

@RestController  public class RedissonController {@Autowired      RedissonClient redissonClient;      private final String LIMIT_LUA=          "local times=redis.call('incr',KEYS[1])\n" +          "if times == 1 then\n" +          "redis.call('expire',KEYS[1],ARGV[1])\n" +          "end\n" +          "if times > tonumber(ARGV[2]) then\n" +          "return 0\n" +          "end\n" +          "return 1";      @GetMapping("/lua/{id}")      public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException {List<Object> keys= Arrays.asList("LIMIT:"+id);          RFuture<Object> future=redissonClient.getScript().              evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);          return future.get().toString();      }  }

须要留神,上述脚本执行的时候会有问题,因为 redis 默认的序列化形式导致 value 的值在传递到脚本中时,转成了对象类型,须要批改 redisson.yml 文件,减少 codec 的序列化形式。

application.yml

spring:    redis:      redisson:        file: classpath:redisson.yml

redisson.yml

singleServerConfig:    address: redis://192.168.221.128:6379  codec: !<org.redisson.codec.JsonJacksonCodec> {}

Lua 脚本的原子性

redis 的脚本执行是原子的,即脚本执行期间 Redis 不会执行其余命令。所有的命令必须期待脚本执行完当前能力执行。为了避免某个脚本执行工夫过程导致 Redis 无奈提供服务。Redis 提供了 lua-time-limit 参数限度脚本的最长运行工夫。默认是 5 秒钟。

非事务性操作

当脚本运行工夫超过这个限度后,Redis 将开始承受其余命令但不会执行(以确保脚本的原子性),而是返回 BUSY 的谬误,上面演示一下这种状况。

关上两个客户端窗口,在第一个窗口中执行 lua 脚本的死循环

eval "while true do end" 0

在第二个窗口中运行get lua,会失去如下的异样。

(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.

咱们会发现执行后果是 Busy,接着咱们通过script kill 的命令终止以后执行的脚本,第二个窗口的显示又恢复正常了。

存在事务性操作

如果以后执行的 Lua 脚本对 Redis 的数据进行了批改(SET、DEL 等),那么通过 SCRIPT KILL 命令是不能终止脚本运行的,因为要保障脚本运行的原子性,如果脚本执行了一部分终止,那就违反了脚本原子性的要求。最终要保障脚本要么都执行,要么都不执行

同样关上两个窗口,第一个窗口运行如下命令

eval "redis.call('set','name','mic') while true do end" 0

在第二个窗口运行

get lua

后果一样,依然是 busy,然而这个时候通过 script kill 命令,会发现报错,没方法 kill。

(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.

遇到这种状况,只能通过 shutdown nosave 命令来强行终止 redis。

shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行长久化操作,意味着产生在上一次快照后的数据库批改都会失落。

Redisson 的 Lua 脚本

理解了 lua 之后,咱们再回过头来看看 Redisson 的 Lua 脚本,就不难理解了。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                          "if (redis.call('exists', KEYS[1]) == 0) then" +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +                          "return nil;" +                          "end;" +                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +                          "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +                          "redis.call('pexpire', KEYS[1], ARGV[1]);" +                          "return nil;" +                          "end;" +                          "return redis.call('pttl', KEYS[1]);",                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

Redis 中的 Pub/Sub 机制

上面是 Redisson 中开释锁的代码,在代码中咱们发现一个 publish 的指令redis.call('publish', KEYS[2], ARGV[1]),这个指令是干啥的呢?

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +                          "return nil;" +                          "end;" +                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +                          "if (counter > 0) then" +                          "redis.call('pexpire', KEYS[1], ARGV[2]);" +                          "return 0;" +                          "else" +                          "redis.call('del', KEYS[1]);" +                          "redis.call('publish', KEYS[2], ARGV[1]);" +                          "return 1;" +                          "end;" +                          "return nil;",                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}

Redis 提供了一组命令能够让开发者实现“公布 / 订阅”模式(publish/subscribe) . 该模式同样能够实现过程间的消息传递,它的实现原理是:

•公布 / 订阅模式蕴含两种角色,别离是发布者和订阅者。订阅者能够订阅一个或多个频道,而发布者能够向指定的频道发送音讯,所有订阅此频道的订阅者都会收到该音讯•发布者公布音讯的命令是 PUBLISH,用法是

PUBLISH channel message

比方向 channel.1 发一条音讯:hello

PUBLISH channel.1“hello”

这样就实现了音讯的发送,该命令的返回值示意接管到这条音讯的订阅者数量。因为在执行这条命令的时候还没有订阅者订阅该频道,所以返回为 0. 另外值得注意的是音讯发送进来不会长久化,如果发送之前没有订阅者,那么后续再有订阅者订阅该频道,之前的音讯就收不到了

订阅者订阅音讯的命令是:

SUBSCRIBE channel [channel …]

该命令同时能够订阅多个频道,比方订阅 channel.1 的频道:SUBSCRIBE channel.1,执行 SUBSCRIBE 命令后客户端会进入订阅状态。

个别状况下,咱们不会用 pub/sub 来做音讯发送机制,毕竟有这么多 MQ 技术在。

举荐浏览:

阿里 P8 谈:播种不止 SOL 优化抓住 SQL 的实质,带你领略 SQL 的世界!

正文完
 0