乐趣区

SpringBoot之Redis定时发送消息

SpringBoot 之 Redis 定时发送消息

一、需求

实时发送定时公告,倒计时功能通过监听 Redis 缓存过期(Key 失效)事件。类似用途可以用于订单定时关闭,商品或活动上下架。

二、修改 redis.conf 文件,打开 notify-keyspace-events Ex 的注释,开启过期通知功能

############################# EVENT NOTIFICATION ##############################

# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
#
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
#
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
#
#  K     Keyspace events, published with __keyspace@<db>__ prefix.
#  E     Keyevent events, published with __keyevent@<db>__ prefix.
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
#  $     String commands
#  l     List commands
#  s     Set commands
#  h     Hash commands
#  z     Sorted set commands
#  x     Expired events (events generated every time a key expires)
#  e     Evicted events (events generated when a key is evicted for maxmemory)
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.
#
#  The "notify-keyspace-events" takes as argument a string that is composed
#  of zero or multiple characters. The empty string means that notifications
#  are disabled.
#
#  Example: to enable list and generic events, from the point of view of the
#           event name, use:
#
#  notify-keyspace-events Elg
#
#  Example 2: to get the stream of the expired keys subscribing to channel
#             name __keyevent@0__:expired use:
#
#  notify-keyspace-events Ex
#
#  By default all notifications are disabled because most users don't need
#  this feature and the feature has some overhead. Note that if you don't
#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events ""

三、重启 redis , 测试监听事件是否开启

__keyevent@__:expired 其实是 指所有库,可以指定库下标监听 16 个默认数据库的某一个,比如 keyevent@1 指定壹号 库。打开redisclientA,PSUBSCRIBE 指令订阅事件。

127.0.0.1:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1

再开启另一个redisclientB,发送过期数据指定事件 2 秒。

127.0.0.1:6379> setex test 2 2
OK

redisclientA 就会监听到 redisclientB 过期 key

127.0.0.1:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"

四、监听器配置和实现

KeyExpirationEvent1MessageListener 可参考 org.springframework.data.redis.listener.KeyExpirationEventMessageListener 源码实现的(默认订阅的是keyevent@*:expired),而我们目标是监听壹号库。因为 0 号库给限流和 oauth2 用了,里面存在很多短期 key,会监听许多不相干的业务 key 缓存。此外,不能给 KeyExpirationEvent1MessageListener 加上 @Component,因为存在 bean 循环依赖问题,可以通过 SpringContextHolder 解决。

@Slf4j
public class KeyExpirationEvent1MessageListener extends KeyExpirationEventMessageListener {private static final Topic KEYEVENT1_EXPIRED_TOPIC = new PatternTopic("__keyevent@1__:expired");

    /**
     * @param listenerContainer must not be {@literal null}.
     */
    public KeyExpirationEvent1MessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);
    }

    @Override
    public void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT1_EXPIRED_TOPIC);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取过期的 key
        String expireKey = message.toString();
        // 设置监听频道
        if (expireKey.startsWith(RedisConstant.NOTIFY_RECEIVE)) {log.info("过期的键值对的消息 ID:" + expireKey);
            log.info("消息监听频道 topic:" + new String(message.getChannel()));
// 获取消息发送 id,通过
            String sendId = expireKey.substring(RedisConstant.NOTIFY_RECEIVE.length());
            SysNotifySendService sysNotifySendService =  SpringContextHolder.getBean(SysNotifySendService.class);
//common 服务提供 websocket 发送远程接口 RemoteCommonService
            RemoteCommonService remoteCommonService =  SpringContextHolder.getBean(RemoteCommonService.class);
            SysNotifyReceiveService receiveService =  SpringContextHolder.getBean(SysNotifyReceiveService.class);
            SysNotifySend sysNotifySend = sysNotifySendService.getOne(Wrappers.<SysNotifySend>lambdaQuery().eq(SysNotifySend::getSendId, sendId));
            com.gdjs.gold.admin.api.dto.Message websocketMsg = new com.gdjs.gold.admin.api.dto.Message();
            websocketMsg.setSendId(sysNotifySend.getSendId());
            websocketMsg.setFrom(sysNotifySend.getSendUserId());
            websocketMsg.setDestination(ThemeEnum.BUSINESS.getTheme());
            websocketMsg.setMessage(sysNotifySend.getContent());
            remoteCommonService.sendMessage(websocketMsg);     
        }
    }

}
@Configuration
public class RedisListenerConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, ApplicationContext context) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
// 通过 new 监听器,并且往 RedisMessageListenerContainer 注册监听器
        KeyExpirationEvent1MessageListener listener = new KeyExpirationEvent1MessageListener(container);
        listener.doRegister(container);
        listener.setApplicationEventPublisher(context);
        return container;
    }

}

五、发送时候如何自定义切换指定 redis 库下标

SpringBoot 1.X 之前的版本

JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) stringRedisTemplate.getConnectionFactory();
jedisConnectionFactory.setDatabase(切换到指定的 db 上);
stringRedisTemplate.setConnectionFactory(jedisConnectionFactory);

SpringBoot 2.X 之后的版本,RedisConnectionFactory 动态切换库必须是 LettuceConnectionFactory,必须是配置 RedisTemplate 时候指定,下面看源码。

public class RedisUtil {
    /**
     * 切换 redis 数据库
     *
     * @param redisTemplate springboot 封装的 redis 对象
     * @param index         数据库下标
     */
    public static void select(RedisTemplate redisTemplate, int index) {LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) redisTemplate.getConnectionFactory();
        if (lettuceConnectionFactory != null) {lettuceConnectionFactory.setDatabase(index);
            redisTemplate.setConnectionFactory(lettuceConnectionFactory);
            lettuceConnectionFactory.resetConnection();}
    }

}
/**
 * @author caochikai
 * @date 2019/7/12
 * Redis 配置类
 */
@EnableCaching
@Configuration
@AllArgsConstructor
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfig {
//@AllArgsConstructor 是构造器注入进来 LettuceConnectionFactory
    private final LettuceConnectionFactory lcfactory;

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        // 关闭共享链接,动态切换的重点在这里
        **lcfactory.setShareNativeConnection(false);**
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setConnectionFactory(lcfactory);
        return redisTemplate;
    }

    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForHash();
    }

    @Bean
    public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {return redisTemplate.opsForValue();
    }

    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForList();
    }

    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForSet();
    }

    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForZSet();
    }
}

六、设置缓存时候技巧

key 需要指定 key 命名规则前缀 RedisConstant.NOTIFY_RECEIVE 常量(通常取表名),随后加上表的主键。时间计算间距小技巧,博主业务使用的是 LocalDateTime,Duration.between(LocalDateTime.now(), 指定发送时间).getSeconds(),过期时间单位使用的是 TimeUnit 枚举。切换到指定一号库后,记得切换回来零号库,减少对其他业务的影响

/**
     * redis 缓存过期监听
     *
     * @param sysNotifySend 定时发送消息
     * @param message 消息内容
     */
    private void redisSetMsgKey(SysNotifySend sysNotifySend, Message message) {LocalDateTime sendTime = sysNotifySend.getSendTime();
        String jsonString = JSONUtil.parseObj(message).toJSONString(0);
        RedisUtil.select(redisTemplate, 1);
        redisTemplate.opsForValue().set(RedisConstant.NOTIFY_RECEIVE + sysNotifySend.getSendId(), jsonString, Duration.between(LocalDateTime.now(), sendTime).getSeconds(), TimeUnit.SECONDS);
        RedisUtil.select(redisTemplate, 0);
    }

七、参考文章如下:

  1. SpringBoot2.0 以上整合 redis 根据下标动态切换数据库
  2. redis 缓存过期策略,监听 redis 缓存
退出移动版