SpringBoot 之 Redis 定时发送消息
一、需求
实时发送定时公告,倒计时功能通过监听 Redis 缓存过期(Key 失效)事件。类似用途可以用于订单定时关闭,商品或活动上下架。
二、修改 redis.conf 文件,打开 notify-keyspace-events Ex 的注释,开启过期通知功能
############################# EVENT NOTIFICATION ##############################
# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
#
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
#
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
#
# K Keyspace events, published with __keyspace@<db>__ prefix.
# E Keyevent events, published with __keyevent@<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events ""
三、重启 redis , 测试监听事件是否开启
__keyevent@__:expired 其实是 指所有库,可以指定库下标监听 16 个默认数据库的某一个,比如 keyevent@1 指定壹号 库。打开redisclientA,PSUBSCRIBE 指令订阅事件。
127.0.0.1:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
再开启另一个redisclientB,发送过期数据指定事件 2 秒。
127.0.0.1:6379> setex test 2 2
OK
redisclientA 就会监听到 redisclientB 过期 key
127.0.0.1:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"
四、监听器配置和实现
KeyExpirationEvent1MessageListener 可参考 org.springframework.data.redis.listener.KeyExpirationEventMessageListener 源码实现的(默认订阅的是keyevent@*:expired),而我们目标是监听壹号库。因为 0 号库给限流和 oauth2 用了,里面存在很多短期 key,会监听许多不相干的业务 key 缓存。此外,不能给 KeyExpirationEvent1MessageListener 加上 @Component,因为存在 bean 循环依赖问题,可以通过 SpringContextHolder 解决。
@Slf4j
public class KeyExpirationEvent1MessageListener extends KeyExpirationEventMessageListener {private static final Topic KEYEVENT1_EXPIRED_TOPIC = new PatternTopic("__keyevent@1__:expired");
/**
* @param listenerContainer must not be {@literal null}.
*/
public KeyExpirationEvent1MessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);
}
@Override
public void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT1_EXPIRED_TOPIC);
}
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取过期的 key
String expireKey = message.toString();
// 设置监听频道
if (expireKey.startsWith(RedisConstant.NOTIFY_RECEIVE)) {log.info("过期的键值对的消息 ID:" + expireKey);
log.info("消息监听频道 topic:" + new String(message.getChannel()));
// 获取消息发送 id,通过
String sendId = expireKey.substring(RedisConstant.NOTIFY_RECEIVE.length());
SysNotifySendService sysNotifySendService = SpringContextHolder.getBean(SysNotifySendService.class);
//common 服务提供 websocket 发送远程接口 RemoteCommonService
RemoteCommonService remoteCommonService = SpringContextHolder.getBean(RemoteCommonService.class);
SysNotifyReceiveService receiveService = SpringContextHolder.getBean(SysNotifyReceiveService.class);
SysNotifySend sysNotifySend = sysNotifySendService.getOne(Wrappers.<SysNotifySend>lambdaQuery().eq(SysNotifySend::getSendId, sendId));
com.gdjs.gold.admin.api.dto.Message websocketMsg = new com.gdjs.gold.admin.api.dto.Message();
websocketMsg.setSendId(sysNotifySend.getSendId());
websocketMsg.setFrom(sysNotifySend.getSendUserId());
websocketMsg.setDestination(ThemeEnum.BUSINESS.getTheme());
websocketMsg.setMessage(sysNotifySend.getContent());
remoteCommonService.sendMessage(websocketMsg);
}
}
}
@Configuration
public class RedisListenerConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, ApplicationContext context) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 通过 new 监听器,并且往 RedisMessageListenerContainer 注册监听器
KeyExpirationEvent1MessageListener listener = new KeyExpirationEvent1MessageListener(container);
listener.doRegister(container);
listener.setApplicationEventPublisher(context);
return container;
}
}
五、发送时候如何自定义切换指定 redis 库下标
SpringBoot 1.X 之前的版本
JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) stringRedisTemplate.getConnectionFactory();
jedisConnectionFactory.setDatabase(切换到指定的 db 上);
stringRedisTemplate.setConnectionFactory(jedisConnectionFactory);
SpringBoot 2.X 之后的版本,RedisConnectionFactory 动态切换库必须是 LettuceConnectionFactory,必须是配置 RedisTemplate 时候指定,下面看源码。
public class RedisUtil {
/**
* 切换 redis 数据库
*
* @param redisTemplate springboot 封装的 redis 对象
* @param index 数据库下标
*/
public static void select(RedisTemplate redisTemplate, int index) {LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) redisTemplate.getConnectionFactory();
if (lettuceConnectionFactory != null) {lettuceConnectionFactory.setDatabase(index);
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
lettuceConnectionFactory.resetConnection();}
}
}
/**
* @author caochikai
* @date 2019/7/12
* Redis 配置类
*/
@EnableCaching
@Configuration
@AllArgsConstructor
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfig {
//@AllArgsConstructor 是构造器注入进来 LettuceConnectionFactory
private final LettuceConnectionFactory lcfactory;
@Bean
public RedisTemplate<String, Object> redisTemplate() {
// 关闭共享链接,动态切换的重点在这里
**lcfactory.setShareNativeConnection(false);**
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setConnectionFactory(lcfactory);
return redisTemplate;
}
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForHash();
}
@Bean
public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {return redisTemplate.opsForValue();
}
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForList();
}
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForSet();
}
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForZSet();
}
}
六、设置缓存时候技巧
key 需要指定 key 命名规则前缀 RedisConstant.NOTIFY_RECEIVE 常量(通常取表名),随后加上表的主键。时间计算间距小技巧,博主业务使用的是 LocalDateTime,Duration.between(LocalDateTime.now(), 指定发送时间).getSeconds(),过期时间单位使用的是 TimeUnit 枚举。切换到指定一号库后,记得切换回来零号库,减少对其他业务的影响
/**
* redis 缓存过期监听
*
* @param sysNotifySend 定时发送消息
* @param message 消息内容
*/
private void redisSetMsgKey(SysNotifySend sysNotifySend, Message message) {LocalDateTime sendTime = sysNotifySend.getSendTime();
String jsonString = JSONUtil.parseObj(message).toJSONString(0);
RedisUtil.select(redisTemplate, 1);
redisTemplate.opsForValue().set(RedisConstant.NOTIFY_RECEIVE + sysNotifySend.getSendId(), jsonString, Duration.between(LocalDateTime.now(), sendTime).getSeconds(), TimeUnit.SECONDS);
RedisUtil.select(redisTemplate, 0);
}
七、参考文章如下:
- SpringBoot2.0 以上整合 redis 根据下标动态切换数据库
- redis 缓存过期策略,监听 redis 缓存