Preface
This article takes a look at how redisson's DelayedQueue works.
maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.1</version>
</dependency>
Example
@Test
public void testDelayedQueue() throws InterruptedException {
    Config config = new Config();
    config.useSingleServer()
            .setAddress("redis://192.168.99.100:6379");
    RedissonClient redisson = Redisson.create(config);
    RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
    RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
    delayedQueue.offer("demo", 10, TimeUnit.SECONDS);
    Assert.assertFalse(blockingQueue.contains("demo"));
    TimeUnit.SECONDS.sleep(15);
    Assert.assertTrue(blockingQueue.contains("demo"));
}
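Two queues are involved here. An offer on delayedQueue puts the element into delayedQueue itself; the delay governs when the element shows up in the destination queue, which in this example is the RBlockingQueue.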
Source code analysis
RDelayedQueue.offer
redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
    private final QueueTransferService queueTransferService;
    private final String channelName;
    private final String queueName;
    private final String timeoutSetName;
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getName());
        queueName = prefixName("redisson_delay_queue", getName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
        //QueueTransferTask task = ……
        queueTransferService.schedule(queueName, task);
        this.queueTransferService = queueTransferService;
    }
    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
        long randomId = PlatformDependent.threadLocalRandom().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime
              // to all scheduler workers
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;"
               ,
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }
    public ByteBuf encode(Object value) {
        if (commandExecutor.isRedissonReferenceSupportEnabled()) {
            RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
            if (reference != null) {
                value = reference;
            }
        }
        try {
            return codec.getValueEncoder().encode(value);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
    public static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }
    //……
}
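A Lua script does the work here. Its KEYS array has four entries: KEYS[1] is getName(), KEYS[2] is timeoutSetName, KEYS[3] is queueName, and KEYS[4] is channelName.
There are three arguments: ARGV[1] is timeout, ARGV[2] is randomId, and ARGV[3] is encode(e).
The script packs randomId and the encoded value into a struct, adds that struct to the timeoutSetName zset with the timeout as its score, and appends the same struct to the tail of the queueName list. It then checks whether the first element of the timeoutSetName zset is the struct just added; if so, it publishes the timeout on the channel.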
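The three side effects of the offer script can be modelled in plain Java without a Redis server. The following is only a sketch: OfferSketch, Entry and the string-based stand-in for struct.pack are hypothetical names invented for illustration, not Redisson classes, and the TreeSet merely imitates how a zset orders members by score.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory model of the offer script; not part of Redisson.
class OfferSketch {
    static final class Entry {
        final long timeout;  // zset score (ARGV[1])
        final String value;  // stand-in for the packed struct
        Entry(long timeout, String value) {
            this.timeout = timeout;
            this.value = value;
        }
    }

    // KEYS[2]: ordered by score, ties broken by member, like a zset
    final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.timeout).thenComparing(e -> e.value));
    // KEYS[3]: the same structs in insertion order
    final List<String> queue = new ArrayList<>();
    // KEYS[4]: every timeout published on the channel
    final List<Long> published = new ArrayList<>();

    void offer(String element, long randomId, long timeout) {
        String value = randomId + ":" + element;       // models struct.pack
        timeoutSet.add(new Entry(timeout, value));     // zadd
        queue.add(value);                              // rpush
        if (timeoutSet.first().value.equals(value)) {  // zrange 0 0
            published.add(timeout);                    // publish
        }
    }

    public static void main(String[] args) {
        OfferSketch s = new OfferSketch();
        s.offer("a", 1L, 5000L); // first element, becomes head: published
        s.offer("b", 2L, 9000L); // due later than the head: not published
        s.offer("c", 3L, 1000L); // due earlier, new head: published
        System.out.println(s.published); // [5000, 1000]
    }
}
```

Publishing only when the new element becomes the head means subscribers are notified just when the earliest deadline changes, which is the only case in which a pending timer would need to be moved.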
queueTransferService.schedule
redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
    @Override
    protected RFuture<Long> pushTaskAsync() {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
              + "if #expiredValues > 0 then "
                  + "for i, v in ipairs(expiredValues) do "
                      + "local randomId, value = struct.unpack('dLc0', v);"
                      + "redis.call('rpush', KEYS[1], value);"
                      + "redis.call('lrem', KEYS[3], 1, v);"
                  + "end; "
                  + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
              + "end; "
                // get startTime from scheduler queue head task
              + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
              + "if v[1] ~= nil then "
                 + "return v[2]; "
              + "end "
              + "return nil;",
              Arrays.<Object>asList(getName(), timeoutSetName, queueName),
              System.currentTimeMillis(), 100);
    }
    @Override
    protected RTopic<Long> getTopic() {
        return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);
    }
};
queueTransferService.schedule(queueName, task);
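The RedissonDelayedQueue constructor schedules the QueueTransferTask.
What gets scheduled is the pushTaskAsync method, whose job is to move expired elements from the element queue to the destination queue.
This is again a Lua script: KEYS[1] is getName(), KEYS[2] is timeoutSetName, KEYS[3] is queueName; ARGV[1] is the current timestamp and ARGV[2] is 100.
It calls zrangebyscore on the timeoutSetName zset, which is ordered by the timeout score, and takes the elements whose score lies between 0 and the current timestamp, at most 100 of them (the limit passed in ARGV[2]).
Any values returned are due for transfer: each one is unpacked, its payload is pushed onto the destination queue with rpush, and the struct is removed from the element queue with lrem. Finally the processed entries are removed from the timeoutSetName zset with zrem.
Once the transfer is done, the script returns the score of the first element of the timeoutSetName zset, or nil if the zset is empty.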
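The transfer step described above can be sketched in memory as follows. This is an illustrative model, not Redisson code: TransferSketch and its fields are hypothetical, and for brevity it assumes every element has a distinct timeout so that a TreeMap can stand in for the zset.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the pushTaskAsync script; not part of Redisson.
class TransferSketch {
    // KEYS[2] stand-in: timeout score -> value (assumes unique scores)
    final TreeMap<Long, String> timeoutSet = new TreeMap<>();
    // KEYS[3]: element list
    final List<String> queue = new ArrayList<>();
    // KEYS[1]: destination queue
    final List<String> destination = new ArrayList<>();

    void add(long timeout, String value) {
        timeoutSet.put(timeout, value);
        queue.add(value);
    }

    // Moves at most `limit` expired values and returns the score of the
    // new head, or null when nothing is left (the script's "return nil").
    Long pushTask(long now, int limit) {
        Iterator<Map.Entry<Long, String>> it =
                timeoutSet.subMap(0L, true, now, true).entrySet().iterator(); // zrangebyscore 0 now
        int moved = 0;
        while (it.hasNext() && moved < limit) {
            String value = it.next().getValue();
            destination.add(value); // rpush KEYS[1]
            queue.remove(value);    // lrem KEYS[3] 1 v
            it.remove();            // zrem KEYS[2]
            moved++;
        }
        return timeoutSet.isEmpty() ? null : timeoutSet.firstKey();
    }

    public static void main(String[] args) {
        TransferSketch t = new TransferSketch();
        t.add(1000L, "a");
        t.add(2000L, "b");
        t.add(9000L, "c");
        System.out.println(t.pushTask(3000L, 100)); // 9000
        System.out.println(t.destination);          // [a, b]
    }
}
```

Note that when the limit cuts a batch short, the returned head score is already in the past, so the caller would run the transfer again right away rather than wait.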
QueueTransferService.schedule
redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java
public class QueueTransferService {
    private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }
    public synchronized void remove(String name) {
        QueueTransferTask task = tasks.get(name);
        if (task != null) {
            if (task.decUsage() == 0) {
                tasks.remove(name, task);
                task.stop();
            }
        }
    }
}
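The schedule method first tries to add the task to the ConcurrentMap. If a task with that name is already registered, it calls oldTask.incUsage() on the existing one; otherwise it starts the new task.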
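The effect of this reference counting can be seen in a small stand-alone sketch. SharedTaskSketch and its Task stub are hypothetical; in particular, the assumption that the usage counter starts at 1 is mine and is not shown in the excerpt above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a reference-counted task registry; not Redisson code.
class SharedTaskSketch {
    static final class Task {
        // Assumption: a freshly created task counts as one user.
        final AtomicInteger usage = new AtomicInteger(1);
        int starts;
        int stops;
        void start() { starts++; }
        void stop() { stops++; }
    }

    private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();

    // Starts the task only for the first registration under a name;
    // later registrations bump the counter of the task already running.
    synchronized void schedule(String name, Task task) {
        Task old = tasks.putIfAbsent(name, task);
        if (old == null) {
            task.start();
        } else {
            old.usage.incrementAndGet();
        }
    }

    // Stops the task only when its last user has gone.
    synchronized void remove(String name) {
        Task task = tasks.get(name);
        if (task != null && task.usage.decrementAndGet() == 0) {
            tasks.remove(name, task);
            task.stop();
        }
    }

    public static void main(String[] args) {
        SharedTaskSketch service = new SharedTaskSketch();
        Task first = new Task();
        Task second = new Task();
        service.schedule("q", first);
        service.schedule("q", second); // same name: second is never started
        service.remove("q");           // one user left, task keeps running
        service.remove("q");           // last user gone, task stops
        System.out.println(first.starts + " " + second.starts + " " + first.stops); // 1 0 1
    }
}
```

Under this model, several delayed-queue objects created for the same destination queue in one client would share a single transfer task instead of each running their own.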
QueueTransferTask.start
redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java
public void start() {
    RTopic<Long> schedulerTopic = getTopic();
    statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
        @Override
        public void onSubscribe(String channel) {
            pushTask();
        }
    });
    messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {
        @Override
        public void onMessage(CharSequence channel, Long startTime) {
            scheduleTask(startTime);
        }
    });
}
private void scheduleTask(final Long startTime) {
    TimeoutTask oldTimeout = lastTimeout.get();
    if (startTime == null) {
        return;
    }
    if (oldTimeout != null) {
        oldTimeout.getTask().cancel();
    }
    long delay = startTime - System.currentTimeMillis();
    if (delay > 10) {
        Timeout timeout = connectionManager.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                pushTask();
                TimeoutTask currentTimeout = lastTimeout.get();
                if (currentTimeout.getTask() == timeout) {
                    lastTimeout.compareAndSet(currentTimeout, null);
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
        if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
            timeout.cancel();
        }
    } else {
        pushTask();
    }
}
private void pushTask() {
    RFuture<Long> startTimeFuture = pushTaskAsync();
    startTimeFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                if (future.cause() instanceof RedissonShutdownException) {
                    return;
                }
                log.error(future.cause().getMessage(), future.cause());
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            if (future.getNow() != null) {
                scheduleTask(future.getNow());
            }
        }
    });
}
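This uses an RTopic and registers both a StatusListener and a MessageListener on it.
The StatusListener triggers pushTask when the subscription is established; the MessageListener calls scheduleTask with the published start time.
pushTaskAsync, as implemented in RedissonDelayedQueue, is the script discussed above that moves elements from the original queue to the destination queue.
scheduleTask recomputes the delay from the start time. If the delay is greater than 10 ms, it arms a timer that calls pushTask once the delay has elapsed; if it is 10 ms or less, it calls pushTask immediately.
pushTask attaches a callback to pushTaskAsync. On failure (other than a RedissonShutdownException) it logs the error and calls scheduleTask with a start time 5 seconds ahead. On success, if the return value (the score of the first element of the timeoutSetName zset) is not null, it calls scheduleTask with that value.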
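The timing decisions in scheduleTask and pushTask boil down to two small calculations, isolated below as pure functions. ScheduleDecisionSketch and its method names are hypothetical; the sketch leaves out the timer itself and the cancellation of a previously armed timeout.

```java
// Hypothetical helper isolating the timing arithmetic; not Redisson code.
class ScheduleDecisionSketch {
    static final long IMMEDIATE_THRESHOLD_MS = 10;   // "delay > 10" in scheduleTask
    static final long RETRY_DELAY_MS = 5 * 1000L;    // retry offset used on failure

    // Milliseconds to wait before the next pushTask; 0 means run it now.
    static long waitBeforePush(long startTime, long now) {
        long delay = startTime - now;
        return delay > IMMEDIATE_THRESHOLD_MS ? delay : 0L;
    }

    // Start time handed to scheduleTask after a failed pushTaskAsync.
    static long retryStartTime(long now) {
        return now + RETRY_DELAY_MS;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(waitBeforePush(1_500L, now)); // 500
        System.out.println(waitBeforePush(1_010L, now)); // 0
        System.out.println(waitBeforePush(900L, now));   // 0
        System.out.println(retryStartTime(now));         // 6000
    }
}
```

A start time that is already in the past yields a negative delay, which falls into the immediate branch, so overdue elements are transferred without arming a timer.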
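Summary
Using redisson's DelayedQueue means enqueueing an element together with its delay; a scheduled task later moves the elements whose delay has expired to the destination queue.
Three structures hold the data: the destination queue, a list; the original queue, a list of packed structs (random id plus encoded value); and the timeoutSetName zset, whose members are the same structs scored by their timeout.
redisson relies heavily on asynchronous callbacks, which makes the code somewhat harder to read.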
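To make the structures in the summary concrete, the sketch below reuses the prefixName logic quoted earlier to print the Redis key names that would back the example's dest_queue1. KeyNamesSketch is a hypothetical wrapper class; only the body of prefixName and the three prefixes come from the source shown above.

```java
// Hypothetical wrapper around the prefixName logic quoted from RedissonDelayedQueue.
class KeyNamesSketch {
    static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    public static void main(String[] args) {
        String name = "dest_queue1"; // destination queue from the example test
        System.out.println(prefixName("redisson_delay_queue", name));
        // redisson_delay_queue:{dest_queue1}
        System.out.println(prefixName("redisson_delay_queue_timeout", name));
        // redisson_delay_queue_timeout:{dest_queue1}
        System.out.println(prefixName("redisson_delay_queue_channel", name));
        // redisson_delay_queue_channel:{dest_queue1}
    }
}
```

The braces form a Redis Cluster hash tag, so the derived keys hash on the queue name alone. Note that the destination list itself is stored under the plain name, without braces.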
doc
delayed-queue