乐趣区

关于数据库:ElasticJob的执行原理及优化实践

1. Quartz

Quartz 是由 OpenSymphony 提供的弱小的开源任务调度框架,用来执行定时工作。比方每天凌晨三点钟须要从数据库导出数据,这时候就须要一个任务调度框架,帮咱们主动去执行这些程序。那 Quartz 是怎么实现的呢?
1)首先咱们须要定义一个运行业务逻辑的接口,即 Job,咱们的类继承这个接口来实现业务逻辑,比方凌晨三点读取数据库并且导出数据。

2)有了 Job 之后须要按时执行这个 Job,这就须要一个触发器 Trigger,触发器 Trigger 就是依照咱们的要求在每天凌晨三点执行咱们定义的 Job。

3)有了工作 Job 和触发器 Trigger 后,就须要把它们联合起来,让触发器 Trigger 在规定的工夫调用 Job,这时须要一个 Schedule 来实现这个性能。
所以,Quartz 次要有三个局部组成:
调度器:Scheduler
工作:JobDetail
触发器:Trigger,包含 SimpleTrigger 和 CronTrigger
创立一个 Quartz 工作的流程如下:

// 定义一个作业类,实现用户的业务逻辑
public class HelloJob implements Job {
     ......
     实现业务逻辑
}
// 依据作业类失去 JobDetail
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
// 定义一个触发器,依照规定的工夫调度作业
Trigger trigger = TriggerBuilder.newTrigger("每隔 1 分钟执行一次")
// 依据作业类和触发器创立调度器
Scheduler scheduler = scheduler.scheduleJob(jobDetail,trigger);
// 启动调度器,开始执行工作
scheduler .start()

2. Elastic-Job 的基本原理

2.1 分片

Elastic-Job 为了进步工作的并发能力,引入了分片的概念,行将一个工作划分成多个分片,而后由多个执行的机器别离支付这些分片来执行。比方一个数据库中有 1 亿条数据,须要将这些数据读取进去并计算,而后再写入到数据库中。就能够将这 1 亿条数据划分成 10 个分片,每一个分片读取其中的 1 千万条数据,而后计算后写入数据库。这 10 个分片编号为 0,1,2…9,如果有三台机器执行,A 机器分到分片(0,1,2,9),B 机器分到分片(3,4,5),C 机器分到分片(6,7,8)。

2.2 作业调度与执行

Elastic-Job 是去中心化的任务调度框架,当多个节点运行时,会先抉择一个主节点,当达到执行工夫后,每个实例开始执行工作,主节点负责分片的划分,其它节点期待划分实现,主节点将划分后的后果寄存到 zookeeper 中,而后每个节点再从 zookeeper 中获取划分好的分片项,将分片信息作为参数,传入到本地的工作函数中,从而执行工作。

2.3 作业的类型

elastic-job 反对三种类型的作业工作解决!
Simple 类型作业:Simple 类型用于个别工作的解决,只需实现 SimpleJob 接口。该接口仅提供繁多办法用于笼罩,此办法将定时执行,与 Quartz 原生接口类似。
Dataflow 类型作业:Dataflow 类型用于解决数据流,需实现 DataflowJob 接口。该接口提供 2 个办法可供笼罩,别离用于抓取 (fetchData) 和解决 (processData) 数据。
Script 类型作业:Script 类型作业意为脚本类型作业,反对 shell,python,perl 等所有类型脚本。只需通过控制台或代码配置 scriptCommandLine 即可,无需编码。执行脚本门路可蕴含参数,参数传递结束后,作业框架会主动追加最初一个参数为作业运行时信息。

3. Elastic-Job 的执行原理

3.1 Elastic-Job 的启动流程

上面以一个 SimpleJob 类型的工作来阐明 elastic-job 的启动流程

public class MyElasticJob implements SimpleJob {public void execute(ShardingContext context) {
         // 实现业务逻辑
          ......
    }
   
     // 对 zookeeper 进行设置,作为分布式工作的注册核心
    private static CoordinatorRegistryCenter createRegistryCenter() {CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("xxxx"));
        regCenter.init();
        return regCenter;
    }

    // 设置工作的执行频率、执行的类
    private static LiteJobConfiguration createJobConfiguration() {JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
        // 定义 SIMPLE 类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        // 定义 Lite 作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }
   // 主函数
 public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}
}

创立一个 Elastic-Job 的工作并执行,步骤如下:
1)须要先设置 zookeeper 的根本信息,Elastic-Job 应用 zookeeper 来进行分布式治理,如选主、元数据存储与读取、分布式监听机制等。
2)创立一个执行工作的 Job 类,以 Simple 类型作业为例,创立一个继承 SimpleJob 的类,在这个类中实现 execute 函数。
3)设置作业的根本信息,在 JobCoreConfiguration 中设置作业的名称(jobName),作业执行的工夫表达式(cron),总的分片数(shardingTotalCount);而后在 SimpleJobConfiguration 中设置执行作业的 Job 类,最初定义 Lite 作业根配置。
4)创立 JobScheduler(作业调度器)实例,而后 JobScheduler 的 init()办法中执行作业的初始化,这样作业就开始运行了。
Elastic-Job 的作业调度在 JobScheduler 中实现,上面具体介绍 JobScheduler 办法。JobScheduler 的定义如下:

public class JobScheduler {
    
    public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
    
    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
     
    // 作业配置
    private final LiteJobConfiguration liteJobConfig;
    
   // 注册核心 
   private final CoordinatorRegistryCenter regCenter;
    
    // 调度器门面
    private final SchedulerFacade schedulerFacade;
    
    // 作业门面
    private final JobFacade jobFacade;
 
     private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
 
        this.liteJobConfig = liteJobConfig;
 
        this.regCenter = regCenter;
 
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
 
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
 
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
 
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }

如上,在 JobScheduler 的构造方法中,设置好作业配置信息 liteJobConfig、注册核心 regCenter、一系列监听器 elasticJobListenerList,调度器门面,作业门面。
在创立好 JobScheduler 实例后,就进行作业的初始化操作,如下:

/**
     * 初始化作业.
     */
    public void init() {JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(liteJobConfig);
        jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
    }

如上,
1)JobRegistry 是作业注册表,以单例的模式存储作业的元数据,在 JobRegistry 中设置好分片总数等信息。
2)jobScheduleController 是作业调度控制器,在 jobScheduleController 中能够执行:调度作业、从新调度作业、暂停作业、复原作业、立即复原作业。所以作业的开始、暂停、复原都是在 jobScheduleController 中执行的。
3)在作业注册表 JobRegistry 中设置作业名称、作业调度器、注册核心。
4)执行调度器门面 schedulerFacade 的 registerStartUpInfo 办法,在这个办法中注册作业启动信息,代码如下:

/**
     * 注册作业启动信息.
     * 
     * @param liteJobConfig 作业配置
     */
    public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {regCenter.addCacheData("/" + liteJobConfig.getJobName());
        // 开启所有监听器
        listenerManager.startAllListeners();
        // 选举主节点
        leaderService.electLeader();
        // 长久化 job 的配置信息
        configService.persist(liteJobConfig);
        LiteJobConfiguration liteJobConfigFromZk = configService.load(false);
        // 长久化作业服务器上线信息
       serverService.persistOnline(!liteJobConfigFromZk.isDisabled());
        // 长久化作业运行实例上线相干信息,将服务实例注册到 zk
        instanceService.persistOnline();
        // 设置 须要从新分片的标记
        shardingService.setReshardingFlag();
        // 初始化 作业监听服务
        monitorService.listen();
        // 初始化 调解作业不统一状态服务
        if (!reconcileService.isRunning()) {reconcileService.startAsync();
        }
    }

如上,
1)开启所有的监听器,利用 zookeeper 的 watch 机制来监听系统中各种元数据的变动,从而执行相应的操作
2)选举主节点,利用 zookeeper 的分布式锁来抉择一个主节点,主节点次要进行分片的划分。
3)长久化各种元数据到 zookeeper,如作业的配置信息,每个服务实例的信息等
4)设置须要分片的标记,在第一次执行工作或者零碎中服务实例增减时都须要从新分片。
在作业启动信息注册好当前,就调用 jobScheduleController 的 scheduleJob 办法,进行作业的调度,这样作业就开始执行了。scheduleJob 办法的代码如下:

/**
     * 调度作业.
     * 
     * @param cron CRON 表达式
     */
    public void scheduleJob(final String cron) {
        try {if (!scheduler.checkExists(jobDetail.getKey())) {scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();} catch (final SchedulerException ex) {throw new JobSystemException(ex);
        }
    }

通过后面 Quartz 的解说可知,scheduler 通过将 jobDetail 和触发器 Trigger 联合,再调用 scheduler.start(),这样就开始了作业调用。
通过下面的代码剖析可知。作业的启动流程如下:

3.2 Elastic-Job 的执行流程

通过后面 Quartz 的解说可知,工作的执行理论是运行 JobDetail 中定义的业务逻辑,咱们只须要看 jobDetail 外面的内容,就能晓得作业执行的过程

private JobDetail createJobDetail(final String jobClass) {JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    // 疏忽其它代码
}

通过下面的代码可知,执行的工作就是 LiteJob 这个类的内容

public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();}
}

LiteJob 通过 JobExecutorFactory 取得到作业执行器(AbstractElasticJobExecutor),并进行执行:

public final class JobExecutorFactory {
    
    /**
     * 获取作业执行器.
     *
     * @param elasticJob 分布式弹性作业
     * @param jobFacade 作业外部服务门面服务
     * @return 作业执行器
     */
    @SuppressWarnings("unchecked")
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
        // ScriptJob
        if (null == elasticJob) {return new ScriptJobExecutor(jobFacade);
        }
        // SimpleJob
        if (elasticJob instanceof SimpleJob) {return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        // DataflowJob
        if (elasticJob instanceof DataflowJob) {return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type'%s'", elasticJob.getClass().getCanonicalName());
    }
}

可见,作业执行器工厂 JobExecutorFactory,依据不同的作业类型,返回对应的作业执行器, 而后执行对应作业执行器的 execute()函数。上面看一下 execute 函数

// AbstractElasticJobExecutor.java
public final void execute() {
   // 查看作业执行环境
   try {jobFacade.checkJobExecutionEnvironment();
   } catch (final JobExecutionEnvironmentException cause) {jobExceptionHandler.handleException(jobName, cause);
   }
   // 获取以后作业服务器的分片上下文
   ShardingContexts shardingContexts = jobFacade.getShardingContexts();
   // 公布作业状态追踪事件(State.TASK_STAGING)
   if (shardingContexts.isAllowSendJobEvent()) {jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job'%s'execute begin.", jobName));
   }
   // 跳过存在运行中的被错过作业
   if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {// 公布作业状态追踪事件(State.TASK_FINISHED)
       if (shardingContexts.isAllowSendJobEvent()) {jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                   "Previous job'%s'- shardingItems'%s'is still running, misfired job will start after previous job completed.", jobName, 
                   shardingContexts.getShardingItemParameters().keySet()));
       }
       return;
   }
   // 执行作业执行前的办法
   try {jobFacade.beforeJobExecuted(shardingContexts);
       //CHECKSTYLE:OFF
   } catch (final Throwable cause) {
       //CHECKSTYLE:ON
       jobExceptionHandler.handleException(jobName, cause);
   }
   // 执行一般触发的作业
   execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
   // 执行被跳过触发的作业
   while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
   }
   // 执行作业生效转移
   jobFacade.failoverIfNecessary();
   // 执行作业执行后的办法
   try {jobFacade.afterJobExecuted(shardingContexts);
       //CHECKSTYLE:OFF
   } catch (final Throwable cause) {
       //CHECKSTYLE:ON
       jobExceptionHandler.handleException(jobName, cause);
   }
}

execute 函数的次要流程:

  1. 查看作业执行环境
  2. 获取以后作业服务器的分片上下文。即通过函数 jobFacade.getShardingContexts()获取以后的分片信息,由主节点依据相应的分片策略来进行分片项的划分,划分好之后将划分后果存入到 zookeeper 中,其它节点再从 zookeeper 中获取划分后果。
  3. 公布作业状态追踪事件
  4. 跳过正在运行中的被错过执行的作业
  5. 执行作业执行前的办法
  6. 执行一般触发的作业
    最初,会调用 MyElasticJob 中的 execute 办法,从而达到执行用户业务逻辑的目标。
    整个 Elastic-Job 的执行流程如下:

4. Elastic-Job 的优化实际

4.1 空转问题

Elastic-Job 的作业依照是否有实现类能够分为两种:有实现类的作业和没有实现类的作业。如 Simple 类型和 DataFlow 类型的作业须要用户本人定义实现类,继承 SimpleJob 或者 DataFlowJob 类;另一种是不须要实现类的作业,如 Script 类型作业和 Http 类型作业,对应这种不须要实现类的作业,用户只须要在配置平台填写好相应的配置,咱们后盾再定时的从配置平台拉取最新注册的工作,而后就能够执行用户最新注册的 script 或者 Http 类型的作业。
在生产环境中,执行作业的集群的机器数量很多,然而用户注册的每个作业的分片却很少(大部分只有 1 个分片),依据后面的剖析可知,对应只有一个分片的工作,集群中的所有机器都会参加运行,然而因为只有失去那个分片的机器才会真正运行,其余的都会因为没有分片而空转,这无疑是对计算资源的节约。

4.2 解决方案

为了解决分片数量少、执行服务器多而呈现的空转问题,咱们这边的解决方案是用户在配置平台注册工作时,指定好对应的执行服务器,执行服务器的数量 M = 分片数 +1(多进去的机器作为冗余备份)。如用户的作业分片为 2,后盾依据每天机器以后的负载排序,抉择 3 台负载最轻的机器作为执行服务器。这样当这些机器定时从配置平台拉取工作时,如果发现自己不属于这个工作的执行服务器,就不运行这个作业,只有属于当前任务的执行服务器才运行。这样既保证了可靠性,又防止了过多机器的空转,进步了效率。

5. OPPO 海量作业调度计划

Elastic-Job 通过 zookeeper 来实现弹性分布式的性能,这在任务量很小的时候能够满足用户需要,然而也有以下毛病:

  1. Elastic-Job 的弹性分布式性能强依赖 zookeeper,zookeeper 容易成为性能瓶颈。
  2. 工作划分的分片数可能小于执行工作的实例数,导致一些机器空转。

基于 Elastic-Job 的上述毛病,OPPO 中间件团队在解决海量任务调度时,采纳了集中式的调度计划,用户的作业不须要通过 Quartz 来定时触发,而是通过接管服务器的音讯来触发本地工作。用户先在注册平台注册工作,服务器定时从注册平台的数据库中扫描最近一个周期(30 秒)内须要执行的工作,再依据工作的理论执行工夫生成延时音讯并写入具备延时性能的音讯队列,用户再从音讯队列中拉取数据并触发作业的执行。这种集中式的调度形式由核心服务器来触发音讯执行,既克服了 zookeeper 的性能瓶颈,又防止了工作服务器的空转,可能满足海量工作的执行要求。

总结

Elastic-Job 应用 quartz 来进行作业的调度,同时引入 zookeeper 来实现分布式治理的性能,在高可用计划的根底上减少了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源从而实现了分布式任务调度的性能。同时因为分片的思路,也会导致没有失去分片的服务器处于空转的状态,这在理论的生产中能够设法躲避。

作者简介
Xinchun OPPO 高级后端工程师
目前负责分布式作业调度的研发,关注音讯队列、redis 数据库、ElasticSearch 等中间件技术。

获取更多精彩内容,扫码关注 [OPPO 数智技术] 公众号

退出移动版