共计 4929 个字符,预计需要花费 13 分钟才能阅读完成。
作者:京东批发 张路瑶
1. 利用场景
目前零碎中有很多须要用到延时解决的性能:领取超时勾销、排队超时、短信、微信等揭示提早发送、token 刷新、会员卡过期等等。通过延时解决,极大的节俭零碎的资源,不用轮询数据库解决工作。
目前大部分性能通过定时工作实现,定时工作还分应用 quartz 及 xxljob 两种类型轮询工夫短,每秒执行一次,对数据库造成肯定的压力,并且会有 1 秒的误差。轮询工夫久,如 30 分钟一次,03:01 插入一条数据,失常 3:31 执行过期,然而 3:30 执行轮询时,扫描 3:00-3:30 的数据,是扫描不到 3:31 的数据的,须要 4:00 的时候能力扫描到,相当于多提早了 29 分钟!
2. 延时解决形式调研
1.DelayQueue
1. 实现形式:
jvm 提供的提早阻塞队列,通过优先级队列对不同延迟时间工作进行排序,通过 condition 进行阻塞、睡眠 dealy 工夫 获取提早工作。
当有新工作退出时,会判断新工作是否是第一个待执行的工作,若是,会解除队列睡眠,避免新退出的元素时须要执行的元素而不能失常被执行线程获取到。
2. 存在的问题:
1. 单机运行,零碎宕机后,无奈进行无效的重试
2. 没有执行记录和备份
3. 没有重试机制
4. 零碎重启时,会将工作清空!
5. 不能分片生产
3. 劣势:实现简略,无工作时阻塞,节俭资源,执行工夫精确
2. 提早队列 mq
实现形式:依赖 mq,通过设置提早生产工夫,达到提早生产性能。像 rabbitMq、jmq 都能够设置提早生产工夫。RabbitMq 通过将音讯设置过期工夫,放入死信队列进行生产实现。
存在的问题:
1. 工夫设置不灵便,每个 queue 是固定的到期工夫,每次新创建延时队列,须要创立新的音讯队列
长处:依附 jmq,能够无效的监控、生产记录、重试,具备多机同时生产能力,不害怕宕机
3. 定时工作
通过定时工作轮询符合条件的数据
毛病:
1. 必须要读业务数据库,对数据库造成肯定的压力,
2. 存在延时
3. 一次扫描数据量过大时,占用过多的系统资源。
4. 无奈分片生产
长处:
1. 生产失败后,下次还能持续生产,具备重试能力,
2. 生产能力稳固
4.redis
工作存储在 redis 中,应用 redis 的 zset 队列依据 score 进行排序,程序通过线程一直获取队列数据生产,实现延时队列
长处:
1、查问 redis 相比拟数据库快,set 队列长度过大,会依据跳表构造进行查问,效率高
2、redis 可依据工夫戳进行排序,只须要查问以后工夫戳内的分数的工作即可
3、无惧机器重启
4、分布式生产
毛病:
1. 受限于 redis 性能,并发 10W
2. 多个命令无奈保障原子性,应用 lua 脚本会要求所有数据都在一个 redis 分片上。
5. 工夫轮
通过工夫轮实现的提早工作执行,也是基于 jvm 单机运行,如 kafka、netty 都有实现工夫轮,redisson 的看门狗也是通过 netty 的工夫轮实现的。
毛病:不适宜分布式服务的应用,宕机后,会失落工作。
3. 实现目标
兼容目前在应用的异步事件组件,并提供更牢靠,可重试、有记录、可监控报警、高性能的提早组件。
•音讯传输可靠性:音讯进入到提早队列后,保障至多被生产一次。
•Client 反对丰盛:反对多重语言。
•高可用性:反对多实例部署。挂掉一个实例后,还有后备实例持续提供服务。
•实时性:容许存在肯定的时间误差。
•反对音讯删除:业务应用方,能够随时删除指定音讯。
•反对生产查问
•反对手动重试
•对以后异步事件的执行减少监控
4. 架构设计
5. 提早组件实现形式
1. 实现原理
目前抉择应用 jimdb 通过 zset 实现延时性能,将工作 id 和对应的执行工夫作为 score 存在在 zset 队列中,默认会依照 score 排序,每次取 0 - 以后工夫内的 score 的工作 id,
发送提早工作时,会依据工夫戳 + 机器 ip+queueName+sequence 生成惟一的 id,结构音讯体,加密后放入 zset 队列中。
通过搬运线程,将达到执行工夫的工作挪动到公布队列中,期待消费者获取。
监控方通过集成 ump
生产记录通过 redis 备份 + 数据库长久化实现。
通过缓存实现的形式,只是实现的一种,能够通过参数管制应用哪一种实现形式,并可通过 spi 自在扩大。
2. 音讯构造
每个 Job 必须蕴含一下几个属性:
•Topic:Job 类型, 即 QueueName
•Id:Job 的惟一标识。用来检索和删除指定的 Job 信息。
•Delay:Job 须要提早的工夫。单位:秒。(服务端会将其转换为相对工夫)
•Body:Job 的内容,供消费者做具体的业务解决,以 json 格局存储。
•traceId: 发送线程的 traceId,待后续 pfinder 反对设置 traceId 后,可与发送线程专用同一个 traceiD, 便于日志追踪
具体构造如下图示意:
TTR 的设计目标是为了保障音讯传输的可靠性。
3. 数据流转及流程图
基于 redis-disruptor 形式进行公布、生产,能够作为音讯来进行应用,消费者采纳原有异步事件的 disruptor 无锁队列生产,不同利用、不同 queue 之间无锁
1. 反对利用只公布,不生产,达到音讯队列的性能。
2:反对分桶,针对大 key 问题,若事件多,能够设置提早队列和工作队列桶的数量,减小因大 key 造成的 redis 阻塞问题。
3: 通过 ducc 配置,进行性能的扩大,目前只反对开启生产和敞开生产。
4: 反对设置超时工夫配置,避免生产线程执行过久
瓶颈:生产速度慢,生产速度过快,会导致 ringbuffer 队列占满,以后利用既是生产者也是消费者时,生产者会休眠,性能取决于生产速度,可通过程度扩大机器,间接晋升性能。监控 redis 队列的长度,若一直增长,可思考减少消费者,间接进步性能。
可能呈现的状况:因一个利用专用一个 disruptor,领有 64 个消费者线程,如果某一个事件生产过慢,导致 64 个线程都在生产这个事件,会导致其余事件无生产线程生产,生产者线程也被阻塞,导致所有事件的生产都被阻塞。
前期察看是否有这个性能瓶颈,可给每一个 queue 一个消费者线程池。
6.demo 示例
减少配置文件
判断是否开启 jd.event.enable:true
<dependency> <groupId>com.jd.car</groupId>
<artifactId>senna-event</artifactId>
<version>1.0-SNAPSHOT</version> </dependency>
配置
jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle
生产代码:
package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {
@Override
protected void onHandle(String key, String eventType) {log.info("Handler 开始生产:{}", key);
}
@Override
protected void onDelayHandle(String key, String eventType) {log.info("delayHandler 开始生产:{}", key);
}
}
注解模式:
package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {
@Override
protected void onHandle(String key, String eventType) {log.info("Handler 开始生产:{}", key);
}
@Override
protected void onDelayHandle(String key, String eventType) {log.info("delayHandler 开始生产:{}", key);
}
}
发送代码
package com.jd.car.senna.admin.controller;
import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {
@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;
@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {log.info("发送无提早音讯");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}
@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {log.info("发送提早 5 秒音讯");
eventQueue.push("delay 5000 millseconds message,name",1000*5L);
return "ok";
}
@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {log.info("发送提早到 2022-04-02 00:00:00 执行的音讯");
eventQueue.push("delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
}
}
参考有赞设计:https://tech.youzan.com/queuing_delay/
7. 目前利用:
1. 云修到店排队 24 小时后主动勾销
2.. 美团申请 token 定时刷新。
3. 质保卡延期 24 小时生成
5. 结算单延期生成
6. 短信提早发送