1. Quartz
Quartz是由OpenSymphony提供的弱小的开源任务调度框架,用来执行定时工作。比方每天凌晨三点钟须要从数据库导出数据,这时候就须要一个任务调度框架,帮咱们主动去执行这些程序。那Quartz是怎么实现的呢?
1)首先咱们须要定义一个运行业务逻辑的接口,即Job,咱们的类继承这个接口来实现业务逻辑,比方凌晨三点读取数据库并且导出数据。
2)有了Job之后须要按时执行这个Job,这就须要一个触发器Trigger,触发器Trigger就是依照咱们的要求在每天凌晨三点执行咱们定义的Job。
3)有了工作Job和触发器Trigger后,就须要把它们联合起来,让触发器Trigger在规定的工夫调用Job,这时须要一个Schedule来实现这个性能。
所以,Quartz次要有三个局部组成:
调度器:Scheduler
工作:JobDetail
触发器:Trigger,包含SimpleTrigger和CronTrigger
创立一个Quartz工作的流程如下:
//定义一个作业类,实现用户的业务逻辑
public class HelloJob implements Job {
......
实现业务逻辑
}
//依据作业类失去JobDetail
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
//定义一个触发器,依照规定的工夫调度作业
Trigger trigger = TriggerBuilder.newTrigger("每隔1分钟执行一次")
//依据作业类和触发器创立调度器
Scheduler scheduler = scheduler.scheduleJob(jobDetail,trigger);
//启动调度器,开始执行工作
scheduler .start()
2. Elastic-Job的基本原理
2.1 分片
Elastic-Job为了进步工作的并发能力,引入了分片的概念,行将一个工作划分成多个分片,而后由多个执行的机器别离支付这些分片来执行。比方一个数据库中有1亿条数据,须要将这些数据读取进去并计算,而后再写入到数据库中。就能够将这1亿条数据划分成10个分片,每一个分片读取其中的1千万条数据,而后计算后写入数据库。这10个分片编号为0,1,2…9,如果有三台机器执行,A机器分到分片(0,1,2,9),B机器分到分片(3,4,5),C机器分到分片(6,7,8) 。
2.2 作业调度与执行
Elastic-Job是去中心化的任务调度框架,当多个节点运行时,会先抉择一个主节点,当达到执行工夫后,每个实例开始执行工作,主节点负责分片的划分,其它节点期待划分实现,主节点将划分后的后果寄存到zookeeper中,而后每个节点再从zookeeper中获取划分好的分片项,将分片信息作为参数,传入到本地的工作函数中,从而执行工作。
2.3 作业的类型
elastic-job反对三种类型的作业工作解决!
Simple 类型作业:Simple 类型用于个别工作的解决,只需实现SimpleJob接口。该接口仅提供繁多办法用于笼罩,此办法将定时执行,与Quartz原生接口类似。
Dataflow 类型作业:Dataflow 类型用于解决数据流,需实现DataflowJob接口。该接口提供2个办法可供笼罩,别离用于抓取(fetchData)和解决(processData)数据。
Script类型作业:Script 类型作业意为脚本类型作业,反对 shell,python,perl等所有类型脚本。只需通过控制台或代码配置 scriptCommandLine 即可,无需编码。执行脚本门路可蕴含参数,参数传递结束后,作业框架会主动追加最初一个参数为作业运行时信息。
3. Elastic-Job的执行原理
3.1 Elastic-Job的启动流程
上面以一个SimpleJob类型的工作来阐明elastic-job的启动流程
public class MyElasticJob implements SimpleJob {
public void execute(ShardingContext context) {
//实现业务逻辑
......
}
// 对zookeeper进行设置,作为分布式工作的注册核心
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("xxxx"));
regCenter.init();
return regCenter;
}
//设置工作的执行频率、执行的类
private static LiteJobConfiguration createJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
//主函数
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
}
创立一个Elastic-Job的工作并执行,步骤如下:
1)须要先设置zookeeper的根本信息,Elastic-Job应用zookeeper来进行分布式治理,如选主、元数据存储与读取、分布式监听机制等。
2)创立一个执行工作的Job类,以Simple 类型作业为例,创立一个继承SimpleJob的类,在这个类中实现execute函数。
3)设置作业的根本信息,在JobCoreConfiguration 中设置作业的名称(jobName),作业执行的工夫表达式(cron),总的分片数(shardingTotalCount);而后在SimpleJobConfiguration 中设置执行作业的Job类,最初定义Lite作业根配置。
4)创立JobScheduler(作业调度器)实例,而后JobScheduler的init()办法中执行作业的初始化,这样作业就开始运行了。
Elastic-Job的作业调度在JobScheduler中实现,上面具体介绍JobScheduler办法。JobScheduler的定义如下:
public class JobScheduler {
public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
//作业配置
private final LiteJobConfiguration liteJobConfig;
//注册核心
private final CoordinatorRegistryCenter regCenter;
//调度器门面
private final SchedulerFacade schedulerFacade;
//作业门面
private final JobFacade jobFacade;
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
如上,在JobScheduler的构造方法中,设置好作业配置信息liteJobConfig、注册核心regCenter、一系列监听器elasticJobListenerList ,调度器门面,作业门面。
在创立好JobScheduler实例后,就进行作业的初始化操作,如下:
/**
* 初始化作业.
*/
public void init() {
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(liteJobConfig);
jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
}
如上,
1)JobRegistry是作业注册表,以单例的模式存储作业的元数据,在JobRegistry中设置好分片总数等信息。
2)jobScheduleController是作业调度控制器,在jobScheduleController中能够执行:调度作业、从新调度作业、暂停作业、复原作业、立即复原作业。所以作业的开始、暂停、复原都是在jobScheduleController中执行的。
3)在作业注册表JobRegistry中设置作业名称、作业调度器、注册核心。
4)执行调度器门面schedulerFacade的registerStartUpInfo办法,在这个办法中注册作业启动信息,代码如下:
/**
* 注册作业启动信息.
*
* @param liteJobConfig 作业配置
*/
public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
regCenter.addCacheData("/" + liteJobConfig.getJobName());
// 开启所有监听器
listenerManager.startAllListeners();
// 选举主节点
leaderService.electLeader();
//长久化job的配置信息
configService.persist(liteJobConfig);
LiteJobConfiguration liteJobConfigFromZk = configService.load(false);
// 长久化作业服务器上线信息
serverService.persistOnline(!liteJobConfigFromZk.isDisabled());
// 长久化作业运行实例上线相干信息,将服务实例注册到zk
instanceService.persistOnline();
// 设置 须要从新分片的标记
shardingService.setReshardingFlag();
// 初始化 作业监听服务
monitorService.listen();
// 初始化 调解作业不统一状态服务
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
如上,
1)开启所有的监听器,利用zookeeper的watch机制来监听系统中各种元数据的变动,从而执行相应的操作
2)选举主节点,利用zookeeper的分布式锁来抉择一个主节点,主节点次要进行分片的划分。
3)长久化各种元数据到zookeeper,如作业的配置信息,每个服务实例的信息等
4)设置须要分片的标记,在第一次执行工作或者零碎中服务实例增减时都须要从新分片。
在作业启动信息注册好当前,就调用jobScheduleController的scheduleJob办法,进行作业的调度,这样作业就开始执行了。scheduleJob办法的代码如下:
/**
* 调度作业.
*
* @param cron CRON表达式
*/
public void scheduleJob(final String cron) {
try {
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, createTrigger(cron));
}
scheduler.start();
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
通过后面Quartz的解说可知,scheduler通过将jobDetail和触发器Trigger联合,再调用scheduler.start(),这样就开始了作业调用。
通过下面的代码剖析可知。作业的启动流程如下:
3.2 Elastic-Job的执行流程
通过后面Quartz的解说可知,工作的执行理论是运行JobDetail中定义的业务逻辑,咱们只须要看jobDetail外面的内容,就能晓得作业执行的过程
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
//疏忽其它代码
}
通过下面的代码可知,执行的工作就是LiteJob这个类的内容
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
LiteJob 通过 JobExecutorFactory 取得到作业执行器( AbstractElasticJobExecutor ),并进行执行:
public final class JobExecutorFactory {
/**
* 获取作业执行器.
*
* @param elasticJob 分布式弹性作业
* @param jobFacade 作业外部服务门面服务
* @return 作业执行器
*/
@SuppressWarnings("unchecked")
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
// ScriptJob
if (null == elasticJob) {
return new ScriptJobExecutor(jobFacade);
}
// SimpleJob
if (elasticJob instanceof SimpleJob) {
return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
}
// DataflowJob
if (elasticJob instanceof DataflowJob) {
return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
}
throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
}
}
可见,作业执行器工厂JobExecutorFactory ,依据不同的作业类型,返回对应的作业执行器,而后执行对应作业执行器的execute()函数。上面看一下execute函数
// AbstractElasticJobExecutor.java
public final void execute() {
// 查看作业执行环境
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// 获取以后作业服务器的分片上下文
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// 公布作业状态追踪事件(State.TASK_STAGING)
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
// 跳过存在运行中的被错过作业
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
// 公布作业状态追踪事件(State.TASK_FINISHED)
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
// 执行作业执行前的办法
try {
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
// 执行一般触发的作业
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// 执行被跳过触发的作业
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// 执行作业生效转移
jobFacade.failoverIfNecessary();
// 执行作业执行后的办法
try {
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
execute函数的次要流程:
- 查看作业执行环境
- 获取以后作业服务器的分片上下文。即通过函数jobFacade.getShardingContexts()获取以后的分片信息,由主节点依据相应的分片策略来进行分片项的划分,划分好之后将划分后果存入到zookeeper中,其它节点再从zookeeper中获取划分后果。
- 公布作业状态追踪事件
- 跳过正在运行中的被错过执行的作业
- 执行作业执行前的办法
- 执行一般触发的作业
最初,会调用MyElasticJob中的execute办法,从而达到执行用户业务逻辑的目标。
整个Elastic-Job的执行流程如下:
4. Elastic-Job的优化实际
4.1 空转问题
Elastic-Job的作业依照是否有实现类能够分为两种:有实现类的作业和没有实现类的作业。如Simple类型和DataFlow类型的作业须要用户本人定义实现类,继承SimpleJob或者DataFlowJob类;另一种是不须要实现类的作业,如Script类型作业和Http类型作业,对应这种不须要实现类的作业,用户只须要在配置平台填写好相应的配置,咱们后盾再定时的从配置平台拉取最新注册的工作,而后就能够执行用户最新注册的script或者Http类型的作业。
在生产环境中,执行作业的集群的机器数量很多,然而用户注册的每个作业的分片却很少(大部分只有1个分片),依据后面的剖析可知,对应只有一个分片的工作,集群中的所有机器都会参加运行,然而因为只有失去那个分片的机器才会真正运行,其余的都会因为没有分片而空转,这无疑是对计算资源的节约。
4.2 解决方案
为了解决分片数量少、执行服务器多而呈现的空转问题,咱们这边的解决方案是用户在配置平台注册工作时,指定好对应的执行服务器,执行服务器的数量M=分片数+1(多进去的机器作为冗余备份)。如用户的作业分片为2, 后盾依据每天机器以后的负载排序,抉择3台负载最轻的机器作为执行服务器。这样当这些机器定时从配置平台拉取工作时,如果发现自己不属于这个工作的执行服务器,就不运行这个作业,只有属于当前任务的执行服务器才运行。这样既保证了可靠性,又防止了过多机器的空转,进步了效率。
5. OPPO海量作业调度计划
Elastic-Job通过zookeeper来实现弹性分布式的性能,这在任务量很小的时候能够满足用户需要,然而也有以下毛病:
- Elastic-Job的弹性分布式性能强依赖zookeeper,zookeeper容易成为性能瓶颈。
- 工作划分的分片数可能小于执行工作的实例数,导致一些机器空转。
基于Elastic-Job的上述毛病,OPPO中间件团队在解决海量任务调度时,采纳了集中式的调度计划,用户的作业不须要通过Quartz来定时触发,而是通过接管服务器的音讯来触发本地工作。用户先在注册平台注册工作,服务器定时从注册平台的数据库中扫描最近一个周期(30秒)内须要执行的工作,再依据工作的理论执行工夫生成延时音讯并写入具备延时性能的音讯队列,用户再从音讯队列中拉取数据并触发作业的执行。这种集中式的调度形式由核心服务器来触发音讯执行,既克服了zookeeper的性能瓶颈,又防止了工作服务器的空转,可能满足海量工作的执行要求。
总结
Elastic-Job应用quartz来进行作业的调度,同时引入zookeeper来实现分布式治理的性能,在高可用计划的根底上减少了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源从而实现了分布式任务调度的性能。同时因为分片的思路,也会导致没有失去分片的服务器处于空转的状态,这在理论的生产中能够设法躲避。
作者简介
Xinchun OPPO高级后端工程师
目前负责分布式作业调度的研发,关注音讯队列、redis数据库、ElasticSearch等中间件技术。
获取更多精彩内容,扫码关注[OPPO数智技术]公众号
发表回复