1. 前言
Schedulerx2.0 的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖 schedulerx-worker 这个 jar 包,通过 schedulerx2.0 提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。
这篇文章重点是介绍基于 schedulerx2.0 的分布式执行引擎原理和最佳实践,相信看完这篇文章,大家都能写出高效率的分布式作业,说不定速度能提升好几倍:)
2. 可扩展的执行引擎
Worker 总体架构参考 Yarn 的架构,分为 TaskMaster, Container, Processor 三层:
- TaskMaster:类似于 yarn 的 AppMaster,支持可扩展的分布式执行框架,进行整个 jobInstance 的生命周期管理、container 的资源管理,同时还有 failover 等能力。默认实现 StandaloneTaskMaster(单机执行),BroadcastTaskMaster(广播执行),MapTaskMaster(并行计算、内存网格、网格计算),MapReduceTaskMaster(并行计算、内存网格、网格计算)。
- Container:执行业务逻辑的容器框架,支持线程 / 进程 /docker/actor 等。
- Processor:业务逻辑框架,不同的 processor 表示不同的任务类型。
以 MapTaskMaster 为例,大概的原理如下图所示:
3. 分布式编程模型之 Map 模型
Schedulerx2.0 提供了多种分布式编程模型,这篇文章主要介绍 Map 模型(之后的文章还会介绍 MapReduce 模型,适用更多的业务场景),简单几行代码就可以将海量数据分布式到多台机器上进行分布式跑批,非常简单易用。
针对不同的跑批场景,map 模型作业还提供了并行计算、内存网格、网格计算三种执行方式:
- 并行计算:子任务 300 以下,有子任务列表。
- 内存网格:子任务 5W 以下,无子任务列表,速度快。
- 网格计算:子任务 100W 以下,无子任务列表。
4. 并行计算原理
因为并行任务具有子任务列表:
如上图,子任务列表可以看到每个子任务的状态、机器,还有重跑、查看日志等操作。
因为并行计算要做到子任务级别的可视化,并且 worker 挂了、重启还能支持手动重跑,就需要把 task 持久化到 server 端:
如上图所示:
- server 触发 jobInstance 到某个 worker,选中为 master。
- MapTaskMaster 选择某个 worker 执行 root 任务,当执行 map 方法时,会回调 MapTaskMaster。
- MapTaskMaster 收到 map 方法,会把 task 持久化到 server 端。
- 同时,MapTaskMaster 还有个 pull 线程,不停拉取 INIT 状态的 task,并派发给其他 worker 执行。
5. 网格计算原理
网格计算要支持百万级别的 task,如果所有任务都往 server 回写,server 肯定扛不住,所以网格计算的存储实际上是分布式在用户自己的机器上的:
如上图所示:
- server 触发 jobInstance 到某个 worker,选中为 master。
- MapTaskMaster 选择某个 worker 执行 root 任务,当执行 map 方法时,会回调 MapTaskMaster。
- MapTaskMaster 收到 map 方法,会把 task 持久化到本地 h2 数据库。
- 同时,MapTaskMaster 还有个 pull 线程,不停拉取 INIT 状态的 task,并派发给其他 worker 执行。
6. 最佳实践
6.1 需求
举个例子:
- 读取 A 表中 status= 0 的数据。
- 处理这些数据,插入 B 表。
- 把 A 表中处理过的数据的修改 status=1。
- 数据量有 4 亿 +,希望缩短时间。
6.2 反面案例
我们先看下如下代码是否有问题?
public class ScanSingleTableProcessor extends MapJobProcessor {
private static int pageSize = 1000;
@Override
public ProcessResult process(JobContext context) {String taskName = context.getTaskName();
Object task = context.getTask();
if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {int recordCount = queryRecordCount();
int pageAmount = recordCount / pageSize;// 计算分页数量
for(int i = 0 ; i < pageAmount ; i ++) {List<Record> recordList = queryRecord(i);// 根据分页查询一页数据
map(recordList, "record 记录");// 把子任务分发出去并行处理
}
return new ProcessResult(true);//true 表示执行成功,false 表示失败
} else if ("record 记录".equals(taskName)) {
//TODO
return new ProcessResult(true);
}
return new ProcessResult(false);
}
}
如上面的代码所示,在 root 任务中,会把数据库所有记录读取出来,每一行就是一个 Record,然后分发出去,分布式到不同的 worker 上去执行。逻辑是没有问题的,但是实际上性能非常的差。结合网格计算原理,我们把上面的代码绘制成下面这幅图:
如上图所示,root 任务一开始会全量的读取 A 表的数据,然后会全量的存到 h2 中,pull 线程还会全量的从 h2 读取一次所有的 task,还会分发给所有客户端。所以实际上对 A 表中的数据:
- 全量读 2 次
- 全量写一次
- 全量传输一次
这个效率是非常低的。
6.3 正面案例
下面给出正面案例的代码:
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {return startId;}
public int getEndId() {return endId;}
}
@Override
public ProcessResult process(JobContext context) {String taskName = context.getTaskName();
Object task = context.getTask();
if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {System.out.println("start root task");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); // 计算分页数量
for (int i = minId; i < maxId; i+=step) {taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO
System.out.println("all tasks is finished.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {//TODO select min(id),max(id) from xxx
return null;
}
}
如上面的代码所示,
- 每个 task 不是整行记录的 record,而是 PageTask,里面就 2 个字段,startId 和 endId。
- root 任务,没有全量的读取 A 表,而是读一下整张表的 minId 和 maxId,然后构造 PageTask 进行分页。比如 task1 表示 PageTask[1,1000],task2 表示 PageTask[1001,2000]。每个 task 处理 A 表不同的数据。
- 在下一级 task 中,如果拿到的是 PageTask,再根据 id 区间去 A 表处理数据。
根据上面的代码和网格计算原理,得出下面这幅图:
如上图所示,
- A 表只需要全量读取一次。
- 子任务数量比反面案例少了上千、上万倍。
- 子任务的 body 非常小,如果 recod 中有大字段,也少了上千、上万倍。
综上,对 A 表访问次数少了好几倍,对 h2 存储压力少了上万倍,不但执行速度可以快很多,还保证不会把自己本地的 h2 数据库搞挂。
本文作者:黄晓萌
阅读原文
本文为云栖社区原创内容,未经允许不得转载。