关于java:Quartz-JDBCBased-JobStore

9次阅读

共计 5115 个字符,预计需要花费 13 分钟才能阅读完成。

JDBC-Based JobStore 指的是应用数据库长久化存储作业相干信息的 JobStore,与之对应的是基于内存的 RAMJobStore。

咱们后面学习 Quartz 的 Job、JobDetail、Trigger、作业调度线程以及作业执行线程的时候,大部分状况下是基于 RAMJobStore 进行剖析的,所以对 RAMJobStore 曾经有了理解,然而还不太理解 JDBC-Based JobStore。

咱们从以下几个角度剖析 JDBC-Based JobStore:

  1. 波及到的表以及各表的作用
  2. 作业调度线程以及作业执行线程的基于 JDBC-Based JobStore 的次要工作过程
  3. JDBC-Based JobStore 的事务管理及锁机制
  4. JDBC-Based JobStore 的 recovery 及 Misfire 解决

以上 4 点放在一片文章中可能会太长,所以咱们可能会分 2 篇文章进行剖析,明天先剖析第 1 / 2 两个问题。

JDBC-Based JobStore 波及到的表

JDBC-Based JobStore 波及到的表(省略前缀 qrtz_):

  1. JOBDETAILS:作业信息
  2. TRIGGERS:Trigger 信息
  3. SIMPLE_TRIGGERS:Simple Trigger 信息
  4. CRON_TRIGGERS:Cron Trigger 信息
  5. FIRED_TRIGGERS:被触发的 Triggers
  6. SCHEDULER_STATE:调度服务的状态
  7. LOCKS:锁

Job 及 Trigger 的注册

注册过程咱们在后面的文章中曾经说过了,RAMJobStore 与 JDBC-Based JobStore 的区别次要是存储形式不同,一个存储在内存中,一个存储在数据库中。

Job 注册后存储在 JOBDETAILS 表中,内容与存储在内存中基本一致,次要包含 sched_name、name、group、description、job_class_name、job_data 等。

Trigger 注册后存储在 TRIGGERS 表中,内容与存储在内存中的也基本一致,次要包含 sched_name、name、group、job_name、job_group、trigger_state、trigger_type、start_time、end_time、calendar_name、misfire_instr、job_data 等。

trigger_state 在初始化写入之后的状态为 WAITING(期待被触发)。

trigger_type 包含:

  1. SIMPLE:Simple Trigger
  2. CRON:Cron Trigger
  3. CAL_INT:Calendar Interval Trigger
  4. DAILY_I:Daily Time Interval Trigger
  5. BLOB:A general blob Trigger

不论是 jobdetails 还是 trigger 表都有一个 sched_name 字段,记录以后的任务调度器名称,sched_name 从配置文件中取值(org.quartz.scheduler.instanceName)。

这个调度器名称 sched_name 其实就是运行任务调度服务的服务器标识,也就是以后正在运行 quartz 的服务的标识,集群环境下,不同的服务必须指定不同的 sched_name,无关 quartz 集群的细节咱们在其余文章做详细分析。

Trigger 注册的时候还波及到其余表:
如果以后 Trigger 处于 Group Paused 状态,Trigger 同时写入 paused_trigger_grps 表。

Simple Trigger 会写入 Simple_triggers 表,Cron Trigger 会写入 Cron_triggers 表,负责记录各自的非凡属性;Simple_triggers 记录 repeat_count/repeat_interval/times_triggered 等信息,Cron_triggers 表记录 cron 表达式。

作业的调度

调度工作的执行逻辑咱们在后面的文章中曾经重复剖析过:

  1. 从作业执行线程池获取 availThreadCount,也就是以后可用的线程数
  2. 调用 JobStore 的 acquireNextTriggers 办法,获取特定短时间(idleWaitTime,默认 30 秒)内可能须要被触发的,数量不超过 availThreadCount 的触发器
  3. 调用 JobStore 的 triggersFired 办法对获取到的可能须要被触发的触发器进行二次加工,再次获取到最终的待触发的触发器后果集
  4. 循环解决最终的待处理触发器后果集中的每一个须要被触发的触发器
  5. 用 JobRunShell 包装该触发器做绑定的 Job,送给线程池执行作业

JobStoreSupport#acquireNextTriggers

JobStoreSupport 是 JDBC-Based JobStore 的抽象类,有两个实现类 JobStoreCMT 和 JobStoreTX,两个实现类的次要作用是治理事务,大部分的业务逻辑其实都是在 JobStoreSupport 中实现的,包含 acquireNextTriggers 和 triggersFired 办法。

acquireNextTriggers 办法首先从 Triggers 表中获取符合条件的触发器:nextFireTime 在 30 秒内(有参数 idleWaitTime 设定)的、状态为 WAITING 的、肯定数量的(参数设置的一次解决的触发器个数、或者可用的执行线程数)的触发器。

针对获取到的每一个 Trigger:

  1. 从 JobDetails 表中获取其绑定的 Job,判断 Job 的 ConcurrentExectionDisallowed 属性,做并发管制(逻辑与 RAMJobStore 的相干逻辑一样)
  2. Triggers 表中以后 trigger 的状态从 WAITING 批改为 ACQUIRED
  3. 以后 Trigger 写入到 fired_triggers 表中,状态为 ACQUIRED

JobStoreSupport#triggersFired

再次从 triggers 表中获取到以后 trigger,验证其状态是否为 ACQUIRED,状态不正确的不做解决。

从 job_details 表中获取到以后 trigger 绑定的作业。

更新 fired_triggers 表中以后 trigger 的状态为 EXECUTING。

调用 trigger 的 triggered 办法获取以后 trigger 的下一次触发工夫。

更新 triggers 表中以后 trigger 的状态(WAITING)、下次触发工夫等数据。

作业执行 #JobRunShell

通过以上步骤,作业调度线程就获取到了以后须要执行的 trigger,之后就须要执行最初一步:

用 JobRunShell 包装该触发器,送给线程池执行该触发器关联的作业

咱们须要对这个 JobRunShell 做一个简略理解。

JobRunShell instances are responsible for providing the ‘safe’ environment for Job s to run in, and for performing all of the work of executing the Job, catching ANY thrown exceptions, updating the Trigger with the Job’s completion code, etc.
A JobRunShell instance is created by a JobRunShellFactory on behalf of the QuartzSchedulerThread which then runs the shell in a thread from the configured ThreadPool when the scheduler determines that a Job has been triggered.

JavaDoc 说的十分分明,JobRunShell 其实才是最终负责 Job 运行的,SimpleThreadPool 只是提供了运行 Job 的线程,有工作交进来之后(交进来的其实是持有 job 对象的 JobRunShell 对象)SimpleThreadPool 负责调配一个执行线程、之后用该线程运行该工作。

咱们在后面剖析 SimpleThreadPool 的文章中曾经说过,线程池的执行线程 WorkThread 有一个 Runable 接口的对象,工作触发后 Job 会封装为这个 Runable 对象、而后交给 WorkThread 用一个新的线程执行这个 Runable 对象。

这个 Runable 对象就是 JobRunShell,JobRunShell 实现了 Runable 接口。

所以咱们就从 JobRunShell 的 run 办法动手。

JobRunShell#run

JobRunShell 在初始化的时候封装了一个 JobExecutionContextImpl 对象 jec,其中蕴含了 Job、Trigger、Scheduler 等相干对象。

从 jec 获取到 job,调用 job 的 execute 办法,这里其实就调用到了咱们应用层中实现了 Job 接口对象的 execute 办法,其实是咱们的业务逻辑被调用执行了。

作业调用执行实现之后,回调 QuartzScheduler 的 notifyJobStoreJobComplete 办法,告诉调度器以后 trigger 曾经实现执行。

QuartzScheduler 的 notifyJobStoreJobComplete 办法调用 JobStore 的 triggeredJobComplete 办法。

JobStoreSuppor#triggeredJobComplete

依据 trigger 执行状况 更新 triggers 表中以后 trigger 的状态,如果 trigger 的 nextFireTime 不为空的话更新为 WAITING,期待下次被触发,如果为空的话则更新为 COMPLETED,工作执行实现。除此之外还有其余的挂起、谬误等状态。

以后 trigger 从 fired_triggers 表中删除。

Trigger State

Trigger 的状态其实和 JobStore 无关,也就是说不论是用基于内存的 JobStore,还是基于数据库的 JobStore,对于 Trigger 状态的治理逻辑都是一样的。

Trigger 的状态包含:

  1. WAITING:初始化状态,期待被调度
  2. ACQUIRED:曾经被调度器获取,期待被触发
  3. EXECUTING:正在执行
  4. COMPLETE:执行实现,不再被调度
  5. BLOCKED:阻塞
  6. ERROR:谬误
  7. PAUSED:挂起 / 暂停
  8. PAUSED_BLOCKED:挂起 - 阻塞
  9. DELETED:被删除

Trigger 注册的时候,如果在挂起列表(qrtz_paused_trigger_grps)中的话,状态为 PAUSED,否则状态为 WAITING。

Trigger 被调度器调度之后(状态为 WAITING、nextFiredTime 在 30 秒之内)状态为 ACQUIRED,被作业执行线程调起执行的时候状态变更为 EXECUTING,作业执行实现后,如果 Trigger 不再须要被执行(执行次数或执行工夫达到了设置要求)则状态为 COMPLETE。

设置为 ConcurrentExectionDisallowed 的作业被调度器执行后,以后作业绑定的其余 Trigger 的状态:

  1. WAITING -> BLOCKED
  2. ACQUIRED -> BLOCKED
  3. PAUSED -> PAUSED_BLOCKED

作业被调度器调度执行后,以后 Trigger 的状态:

  1. 如果以后 Trigger 被触发后,nextFiredTime 不为空的话,状态设置为 WAITING,期待下次被触发
  2. 否则,设置为 COMPLETE,触发器完成使命

JobRunShell 实现作业执行后通过调用 notifyJobStoreJobComplete 办法告诉 JobStore 触发器实现本次作业调度后,如果以后 Trigger 绑定的作业设置为 ConcurrentExectionDisallowed,则设置以后作业绑定其余 Trigger 的状态(与作业执行时做相同操作以回复这些触发器到失常状态):

  1. BLOCKED -> WAITING
  2. PAUSED_BLOCKED -> PAUSED

咱们能够通过调用 scheduler 的 pauseTrigger 办法暂停 / 挂起当前任务,调用后触发器的状态:

  1. WAITING -> PAUSED
  2. ACQUIRED -> PAUSED
  3. BLOCKED -> PAUSED_BLOCKED

被暂停的工作被从新调起后,状态复原。

小结

到这儿,开篇设置的本篇文章的工作就实现了,后续的两个问题下一篇文章持续。

Thanks a lot!

上一篇 Quartz 的相干线程

正文完
 0