共计 12081 个字符,预计需要花费 31 分钟才能阅读完成。
org.quartz 实现定时任务并自定义切换任务数据源
在工作中经常会需要使用到定时任务处理各种周期性的任务,org.quartz 是处理此类定时任务的一个优秀框架。随着项目一点点推进,此时我们并不满足于任务仅仅是定时执行,我们还想要对任务进行更多的控制,随时能对任务进行人为干预,就需要对 quartz 有更深入的了解。而随着微服务的流行,项目中多数据源的情况也越来越常见,在定时任务中集成多数据源切换的功能也需要集成进来。
集成 quartz 实现定时任务
集成 quartz 实现定时任务
quartz 中实现定时任务需要了解的基本概念
Job
通过实现 Job
类,在实现方法中写我们具体想要定时任务完成的工作,然后交给 quartz
管理。
JobDetail
Job
只负责实现具体任务,所以还需要借助 JobDetail
来存储一些描述 Job
的基本信息。
Quartz JobBuilder
为构造 JobDetail
实体提供的builder-style API
。你可以这样使用它来构建一个JobDetail
:
@Bean
public JobDetail jobDetail() {return JobBuilder.newJob().ofType(SampleJob.class)
.storeDurably()
.withIdentity("Qrtz_Job_Detail")
.withDescription("Invoke Sample Job service...")
.build();}
Spring JobDetailFactoryBean
在 Spring
中配置 JobDetail
的方式:
@Bean
public JobDetailFactoryBean jobDetail() {JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
jobDetailFactory.setJobClass(SampleJob.class);
jobDetailFactory.setDescription("Invoke Sample Job service...");
jobDetailFactory.setDurability(true);
return jobDetailFactory;
}
Trigger
触发器,代表一个调度参数的配置, 什么时候去调度:
@Bean
public Trigger trigger(JobDetail job) {return TriggerBuilder.newTrigger().forJob(job)
.withIdentity("Qrtz_Trigger")
.withDescription("Sample trigger")
.withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
.build();}
Scheduler
调度器,通过 Job
和Trigger
来注册一个调度器:
@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) {StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(new ClassPathResource("quartz.properties").getInputStream());
Scheduler scheduler = factory.getScheduler();
scheduler.setJobFactory(springBeanJobFactory());
scheduler.scheduleJob(job, trigger);
scheduler.start();
return scheduler;
}
给系统添加一个 Job
在 quartz
中Job
就是我们需要去执行的任务,由 Scheduler
调度器负责调度任务们依靠制定好的 Trigger
来定时执行任务。
因此首先我们需要结合以上基础给系统添加一个 Job。
addJob
public void addJob(BaseJob job) throws SchedulerException {
/** 创建 JobDetail 实例, 绑定 Job 实现类
* JobDetail 表示一个具体的可执行的调度程序,job 是这个可执行调度程序所要执行的内容
* 另外 JobDetail 还包含了这个任务调度的方案和策略 **/
// 指明 job 的名称,所在组的名称,以及绑定 job 类
JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())
.withIdentity(job.getJobKey())
.withDescription(job.getDescription())
.usingJobData(job.getDataMap())
.build();
/**
* Trigger 代表一个调度参数的配置, 什么时候去调度
*/
// 定义调度触发规则, 使用 cronTrigger 规则
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(job.getJobName(),job.getJobGroup())
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
.startNow()
.build();
// 将任务和触发器注册到任务调度中去
scheduler.scheduleJob(jobDetail,trigger);
// 判断调度器是否启动
if(!scheduler.isStarted()){scheduler.start();
}
log.info(String.format("定时任务:%s.%s- 已添加到调度器!", job.getJobGroup(),job.getJobName()));
}
首先需要定义好我们的 Job,之后通过 Job 初始化 JobDetail
和Trigger
, 最后将 JobDetail
和Trigger
注册到调度器中。
BaseJob
Job
的结构如下:
public abstract class BaseJob implements Job,Serializable {
private static final long serialVersionUID = 1L;
private static final String JOB_MAP_KEY = "self";
/**
* 任务名称
*/
private String jobName;
/**
* 任务分组
*/
private String jobGroup;
/**
* 任务状态 是否启动任务
*/
private String jobStatus;
/**
* cron 表达式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 任务执行时调用哪个类的方法 包名 + 类名
*/
private Class beanClass = this.getClass();
/**
* 任务是否有状态
*/
private String isConcurrent;
/**
* Spring bean
*/
private String springBean;
/**
* 任务调用的方法名
*/
private String methodName;
/**
* 该任务所使用的数据源
*/
private String dataSource = DataSourceEnum.DB1.getName();
/**
* 为了将执行后的任务持久化到数据库中
*/
@JsonIgnore
private JobDataMap dataMap = new JobDataMap();
public JobKey getJobKey(){return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务 key
}
...
}
可以看到 Job
中定义了任务的一些基本信息,重点关注其中的 dataSource
和dataMap
属性。其中 dataSource
是任务所使用的数据源,并给了一个默认值;由于任务在添加后会持久化到数据库中,之后解析任务就会用到dataMap
。
SchedulerConfig
在添加 Job
的时候,JobDetail
和 Trigger
都是通过关键字 new
生成的,而调度器 Scheduler
则需要放在容器中维护。
@Configuration
@Order
public class SchedulerConfig {
@Autowired
private MyJobFactory myJobFactory;
@Value("${spring.profiles.active}")
private String profile;
/*
* 通过 SchedulerFactoryBean 获取 Scheduler 的实例
*/
@Bean(name = "scheduler")
public Scheduler scheduler() throws Exception {return schedulerFactoryBean().getScheduler();}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
// 延时启动
factory.setStartupDelay(20);
// 加载 quartz 数据源配置
factory.setQuartzProperties(quartzProperties());
// 自定义 Job Factory,用于 Spring 注入
factory.setJobFactory(myJobFactory);
/********* 全局监听器配置 ************/
JobListener myJobListener = new SchedulerListener();
factory.setGlobalJobListeners(myJobListener);// 直接添加为全局监听器
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
if (Util.PRODUCT.equals(profile)) {// 正式环境
System.out.println("正式环境 quartz 配置");
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));
} else {System.out.println("测试环境 quartz 配置");
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
}
// 在 quartz.properties 中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();}
/*
* quartz 初始化监听器
*/
@Bean
public QuartzInitializerListener executorListener() {return new QuartzInitializerListener();
}
}
上述代码中,将 scheduler
加入到 Spring
容器中。scheduler
是由 SchedulerFactoryBean
进行维护的,在 SchedulerFactoryBean
中对调度器工厂做了一些基本设置并从配置文件中加载了 quartz 数据源配置(配置文件的读取会根据运行环境 profile
来进行自动切换),配置了一个全局监听器用以监听任务的执行过程。
MyJobFactory
使用 Spring 提供的JobFactory
。
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
// 调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
// 进行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
quartz.properties
quartz.properties
中是 quartz 连接数据库的一些配置信息。
# \u56FA\u5B9A\u524D\u7F00org.quartz
# \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206
#
#
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
# \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
# threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B
# \u5E76\u53D1\u4E2A\u6570
org.quartz.threadPool.threadCount = 5
# \u4F18\u5148\u7EA7
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
org.quartz.jobStore.misfireThreshold = 5000
# \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
#\u6301\u4E45\u5316
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#org.quartz.jobStore.useProperties=false
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = qzDS
org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true
org.quartz.dataSource.qzDS.user=quartz
org.quartz.dataSource.qzDS.password=123456
org.quartz.dataSource.qzDS.maxConnections = 30
org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL
org.quartz.dataSource.qzDS.validateOnCheckout = true
org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40
#org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60
quartz
会根据这个配置文件将 Job
持久化到数据库中,也因此 quartz
会需要初始化一些数据库表,表结构文件在文末。
SchedulerListener
调度器监听器用以监听任务的执行状态。
public class SchedulerListener implements JobListener {private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);
public static final String LISTENER_NAME = "QuartSchedulerListener";
@Override
public String getName() {return LISTENER_NAME; //must return a name}
// 任务被调度前
@Override
public void jobToBeExecuted(JobExecutionContext context) {String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");
// 切换任务的数据源
DataSourceContextHolder.setDB(dataSource);
String jobName = context.getJobDetail().getKey().toString();
LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());
}
// 任务调度被拒了
@Override
public void jobExecutionVetoed(JobExecutionContext context) {String jobName = context.getJobDetail().getKey().toString();
LOG.error("job {} is jobExecutionVetoed", jobName);
// 可以做一些日志记录原因
}
// 任务被调度后
@Override
public void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException) {
// 清空存储的数据源
String jobName = context.getJobDetail().getKey().toString();
DataSourceContextHolder.clearDB();
LOG.info("Job : {} is finished", jobName);
if (jobException != null && !jobException.getMessage().equals("")) {
LOG.error("Exception thrown by:" + jobName
+ "Exception:" + jobException.getMessage());
}
}
}
SchedulerListener
监听任务被调度前、调度后和调度被拒绝时的状态,在任务被调度之前和之后对任务所使用的数据源进行了处理。如果项目中不需要数据源切换的话,这个监听器是不需要的,到此已经完成了 quartz
的集成。
多数据源切换
多数据源切换
通过自定义 DynamicDataSource
来覆盖 Spring Boot 中原有的数据源。
DataSourceConfig
通过读取配置文件中不同的数据源,初始化项目中可能用到的数据源用以切换。
/**
* 多数据源配置类
*/
@Configuration
public class DataSourceConfig {
// 数据源 1
@Bean(name = "datasource1")
@ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis 中对应属性的前缀
public DataSource dataSource1() {return DataSourceBuilder.create().build();}
// 数据源 2
@Bean(name = "datasource2")
@ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis 中对应属性的前缀
public DataSource dataSource2() {return DataSourceBuilder.create().build();}
/**
* 动态数据源: 通过 AOP 在不同数据源之间动态切换
*
* @return
*/
@Primary
@Bean(name = "dynamicDataSource")
public DataSource dynamicDataSource() {DynamicDataSource dynamicDataSource = new DynamicDataSource();
// 默认数据源
dynamicDataSource.setDefaultTargetDataSource(dataSource1());
// 配置多数据源
Map<Object, Object> dsMap = new HashMap();
dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());
dsMap.put(DataSourceEnum.DB2.getName(), dataSource2());
dynamicDataSource.setTargetDataSources(dsMap);
return dynamicDataSource;
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
// 设置数据源
sqlSessionFactoryBean.setDataSource(dataSource);
return sqlSessionFactoryBean.getObject();}
/**
* 配置 @Transactional 注解事物
*
* @return
*/
@Bean
public PlatformTransactionManager transactionManager() {return new DataSourceTransactionManager(dynamicDataSource());
}
}
数据源配置
spring:
datasource:
db1:
driver-class-name: com.mysql.cj.jdbc.Driver
username: doctor
password: 123456
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true
db2:
driver-class-name: com.mysql.cj.jdbc.Driver
username: quartz
password: 123456
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true
DataSourceContextHolder
由于 quartz
在执行过程中是通过不同的线程来执行 Job
的,因此此处通过 ThreadLocal
来保存线程所使用的数据源情况。
/**
* 保存本地数据源
*/
public class DataSourceContextHolder {private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);
/**
* 默认数据源
*/
public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();
/**
* ThreadLocal 之后会进行讲解
*/
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
// 设置数据源名
public static void setDB(String dbType) {LOG.info("切换到 {} 数据源", dbType);
contextHolder.set(dbType);
}
// 获取数据源名
public static String getDB() {return (contextHolder.get());
}
// 清除数据源名
public static void clearDB() {contextHolder.remove();
}
}
DynamicDataSource
获取执行中所使用的数据源。由于数据源被保存在了 DataSourceContextHolder
中的 ThreadLocal
中,所以直接获取就行了。
/**
* 获取本地数据源
*/
public class DynamicDataSource extends AbstractRoutingDataSource {private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class);
@Override
protected Object determineCurrentLookupKey() {LOG.info("数据源为{}", DataSourceContextHolder.getDB());
return DataSourceContextHolder.getDB();}
}
至此就完成了集成 quartz
及数据源切换的功能。然后就是具体的任务了。
执行任务
具体的任务需要继承 BaseJob
并在 execute
方法中重写具体需要执行的任务。
execute
@Slf4j
@Service
public class ReadNumJob extends BaseJob {
@Autowired
private RedisService redisService;
@Autowired
private JdbcTemplate jdbcTemplate;
private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class);
@Override
public void execute(JobExecutionContext context) {doSomething();
}
}
指定数据源
然后在添加任务时指定任务所使用的数据源
ReadNumJob job = new ReadNumJob();
job.setJobName("test");
job.setJobGroup("hys");
job.setDescription("test");
// 指定数据源
job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());
job.setCronExpression("0 */1 * * * ?");
try {jobAndTriggerService.addJob(job);
} catch (SchedulerException e) {e.printStackTrace();
}
源码
转评赞就是最大的鼓励