关于架构:一种异步延迟队列的实现方式

10次阅读

共计 4929 个字符,预计需要花费 13 分钟才能阅读完成。

作者:京东批发 张路瑶

1. 利用场景

目前零碎中有很多须要用到延时解决的性能:领取超时勾销、排队超时、短信、微信等揭示提早发送、token 刷新、会员卡过期等等。通过延时解决,极大的节俭零碎的资源,不用轮询数据库解决工作。

目前大部分性能通过定时工作实现,定时工作还分应用 quartz 及 xxljob 两种类型轮询工夫短,每秒执行一次,对数据库造成肯定的压力,并且会有 1 秒的误差。轮询工夫久,如 30 分钟一次,03:01 插入一条数据,失常 3:31 执行过期,然而 3:30 执行轮询时,扫描 3:00-3:30 的数据,是扫描不到 3:31 的数据的,须要 4:00 的时候能力扫描到,相当于多提早了 29 分钟!

2. 延时解决形式调研

1.DelayQueue

1. 实现形式:

jvm 提供的提早阻塞队列,通过优先级队列对不同延迟时间工作进行排序,通过 condition 进行阻塞、睡眠 dealy 工夫 获取提早工作。

当有新工作退出时,会判断新工作是否是第一个待执行的工作,若是,会解除队列睡眠,避免新退出的元素时须要执行的元素而不能失常被执行线程获取到。

2. 存在的问题:

1. 单机运行,零碎宕机后,无奈进行无效的重试

2. 没有执行记录和备份

3. 没有重试机制

4. 零碎重启时,会将工作清空!

5. 不能分片生产

3. 劣势:实现简略,无工作时阻塞,节俭资源,执行工夫精确

2. 提早队列 mq

实现形式:依赖 mq,通过设置提早生产工夫,达到提早生产性能。像 rabbitMq、jmq 都能够设置提早生产工夫。RabbitMq 通过将音讯设置过期工夫,放入死信队列进行生产实现。

存在的问题:

1. 工夫设置不灵便,每个 queue 是固定的到期工夫,每次新创建延时队列,须要创立新的音讯队列

长处:依附 jmq,能够无效的监控、生产记录、重试,具备多机同时生产能力,不害怕宕机

3. 定时工作

通过定时工作轮询符合条件的数据

毛病:

1. 必须要读业务数据库,对数据库造成肯定的压力,

2. 存在延时

3. 一次扫描数据量过大时,占用过多的系统资源。

4. 无奈分片生产

长处:

1. 生产失败后,下次还能持续生产,具备重试能力,

2. 生产能力稳固

4.redis

工作存储在 redis 中,应用 redis 的 zset 队列依据 score 进行排序,程序通过线程一直获取队列数据生产,实现延时队列

长处:

1、查问 redis 相比拟数据库快,set 队列长度过大,会依据跳表构造进行查问,效率高

2、redis 可依据工夫戳进行排序,只须要查问以后工夫戳内的分数的工作即可

3、无惧机器重启

4、分布式生产

毛病:

1. 受限于 redis 性能,并发 10W

2. 多个命令无奈保障原子性,应用 lua 脚本会要求所有数据都在一个 redis 分片上。

5. 工夫轮

通过工夫轮实现的提早工作执行,也是基于 jvm 单机运行,如 kafka、netty 都有实现工夫轮,redisson 的看门狗也是通过 netty 的工夫轮实现的。

毛病:不适宜分布式服务的应用,宕机后,会失落工作。

3. 实现目标

兼容目前在应用的异步事件组件,并提供更牢靠,可重试、有记录、可监控报警、高性能的提早组件。

•音讯传输可靠性:音讯进入到提早队列后,保障至多被生产一次。

•Client 反对丰盛:反对多重语言。

•高可用性:反对多实例部署。挂掉一个实例后,还有后备实例持续提供服务。

•实时性:容许存在肯定的时间误差。

•反对音讯删除:业务应用方,能够随时删除指定音讯。

•反对生产查问

•反对手动重试

•对以后异步事件的执行减少监控

4. 架构设计

5. 提早组件实现形式

1. 实现原理

目前抉择应用 jimdb 通过 zset 实现延时性能,将工作 id 和对应的执行工夫作为 score 存在在 zset 队列中,默认会依照 score 排序,每次取 0 - 以后工夫内的 score 的工作 id,

发送提早工作时,会依据工夫戳 + 机器 ip+queueName+sequence 生成惟一的 id,结构音讯体,加密后放入 zset 队列中。

通过搬运线程,将达到执行工夫的工作挪动到公布队列中,期待消费者获取。

监控方通过集成 ump

生产记录通过 redis 备份 + 数据库长久化实现。

通过缓存实现的形式,只是实现的一种,能够通过参数管制应用哪一种实现形式,并可通过 spi 自在扩大。

2. 音讯构造

每个 Job 必须蕴含一下几个属性:

•Topic:Job 类型, 即 QueueName

•Id:Job 的惟一标识。用来检索和删除指定的 Job 信息。

•Delay:Job 须要提早的工夫。单位:秒。(服务端会将其转换为相对工夫)

•Body:Job 的内容,供消费者做具体的业务解决,以 json 格局存储。

•traceId: 发送线程的 traceId,待后续 pfinder 反对设置 traceId 后,可与发送线程专用同一个 traceiD, 便于日志追踪

具体构造如下图示意:

TTR 的设计目标是为了保障音讯传输的可靠性。

3. 数据流转及流程图

基于 redis-disruptor 形式进行公布、生产,能够作为音讯来进行应用,消费者采纳原有异步事件的 disruptor 无锁队列生产,不同利用、不同 queue 之间无锁

1. 反对利用只公布,不生产,达到音讯队列的性能。

2:反对分桶,针对大 key 问题,若事件多,能够设置提早队列和工作队列桶的数量,减小因大 key 造成的 redis 阻塞问题。

3: 通过 ducc 配置,进行性能的扩大,目前只反对开启生产和敞开生产。

4: 反对设置超时工夫配置,避免生产线程执行过久

瓶颈:生产速度慢,生产速度过快,会导致 ringbuffer 队列占满,以后利用既是生产者也是消费者时,生产者会休眠,性能取决于生产速度,可通过程度扩大机器,间接晋升性能。监控 redis 队列的长度,若一直增长,可思考减少消费者,间接进步性能。

可能呈现的状况:因一个利用专用一个 disruptor,领有 64 个消费者线程,如果某一个事件生产过慢,导致 64 个线程都在生产这个事件,会导致其余事件无生产线程生产,生产者线程也被阻塞,导致所有事件的生产都被阻塞。

前期察看是否有这个性能瓶颈,可给每一个 queue 一个消费者线程池。

6.demo 示例

减少配置文件

判断是否开启 jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
 <artifactId>senna-event</artifactId>
 <version>1.0-SNAPSHOT</version> </dependency>

配置

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

生产代码:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {log.info("Handler 开始生产:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {log.info("delayHandler 开始生产:{}", key);
}
}

注解模式:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {log.info("Handler 开始生产:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {log.info("delayHandler 开始生产:{}", key);
}
}

发送代码


package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;


/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {

@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;

@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {log.info("发送无提早音讯");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {log.info("发送提早 5 秒音讯");
eventQueue.push("delay 5000 millseconds message,name",1000*5L);
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {log.info("发送提早到 2022-04-02 00:00:00 执行的音讯");
eventQueue.push("delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 

}

参考有赞设计:https://tech.youzan.com/queuing_delay/

7. 目前利用:

1. 云修到店排队 24 小时后主动勾销

2.. 美团申请 token 定时刷新。

3. 质保卡延期 24 小时生成

5. 结算单延期生成

6. 短信提早发送

正文完
 0