乐趣区

关于java:Redis-发布订阅小功能大用处真没那么废材

明天小黑哥来跟大家介绍一下 Redis 公布 / 订阅性能。

兴许有的小伙伴对这个性能比拟生疏,不太分明这个性能是干什么的,没关系小黑哥先来举个例子。

假如咱们有这么一个业务场景,在网站下单领取当前,须要告诉库存服务进行发货解决。

下面业务实现不难,咱们只有让库存服务提供给相干的给口,下单领取之后只有调用库存服务即可。

前面如果又有新的业务,比如说积分服务,他须要获取下单领取的后果,而后减少用户的积分。

这个实现也不难,让积分服务同样提供一个接口,下单领取之后只有调用库存服务即可。

如果就两个业务须要获取下单领取的后果,那也还好,程序革新也快。可是随着业务一直的倒退,越来越多的新业务说是要下单领取的后果。

这时咱们会发现下面这样的零碎架构存在很多问题:

第一,下单领取业务与其余业务重度耦合,每当有个新业务须要领取后果,就须要改变下单领取的业务。

第二,如果调用业务过多,会导致下单领取接口响应工夫变长。另外,如果有任一上游接口响应变慢,就会同步导致下单领取接口响应也变长。

第三,如果任一上游接口失败,可能导致数据不统一的状况。比如说下图,先调用 A,胜利之后再调用 B,最初再调用 C。

如果在调用 B 接口的产生异样,此时可能就导致下单领取接口返回失败,然而此时 A 接口其实曾经调用胜利,这就代表它外部曾经解决下单领取胜利的后果。

这样就会导致 A,B,C 三个上游接口,A 获取胜利获取领取后果,然而 B,C 没有拿到,导致三者零碎数据不统一的状况。

其实咱们认真想一下,对于下单领取业务来讲,它其实不须要关怀上游调用后果,只有有某种机制告诉能告诉到他们就能够了。

讲到这里,这就须要引入明天须要介绍公布订阅机制。

Redis 公布与订阅

Redis 提供了基于「公布 / 订阅」模式的音讯机制,在这种模式下,音讯发布者与订阅者不须要进行间接通信。

如上图所示,音讯发布者只须要想指定的频道公布音讯,订阅该频道的每个客户端都能够承受到到这个音讯。

应用 Redis 公布订阅这种机制,对于下面业务,下单领取业务只须要向 领取后果 这个频道发送音讯,其余上游业务订阅 领取后果 这个频道,就能收相应音讯,而后做出业务解决即可。

这样就能够解耦零碎上下游之间调用关系。

接下来咱们来看下,咱们来看下如何应用 Redis 公布订阅性能。

Redis 中提供了一组命令,能够用于公布音讯,订阅频道,勾销订阅以及依照模式订阅。

首先咱们来看下如何公布一条音讯,其实很简略只有应用 publish 指令:

publish channel message

上图中,咱们应用 publish 指令向 pay_result 这个频道发送了一条音讯。咱们能够看到 redis 向咱们返回 0,这其实代表以后订阅者个数,因为此时没有订阅,所以返回后果为 0。

接下来咱们应用 subscribe 订阅一个或多个频道

subscribe channel [channel ...]

如上图所示,咱们订阅 pay_result 这个频道,当有其余客户端往这个频道发送音讯,

以后订阅者就会收到音讯。

咱们子在应用订阅命令,须要次要几点:

第一,客户端执行订阅指令之后,就会进入订阅状态,之后就只能接管 subscribepsubscribeunsubscribepunsubscribe 这四个命令。

第二,新订阅的客户端,是 无奈收到这个频道之前的音讯,这是因为 Redis 并不会对公布的音讯长久化的。

相比于很多业余 MQ,比方 kafka、rocketmq 来说,redis 公布订阅性能就显得有点简陋了。不过 redis 公布订阅性能胜在简略,如果以后场景能够容忍这些毛病,还是能够抉择应用的。

除了下面的性能以外的,Redis 还反对模式匹配的订阅形式。简略来说,客户端能够订阅一个带 * 号的模式,如果某些频道的名字与这个模式匹配,那么当其余客户端发送给音讯给这些频道时,订阅这个模式的客户端也将会到收到音讯。

应用 Redis 订阅模式,咱们须要应用一个新的指令 psubscribe

咱们执行上面这个指令:

psubscribe pay.*

那么一旦有其余客户端往 pay 结尾的频道,比方 pay_resultpay_xxx,咱们都能够收到音讯。

如果须要勾销订阅模式,咱们须要应用相应punsubscribe 指令,比方勾销下面订阅的模式:

punsubscribe pay.*

Redis 客户端公布订阅应用形式

基于 Jedis 开发公布 / 订阅

聊完 Redis 公布订阅指令,咱们来看下 Java Redis 客户端如何应用公布订阅。

上面的例子次要基于 Jedis,maven 版本为:

<dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>3.1.0</version>
</dependency>

其余 Redis 客户端大同小异。

jedis 公布代码比较简单,只须要调用 Jedis 类的 publish 办法。

// 生产环境千万不要这么应用哦,举荐应用 JedisPool 线程池的形式 
Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxxxx");
jedis.publish("pay_result", "hello world");

订阅的代码就绝对简单了,咱们须要继承 JedisPubSub 实现外面的相干办法,一旦有其余客户端往订阅的频道上发送音讯,将会调用 JedisPubSub 相应的办法。

private static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {System.out.println("收到订阅频道:" + channel + "音讯:" + message);

    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {System.out.println("收到具体订阅频道:" + channel + "订阅模式:" + pattern + "音讯:" + message);
    }

}

其次咱们须要调用 Jedis 类的 subscribe 办法:

Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("xxx");
jedis.subscribe(new MyListener(), "pay_result");

当有其余客户端往 pay_result频道发送音讯时,订阅将会收到音讯。

不过须要留神的是,jedis#subscribe 是一个阻塞办法,调用之后将会阻塞主线程的,所以如果须要在正式我的项目应用须要应用异步线程运行,这里就不演示具体的代码了。

基于 Spring-Data-Redis 开发公布订阅

原生 jedis 公布订阅操作,相对来说还是有点简单。当初咱们很多利用曾经基于 SpringBoot 开发,应用 spring-boot-starter-data-redis,能够简化公布订阅开发。

首先咱们须要引入相应的 startter 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>lettuce-core</artifactId>
            <groupId>io.lettuce</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

这里咱们应用 Jedis 当做底层连贯客户端,所以须要排除 lettuce,而后引入 Jedis 依赖。

而后咱们须要创立一个音讯接管类,外面须要有办法生产音讯:

@Slf4j
public class Receiver {private AtomicInteger counter = new AtomicInteger();

    public void receiveMessage(String message) {log.info("Received <" + message + ">");
        counter.incrementAndGet();}

    public int getCount() {return counter.get();
    }
}

接着咱们只须要注入 Spring- Redis 相干 Bean,比方:

  • StringRedisTemplate,用来操作 Redis 命令
  • MessageListenerAdapter,音讯监听器,能够在这个类注入咱们下面创立音讯承受类 Receiver
  • RedisConnectionFactory, 创立 Redis 底层连贯
@Configuration
public class MessageConfiguration {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅指定频道应用 ChannelTopic
        // 订阅模式应用 PatternTopic
        container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // 注入 Receiver,指定类中的承受办法
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver() {return new Receiver();
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {return new StringRedisTemplate(connectionFactory);
    }

}

最初咱们应用 StringRedisTemplate#convertAndSend 发送音讯,同时 Receiver 将会收到一条音讯。

@SpringBootApplication
public class MessagingRedisApplication {public static void main(String[] args) throws InterruptedException {ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        Receiver receiver = ctx.getBean(Receiver.class);

        while (receiver.getCount() == 0) {template.convertAndSend("pay_result", "Hello from Redis!");
            Thread.sleep(500L);
        }

        System.exit(0);
    }
}

Redis 公布订阅理论利用

Redis Sentinel 节点发现

Redis Sentinel 是 Redis 一套高可用计划,能够在主节点故障的时候,主动将从节点晋升为主节点,从而转移故障。

明天这里咱们不具体解释 Redis Sentinel 具体原理,次要来看下 Redis Sentinel 如何应用公布订阅机制。

Redis Sentinel 节点次要应用公布订阅机制,实现新节点的发现,以及替换主节点的之间的状态。

如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello 频道发送音讯,并且每个 Sentinel 都会订阅这个节点。

这样一旦有节点往这个频道发送音讯,其余节点就能够立即收到音讯。

这样一旦有的新节点退出,它往这个频道发送音讯,其余节点收到之后,判断本地列表并没有这个节点,于是就能够当做新的节点退出本地节点列表。

除此之外,每次往这个频道发送音讯内容能够蕴含节点的状态信息,这样能够作为前面 Sentinel 领导者选举的根据。

以上都是对于 Redis 服务端来讲,对于客户端来讲,咱们也能够用到公布订阅机制。

Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过公布订阅对外提供。

对于咱们客户端来讲,比较关心切换之后的主节点,这样咱们及时切换主节点的连贯(旧节点此时已故障,不能再承受操作指令),

客户端能够订阅 +switch-master频道,一旦 Redis Sentinel 完结了对主节点的故障转移就会公布主节点的的音讯。

redission 分布式锁

redission 开源框架提供一些便捷操作 Redis 的办法,其中比拟闻名的 redission 基于 Redis 的实现分布式锁。

明天咱们来看下 Redis 的实现分布式锁中如何应用 Redis 公布订阅机制,进步加锁的性能。

PS:redission 分布式锁实现原理,能够参考之前写过的文章:

  1. 可重入分布式锁的实现形式
  2. Redis 分布式锁,看似简略,其实真不简单

首先咱们来看下 redission 加锁的办法:

Redisson redisson = ....
RLock redissonLock = redisson.getLock("xxxx");
redissonLock.lock();

RLock 继承自 Java 规范的 Lock 接口, 调用 lock 办法, 如果以后锁已被其余客户端获取,那么以后加锁的线程将会被阻塞,直到其余客户端开释这把锁。

这里其实有个问题,以后阻塞的线程如何感知分布式锁已被开释呢?

这里其实有两种实现办法:

第一钟,定时查问散布时锁的状态,一旦查到锁已被开释(Redis 中不存在这个键值),那么就去加锁。

实现伪码如下:

while (true) {boolean result=lock();
  if (!result) {Thread.sleep(N);
  }
}

这种形式实现起来起来简略,不过毛病也比拟多。

如果定时工作工夫过短,将会导致查问次数过多,其实这些都是有效查问。

如果定时工作休眠工夫过长,那又会导致加锁工夫过长,导致加锁性能不好。

那么第二种实现计划,就是采纳服务告诉的机制,当分布式锁被开释之后,客户端能够收到锁开释的音讯,而后第一工夫再去加锁。

这个服务告诉的机制咱们能够应用 Redis 公布订阅模式。

当线程加锁失败之后,线程将会订阅 redisson_lock__channel_xxx(xx 代表锁的名称)频道,应用异步线程监听音讯,而后利用 Java 中 Semaphore 使以后线程进入阻塞。

一旦其余客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx 发送解锁音讯。

等异步线程收到音讯,将会调用 Semaphore 开释信号量,从而让以后被阻塞的线程唤醒去加锁。

ps:这里只是简略形容了 redission 加锁局部原理,出于篇幅,这里就不再音讯解析源码。

感兴趣的小伙伴能够本人看下 redission 加锁的源码。

通过公布订阅机制,被阻塞的线程能够及时被唤醒,缩小有效的空转的查问,无效的进步的加锁的效率。

ps: 这种形式,性能的确进步,然而实现起来的复杂度也很高,这部分源码有点货色,快看晕了。

总结

明天咱们次要介绍 Redis 公布订阅性能,次要对应的 Redis 命令为:

  • subscribe channel [channel …] 订阅一个或多个频道
  • unsubscribe channel 退订指定频道
  • publish channel message 发送音讯
  • psubscribe pattern 订阅指定模式
  • punsubscribe pattern 退订指定模式

咱们能够利用 Redis 公布订阅性能,实现的简略 MQ 性能,实现上下游的解耦。

不过须要留神了,因为 Redis 公布的音讯不会被长久化,这就会导致新订阅的客户端将不会收到历史音讯。

所以,如果以后的业务场景不能容忍这些毛病,那还是用业余 MQ 吧。

最初介绍了两个应用 Redis 公布订阅性能应用场景供大家参考。

欢送关注我的公众号:程序通事,取得日常干货推送。如果您对我的专题内容感兴趣,也能够关注我的博客:studyidea.cn

退出移动版