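1. Overview
In my project, a specified task (a script) has to run periodically between a start time and an end time, and the task's status has to be updated once it finishes. The tasks are slow: a single run can take anywhere from 10 seconds to several minutes. The design follows from these requirements. The periodic triggering is handled by the Quartz scheduling framework. Each time a Quartz job fires, it only pushes the task information onto a message queue. The message-queue consumers consume the messages on multiple threads. When a task has finished executing, the consumer acknowledges the message and updates the task's result status.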
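To make the flow concrete, here is a minimal in-process sketch of the same pipeline. It is only an illustration under assumptions: a plain loop stands in for the Quartz job firing, a `LinkedBlockingQueue` stands in for the real message queue, and the names `PipelineSketch`, `Status` and `run` are invented for this example.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the real Quartz + message-queue setup.
class PipelineSketch {
    enum Status { QUEUED, DONE }

    static Map<String, Status> run(int tasks, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, Status> statuses = new ConcurrentHashMap<>();
        CountDownLatch finished = new CountDownLatch(tasks);

        // Consumer side: several threads take messages and do the slow work.
        ExecutorService consumers = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            consumers.execute(() -> {
                try {
                    while (true) {
                        String checkId = queue.take();
                        // The long-running script would run here.
                        statuses.put(checkId, Status.DONE); // update the result status
                        finished.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop when the pool shuts down
                }
            });
        }

        // Scheduler side: a firing does no work itself, it only enqueues the task.
        for (int i = 1; i <= tasks; i++) {
            String checkId = "check-" + i;
            statuses.put(checkId, Status.QUEUED);
            queue.put(checkId);
        }

        boolean allDone = finished.await(10, TimeUnit.SECONDS);
        consumers.shutdownNow();
        if (!allDone) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return statuses;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 3));
    }
}
```

Keeping the firing side this cheap is what lets a small scheduler thread pool keep up even when each task takes minutes.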
2. Quartz scheduler
A. Registering the Scheduler
Configuration file
```properties
org.quartz.scheduler.instanceName = PeriodCheckQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 5
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
org.quartz.jobStore.misfireThreshold = 5000
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = qzDS
org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL = jdbc:mysql://10.50.0.152:3306/cmp?characterEncoding=utf8
org.quartz.dataSource.qzDS.user = root
org.quartz.dataSource.qzDS.password = 123456
```
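`org.quartz.jobStore.misfireThreshold` is given in milliseconds: it is how late a trigger may be before Quartz treats the firing as a misfire. The sketch below only illustrates that comparison. `MisfireSketch` and `isMisfired` are names invented here, this is not Quartz's internal code, and the exact boundary handling inside Quartz may differ.

```java
// Hypothetical helper that mirrors the idea of the threshold, not Quartz internals.
class MisfireSketch {
    // Same value as misfireThreshold in the configuration file.
    static final long MISFIRE_THRESHOLD_MS = 5000;

    /** Returns true when a firing is late by more than the tolerated threshold. */
    static boolean isMisfired(long scheduledFireMs, long nowMs) {
        return nowMs - scheduledFireMs > MISFIRE_THRESHOLD_MS;
    }

    public static void main(String[] args) {
        long scheduled = 1_000_000L;
        System.out.println(isMisfired(scheduled, scheduled + 3_000)); // false
        System.out.println(isMisfired(scheduled, scheduled + 8_000)); // true
    }
}
```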
B. Registering the beans
```java
import java.io.IOException;
import java.util.Properties;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class SchedulerConfig {
    @Autowired
    private SpringJobFactory springJobFactory;
    @Autowired
    private QuarzProperties quarzProperties;

    @Bean(name = "SchedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setStartupDelay(5); // start the scheduler 5 seconds after initialization
        factory.setQuartzProperties(quartzProperties());
        factory.setJobFactory(springJobFactory);
        return factory;
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setProperties(quarzProperties.getQuartzProperties());
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    @Bean(name = "Scheduler")
    public Scheduler scheduler() throws IOException {
        return schedulerFactoryBean().getScheduler();
    }
}
```
3. Custom Job
```java
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.Serializable;

// Prevents two executions of the same JobDetail from running at the same time.
@DisallowConcurrentExecution
public class PeriodCheckJob implements Job, Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(PeriodCheckJob.class);

    @Autowired
    private PeriodCheckService periodCheckService;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // business logic
    }
}
```
4. JobDetail and Trigger
```java
// Build the JobDetail directly from the job class; no reflective instantiation is needed.
JobDetail jobDetail = JobBuilder.newJob(PeriodCheckJob.class)
        .withIdentity(new JobKey(PeriodCheckJob.class.toString() + expression + checkId, JOB_GROUP))
        .withDescription("description")
        .build();
jobDetail.getJobDataMap().put("checkId", checkId);
// If a firing is missed, skip it and wait for the next scheduled time.
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(expression)
        .withMisfireHandlingInstructionDoNothing();
// The trigger is active only between startTime and endTime; the cron expression defines the firing times.
Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity(new TriggerKey(PeriodCheck.class.toString() + expression + checkId, "TRIGGER_KEY_" + expression))
        .startAt(startTime).endAt(endTime)
        .withSchedule(cronScheduleBuilder)
        .build();
scheduler.scheduleJob(jobDetail, trigger);
```
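The job key has to be rebuilt from the same parts later in order to pause or delete the job, so the string must come out identical every time. One option, sketched here as a suggestion rather than as part of the original code, is a small helper that owns the naming. Note that `Class.toString()` prefixes the name with `class `, while `getName()` returns only the name. `KeyNames`, the `|` separator and the `long` type of `checkId` are assumptions, and `String.class` is just a placeholder for the job class.

```java
// Hypothetical helper so that scheduling and later lookups build the same key name.
class KeyNames {
    /** Builds a key name from the class name, the cron expression and the check id. */
    static String of(Class<?> type, String expression, long checkId) {
        return type.getName() + "|" + expression + "|" + checkId;
    }

    public static void main(String[] args) {
        System.out.println(String.class.toString());                // class java.lang.String
        System.out.println(of(String.class, "0 0/5 * * * ?", 42L)); // java.lang.String|0 0/5 * * * ?|42
    }
}
```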
5. Job state
A. Stopping a job
```java
JobKey jobKey = new JobKey(PeriodCheckJob.class.toString() + expression + checkId, JOB_GROUP);
// The three calls below are alternatives; use whichever the situation requires.
scheduler.pauseJob(jobKey);  // pause the job
scheduler.deleteJob(jobKey); // stop the job by removing it and its triggers
scheduler.resumeJob(jobKey); // resume a paused job

// Change the job's schedule configuration
TriggerKey triggerKey = new TriggerKey(InspectPlan.class.toString(), planId.toString());
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(inspectPlan.getCronExpression());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
scheduler.rescheduleJob(triggerKey, trigger); // reschedule the job with the new trigger
```
6. Listening for task-completion events
```java
@Component("periodCheckJobListener")
@Slf4j
public class PeriodCheckJobListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Read the task information from the message and update the business status.
    }
}
```
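The overview says the message is acknowledged only after the task has finished, and the result status is updated afterwards. Here is a minimal sketch of that ordering, with assumptions stated up front: `AckChannel` is a made-up stand-in that borrows the `basicAck` name from the RabbitMQ `Channel`, `AckAfterWorkSketch` and `handle` are hypothetical, and acknowledging failed runs as well (recording them as `FAILED`) is my own choice for the example. In a real listener this would presumably be `channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)` with the container in manual acknowledge mode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the acknowledgement call of a RabbitMQ channel.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
}

class AckAfterWorkSketch {
    /** Runs the task first, acknowledges afterwards, and returns the status to persist. */
    static String handle(long deliveryTag, Runnable task, AckChannel channel) {
        String status;
        try {
            task.run(); // the long-running script
            status = "SUCCESS";
        } catch (RuntimeException e) {
            status = "FAILED";
        }
        // Acknowledge only once the work is over, so an unfinished task is never confirmed.
        channel.basicAck(deliveryTag, false);
        return status;
    }

    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        AckChannel channel = (tag, multiple) -> acked.add(tag); // records acknowledged tags
        System.out.println(handle(7L, () -> { }, channel)); // SUCCESS
        System.out.println(handle(8L, () -> { throw new IllegalStateException("script failed"); }, channel)); // FAILED
        System.out.println(acked); // [7, 8]
    }
}
```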