关于java:Quartz-相关线程

39次阅读

共计 4802 个字符,预计需要花费 13 分钟才能阅读完成。

咱们在后面文章中说到过 Quartz 波及到的线程,然而散落在几篇文章中,不好找。而 Quartz 波及到的线程对于了解 Quartz 也比拟重要,所以明天专门提取进去独自说一下。

Quartz 中的次要线程:

  1. 任务调度线程 QuartzSchedulerThread
  2. 工作执行线程
  3. Misfire 解决线程
  4. ClusterManager 线程

任务调度线程 QuartzSchedulerThread

QuartzSchedulerThread 是任务调度线程,他的职责是对满足触发条件(nextFireTimer 到了)的注册到 JobStore 的 Trigger 调配给可用的工作执行线程去执行。

QuartzSchedulerThread 启动

任务调度线程 QuartzSchedulerThread 是在调度器 Scheduler 创立的时候启动的。

应用层通过以下调用创立 Scheduler:

Scheduler sche = new StdSchedulerFactory().getScheduler();

个别状况下通过 StdSchedulerFactory 构建 Scheduler,getScheduler 首先尝试从 SchedulerRepository 获取 Schedule,首次运行获取不到,则通过 instantiate() 办法获取。

public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {if (sched.isShutdown()) {schedRep.remove(getSchedulerName());
            } else {return sched;}
        }

        sched = instantiate();

        return sched;
    }

instantiate() 办法特地特地特地长,简直就是在这里实现 Quartz 所有相干组件的初始化的。

其中会创立 QuartzScheduler,之后会将 QuartzScheduler 包装到 stdScheduler 中存入 SchedulerRepository。

instantiate() 创立 QuartzScheduler 是调用的构造方法:

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + "created.");
    }

能够看到构造方法中创立了 QuartzSchedulerThread 对象,之后获取 ThreadExecutor(个别状况下为 DefaultThreadExecutor)并通过调用其 execute 办法启动 QuartzSchedulerThread 线程:

public class DefaultThreadExecutor implements ThreadExecutor {public void initialize() { }

    public void execute(Thread thread) {thread.start();
    }

}
QuartzSchedulerThread 的运行

其实就是他的 run 办法,咱们也大略剖析过,其次要逻辑是:

  1. 从作业执行线程池获取 availThreadCount,也就是以后可用的线程数
  2. 调用 JobStore 的 acquireNextTriggers 办法,获取特定短时间(idleWaitTime,默认 30 秒)内可能须要被触发的,数量不超过 availThreadCount 的触发器
  3. 调用 JobStore 的 triggersFired 办法对获取到的可能须要被触发的触发器进行二次加工,再次获取到最终的待触发器后果集
  4. 循环解决最终的待处理触发器后果集中的每一个须要被触发的触发器
  5. 用 JobRunShell 包装该触发器,送给线程池执行该触发器关联的作业

好了,作业调度线程 QuartzSchedulerThread 咱们就根本搞清楚了。

作业执行线程

Quartz 的作业执行线程是放在线程池中进行治理的,默认是 SimpleTreadPool,无关 SimpleThreadPool 咱们后面专门有一篇文章介绍过,这里就不再赘述了。

作业执行线程和作业调度线程一样,也是在作业调度器 Scheduler 创立后立刻启动,这个过程同样也是在 StdSchedulerFactory 的 instantiate() 办法中实现的:

instantiate() 创立 SimpleThreadPool 之后会调用 SimpleThreadPool 的 initialize 办法,依据配置文件指定的工作执行线程数实现工作线程的初始化和启动。比方配置未见设置为 10 则初始化 10 个工作线程并一一启动。作业执行线程的初始化及启动的具体过程请参考 Quartz – SimpleThreadPool。

MisfireHandler 线程

如果你的我的项目应用 RAMJobStore,而不是 JDBC-based JobStore(指须要长久化到数据库的 JobStore),那么就不存在 Misfire 解决线程。

因为 RAMJobStore 在解决失常触发的过程中顺便就解决了 Misfire,所以就不再须要其余解决机制了,这部分咱们在后面的文章中也剖析过 Quartz – Misfire(for RAMJobstore)。

JDBC-based JobStore 在解决失常触发的时候只获取未错过触发工夫的触发器,对于错过触发工夫的、也就是 Misfire 的触发器就须要另外的机制来解决。

Misfire 解决线程就是 Quartz 采纳 JDBC-based JobStore 的状况下用来解决错过触发机会的触发器的线程。

MisfireHandler 启动

MisfireHandler 定义在 JobStoreSupport 类中,JobStoreSupport 是 JobStore 的 JDBC-based JobStore 的虚构类,Quartz 次要提供了两个基于 JDBC 的 JobStore 的实现:JobStoreTX、JobStoreCMT。JDBC-based JobStore 咱们当前剖析,明天次要剖析 MisfireHandler。

咱们在利用中创立任务调度器 Scheduler 后须要调用他的 start 办法:

 Scheduler sche = new StdSchedulerFactory().getScheduler();
            sche.scheduleJob(jobDetail,trigger);
            sche.start();

这个 start 办法会调用到 JobStore 的 schedulerStarted() 办法,如果咱们利用中采纳的是 JDBC-based JobStore 的话,会调用到 JobStoreSupport 的 schedulerStarted(),其中会创立 MisfireHandler 之后调用 MisfireHandler 的 initialize():

misfireHandler = new MisfireHandler();
        if(initializersLoader != null)
            misfireHandler.setContextClassLoader(initializersLoader);
        misfireHandler.initialize();

Misfire 的 initialize 将创立好的 MisFireHandle 线程交给 ThreadExecutor 启动。

public void initialize() {ThreadExecutor executor = getThreadExecutor();
            executor.execute(MisfireHandler.this);
        }
MisfireHandle 的运行

也就是 MisfireHandle 的 run 办法。解决逻辑和 RAMJobStore 解决 misfired trigger 的逻辑相似,只不过 MisfireHandle 的所有解决逻辑都是通过数据库操作实现的。

JDBC-Based JobStore 对应的表构造咱们会找机会专门剖析,这里就不具体开展了。

MisfireHandle 的 run 办法的次要逻辑为:

  1. 从数据库中获取在 WAITING(期待执行)状态、下次执行工夫小于 msifiredtime(以后工夫 – MisfireThreshold) 的触发器,也就是错过触发工夫的触发器
  2. 为了防止存在大量 misfired trigger 的状况下,一次解决太多数据影响其余失常触发器的执行,MisfireHandle 线程每次仅获取局部而不是全副 misfired trigger(参数 maxToRecoverAtATime 指定,默认为 20)
  3. 对获取到的每一个错过执行工夫的触发器(misfired trigger), 调用触发器的 updateAfterMisfire 办法获取下次执行工夫,updateAfterMisfire 办法咱们在上一篇讲 Misfire 解决策略的文章中说过,就是依据触发器的解决策略获取下次执行工夫
  4. updateAfterMisfire 办法执行后,获取到的触发器的下次执行工夫如果不为空的话,更新到数据库中,期待失常的工作执行线程调度执行

ClusterManager 线程

与 MisfiredHandle 一样,ClusterManager 线程也是 JDBC-Based JobStore 特有的。

顾名思义,ClusterManager 线程与集群有关系,JDBC-Based JobStore 是能够反对 Quartz 的集群部署的,在集群环境下,Quartz 服务节点可能会 down 机、掉线,从而影响工作的执行,ClusterManager 线程就是负责查看 Quartz 服务节点的在线状态的,如果产生掉线后,将该服务节点负责的触发器交给其余服务节点来解决。

具体逻辑等到咱们剖析完 Quartz Cluster 之后补充。

上一篇 easypoi 模板导出 foreach 单行多后果集 + 合并单元格问题

正文完
 0