乐趣区

关于阿里云:基于任务调度的企业级分布式批处理方案

作者:姚辉 (千习)

背景

先来谈下什么是分布式批处理,从字面来了解就是有大批量的业务数据须要应用程序去批量计算解决,而通过单机模式去执行会消耗很长的解决工夫,也不能充分发挥业务集群中每个利用节点解决能力。通过一些常见的分布式批处理计划,能够无效地让业务集群中所有业务利用节点协同实现一个大批量数据处理的工作,从而晋升整体的解决效率和解决可靠性。

批处理模型

在简略单机场景下能够开启多线程来同时解决一个大工作,在多个机器下能够由多台机器同时并行处理同一个工作。因而,分布式批处理计划须要为开发者在代码开发层面屏蔽上述工作切分后散发、并行执行、后果汇聚、失败容错、动静扩容等业务利用集群间的分布式协调逻辑,让使用者仅聚焦于上述红框形容的业务逻辑分片规定和业务逻辑解决即可。

大数据批处理比拟

在大数据处理场景中咱们也会用到 MapReduce 模型,其解决逻辑实质与咱们要探讨的业务批处理逻辑是统一的。在大数据场景下的批处理次要是面向数据自身的解决,并须要部署相应大数据平台集群来反对数据存储和数据批处理程序处理,因而该场景下次要目标是用于构建一个残缺的数据平台。与大数据批处理场景相比拟,本次更次要聚焦探讨分布式业务批处理场景,基于现有业务应用服务集群构建分布式批处理逻辑。通过分布式批处理计划能够解决以下需要

  • 对耗时业务逻辑解耦,保障外围链路业务解决疾速响应
  • 充沛调度业务集群所有利用节点单干批量实现业务解决
  • 有别于大数据处理,子工作处理过程中还会有调用其余在线业务服务参加批处理过程

开源批处理计划

ElasticJob

ElasticJob 是一款分布式任务调度框架,其次要特点是在 Quartz 根底上实现定时调度并提供在业务集群中对工作进行分片协调解决能力。在整个架构上基于 Zookeeper 来实现工作分片执行、利用集群动静弹性调度、子工作执行高可用。分片调度模型可反对大批量业务数据处理平衡的散发至业务集群中的每一个节点进行解决,无效地进步了工作解决效率。

  • SimpleJob

Spring Boot 工程可通过 YAML 配置工作定义,指定以下内容:工作实现类、定时调度周期、分片信息。

elasticjob:
  regCenter:
    serverLists: 127.0.0.1:2181
    namespace: elasticjob-lite-springboot
  jobs:
    simpleJob:
      elasticJobClass: org.example.job.SpringBootSimpleJob
      cron: 0/5 * * * * ?
      overwrite: true
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

配置的 org.example.job.SpringBootSimpleJob 类须要实现 SimpleJob 接口的 execute 办法,并且通过 ShardingContext 参数获取对应业务分片数据进行业务逻辑解决。

@Component
public class SpringBootSimpleJob implements SimpleJob {
    @Override
    public void execute(final ShardingContext shardingContext) {String value = shardingContext.getShardingParameter();
        System.out.println("simple.process->"+value);
    }
}

咱们部署 3 个应用服务作为调度解决集群解决上述工作,当工作触发运行时,ElasticJob 就会将对应 3 个分片工作别离给 3 个应用服务进行解决来实现整个工作数据处理。

  • DataflowJob

DataflowJob 目前来看实质上跟 SimpleJob 在整体的构造上并无本质区别。参考如下接口,相比 SimpleJob 其减少了 fetchData 办法供业务方自行实现加载要解决的数据,理论就是将 SimpleJob 的 execute 办法在逻辑定义上拆解成两个步骤。惟一区别在于 DataflowJob 提供一种常驻的数据处理工作(可称为:streaming process),反对工作常驻运行直至 fetchData 为空。

public interface DataflowJob<T> extends ElasticJob {

    /**
     * Fetch to be processed data.
     *
     * @param shardingContext sharding context
     * @return to be processed data
     */
    List<T> fetchData(ShardingContext shardingContext);

    /**
     * Process data.
     *
     * @param shardingContext sharding context
     * @param data to be processed data
     */
    void processData(ShardingContext shardingContext, List<T> data);
}

在 DataflowJob 工作的 yaml 配置上增加 props: streaming.process=true,即可实现该工作 streaming process 的成果。当工作被触发执行后,每个分片工作将按对应流程:fetchData->processData->fetchData 循环执行直到 fetchData 为空。该模式场景剖析:

  • 单个分片工作待数据量大,fetchData 时读取该分片局部分页数据进行解决直至所有数据处理结束
  • 分片待数据继续产生,使工作通过 fetchData 始终获取数据,实现长期驻留继续地进行业务数据处理
elasticjob:
  regCenter:
    serverLists: 127.0.0.1:2181
    namespace: elasticjob-lite-springboot
  jobs:
    dataflowJob:
      elasticJobClass: org.example.job.SpringBootDataflowJob
      cron: 0/5 * * * * ?
      overwrite: true
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      props:
        # 开启 streaming process
        streaming.process: true
  • 个性剖析

ElasticJob 的分布式分片调度模型,对常见简略的批处理场景提供了很大的便当反对,解决了一个大批量业务数据处理分布式切分执行的整个协调过程。另外在以下一些方面可能还存在些有余:

  • 整个架构的外围取决于 ZK 稳定性
    • 须要额定运维部署并且要保障其高可用
    • 大量工作存储触发运行过程都依赖 ZK,当任务量大时 ZK 集群容易成为调度性能瓶颈
  • 分片配置数量固定,不反对动静分片
    • 如每个分片待处理数据量差别大时,容易突破集群解决能力均衡
    • 如分片定义不合理,当集群规模远大于分片数量时集群弹性失去成果
    • 分片定义与业务逻辑较为割裂,人为维持两者之间分割比拟麻烦
  • 管控台能力弱

Spring Batch 批处理框架

Spring Batch 批处理框架,其提供轻量且欠缺批处理能力。Spring Batch 工作批处理框次要提供:单过程多线程解决、分布式多过程解决两种形式。在单过程多线程解决模式下,用户可自行定一个 Job 作为一个批处理工作单元,Job 是由一个或多个 Step 步骤进行串联或并行组成,每一个 Step 又别离由 reader、process、writer 形成来实现每一步工作的读取、解决、输入。后续次要探讨一个 Job 只蕴含一个 Step 的场景进行剖析。

Spring Batch 框架集体感觉单过程下多线程实际意义并不是太大,次要是在较小批量数据工作解决采纳该框架来实现有点费功夫,齐全能够自行开线程池来解决问题。本次探讨次要聚焦于肯定规模的业务集群下分布式协同实现业务数据批处理工作的场景。在 Spring Batch 中提供了近程分片 / 分区解决能力,在 Job 的 Step 中可依据特定规定将工作拆分成多个子工作并分发给集群中其余的 worker 来解决,以实现分布式并行批处理解决能力。其近程交互能力常见是借助第三方消息中间件来实现子工作的散发和执行后果汇聚。

  • 近程分块(Remote Chunking)

近程分块是 Spring Batch 在解决大批量数据工作时提供的一种分布式批处理解决方案,它能够做到在一个 Step 步骤中通过 ItemReader 加载数据构建成多个 Chunk 块,并由 ItemWriter 将这多个分块通过消息中间件或其余模式散发至集群节点,由集群利用节点对每一个 Chunk 块进行业务解决。

Remote Chunking 示例

在上述主节点 ItemReader 和 ItemWriter 能够映射为本次探讨的批处理模型中的“工作拆分 -split”阶段,主节点对 ItemWriter 可采纳 Spring Batch Integration 提供的 ChunkMessageChannelItemWriter,该组件通过集成 Spring Integration 提供的其余通道(如:AMQP、JMS)实现批处理工作数据加载和分块散发。


    @Bean
    public Job remoteChunkingJob() {return jobBuilderFactory.get("remoteChunkingJob")
             .start(stepBuilderFactory.get("step2")
                     .<Integer, Integer>chunk(2) // 每 Chunk 块蕴含 reader 加载的记录数
                     .reader(itemReader())
                     // 采纳 ChunkMessageChannelItemWriter 散发 Chunk 块
                     .writer(itemWriter())
                     .build())
             .build();}

    @Bean
    public ItemReader<Integer> itemReader() {return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setDefaultChannel(requests());
        ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
        chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
        chunkMessageChannelItemWriter.setReplyChannel(replies());
        return chunkMessageChannelItemWriter;
    }
    // 省略了相干消息中间件对接通道配置 

Slave 节点次要是对散发过去的 Chunk 块数据(可了解为子工作)进行对应业务逻辑解决和数据后果输入。因而,在子工作解决端须要通过配置 Spring Batch Integration 提供的 ChunkProcessorChunkHandler 来实现子工作接管、理论业务解决、反馈处理结果等相干动作。

    // 省略了相干消息中间件对接通道配置

    // 接管分块工作降级及反馈执行后果
    @Bean
    @ServiceActivator(inputChannel = "slaveRequests", outputChannel = "slaveReplies")
    public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor(slaveItemProcessor(), slaveItemWriter());
        ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
        chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
        return chunkProcessorChunkHandler;
    }

    // 理论业务须要开发的工作解决逻辑 processor
    @Bean
    public SlaveItemProcessor slaveItemProcessor(){ return new SlaveItemProcessor();}

    // 理论业务须要开发的工作解决逻辑 writer
    @Bean
    public SlaveItemWriter slaveItemWriter(){ return new SlaveItemWriter();}
  • 近程分区(Remote Partitioning)

近程分区与近程分块次要区别在于 master 节点不负责数据加载,可了解为将以后 Step 通过 Partitioner 拆分出多个子 Step(也能够了解为子工作),而后通过 PartitionHandler 将对应的子工作分发给各个 Slave 节点解决,为此,Spring Batch Integration 提供了 MessageChannelPartitionHandler 来实现对应的子工作散发,其底层也是须要依赖消息中间件等进行适配对接。在每个 Slave 节点须要读取子工作 Step 的上下文信息,依据该信息进行残缺的 ItemReader、ItemProcess、ItemWrite 解决。

  • 个性剖析

Spring Batch 框架,综合个性剖析:

  • 具备齐备批处理能力:反对单机多线程、分布式多过程协同批处理解决,反对自定义的分片模型。
  • 缺定时调度反对:原生无定时调度能力需集成三方定时框架(如:Spring Task 需自行解决集群反复触发)。
  • 可视化管控能力弱:Spring Batch 常见采纳程序或文件配置工作,管控台需额定搭建且管控能力较弱。
  • 集成难度高:其分布式批处理能力需额定第三方中间件集成搭建,或基于其接口自行扩大开发;基于官网提供的形式实现企业级应用须要绝对简单布局集成。

企业级批处理计划 -SchedulerX 可视化 MapReduce 工作

SchedulerX 任务调度平台针对企业级批处理需要提供了欠缺的整体解决方案,用户可间接采纳私有云平台的服务即可轻松实现业务利用集群的分布式批处理能力(用户非阿里云业务利用部署也可反对对接),无需额定部署其余中间件集成保护。

原理分析

在整个解决方案中,任务调度平台为用户注册的工作提供全方位的可视化管控、高牢靠定时调度以及可视化查问能力。另外,在用户业务利用侧通过集成 SchedulerX SDK,即可实现分布式批处理能力的疾速接入。此时用户仅需关怀批处理模型中子工作业务切分规定、每个子工作解决逻辑即可。这个分布式批处理过程中具备以下个性:

  • 子工作高可用:当集群执行节点宕机时,反对主动 failover 将掉线机器上对子工作从新分发给其余节点
  • 主动弹性扩容:当集群中有新对利用节点部署上来后,能主动参加到后续工作的执行过程中
  • 可视化能力:为工作和子工作的执行过程提供各类监控运维及业务日志查问能力

上面形容下大抵的原理过程:

  • 在平台创立 MapReduce 工作后,定时调度服务会为它开启高牢靠的定时触发执行
  • 当 MapReduce 工作触发执行时,调度服务会在接入上来的业务 Worker 节点中抉择一个节点作为本次工作运行的主节点
  • 主节点运行执行用户自定义开发的子工作切分加载逻辑,并通过 map 办法调用给集群中其余 worker 节点平衡地散发子工作解决申请
  • 主节点会监控整个分布式批处理工作的处理过程,以及每个 Worker 节点衰弱监控,保障整体运行高可用
  • 其余各个 worker 节点在接管子工作解决申请后,开始回调执行用户自定义的业务逻辑,最终实现每个子工作的解决需要;并且能够配置单个利用节点同时解决子工作的并行线程数。
  • 所有子工作实现后,主节点将汇聚所有子工作执行后果回调 reduce 办法,并反馈调度平台记录本次执行后果

开发者只需在业务利用中实现一个 MapReduceJobProcessor 抽象类,在 isRootTask 中加载本次须要解决的业务子工作数据对象列表;在非 root 申请中通过 jobContext.getTask() 获取单个子工作对象信息,依据该信息执行业务解决逻辑。在业务利用部署公布至集群节点后,当工作触发运行时集群所有节点会参加协调整个分布式批处理工作执行直至实现。

public class MapReduceHelloProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult reduce(JobContext jobContext) throws Exception {
        // 所有子工作实现的汇聚逻辑解决回调,可选实现
        jobContext.getTaskResults();
        return new ProcessResult(true, "处理结果数量集:" + jobContext.getTaskResults().size());
    }

    @Override
    public ProcessResult process(JobContext jobContext) throws Exception {if (isRootTask(jobContext)) {
            List<String> list = // 加载业务待处理的子工作列表
            // 回调 sdk 提供的 map 办法,主动实现子工作散发
            ProcessResult result = map(list, "SecondDataProcess");
            return result;
        } else {
            // 取得单个子工作数据信息,进行单个子工作业务解决
            String data = (String) jobContext.getTask();
            // ... 业务逻辑解决补充 ...
            return new ProcessResult(true, "数据处理胜利!");
        }
    }
}

性能劣势

  • 子工作可视化能力

用户大盘:提供了所有工作的触发运行可视化记录信息。

可视化子工作详情:通过查问工作执行记录详情,可取得每一个子工作执行状态及所在节点。

  • 子工作业务日志

在子工作列表中点击“日志”,能够取得以后子工作处理过程中的日志记录信息。

  • 执行堆栈查看

执行堆栈查看性能,可用于在子工作处理过程中呈现卡住始终运行未完结的场景下,不便排查对应执行线程栈信息。

  • 自定义业务标签

子工作业务标签能力,为用户提供了疾速可视化的子工作业务信息查看和查问能力。在下图中“账户名称”是本次子工作切分进去的业务标签信息,用户可基于该信息疾速理解对应业务子工作的解决状态,并反对查问指定业务标签信息的子工作解决状态。

如何为子工作配置自定义标签,只需对本次 map 散发的子工作对象实现 BizSubTask 接口,并实现其 labelMap 办法即可为每个子工作增加其专属的业务特色标签用于可视化查问。

public class AccountTransferProcessor extends MapReduceJobProcessor {private static final Logger logger = LoggerFactory.getLogger("schedulerxLog");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {if(isRootTask(context)){logger.info("split task list size:20");
            List<AccountInfo> list = new LinkedList();
            for(int i=0; i < 20; i++){list.add(new AccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4,"0"),"AC"+StringUtils.leftPad(i+"", 12, "0")));
            }
            return map(list, "transfer");
        }else {logger.info("start biz process...");
            logger.info("task info:"+context.getTask().toString());
            TimeUnit.SECONDS.sleep(30L);
            logger.info("start biz process end.");
            return new ProcessResult(true);
        }
    }
}

public class AccountInfo implements BizSubTask {

        private long id;

        private String name;

        private String accountId;

        public AccountInfo(long id, String name, String accountId) {
            this.id = id;
            this.name = name;
            this.accountId = accountId;
        }

        // 子工作标签信息设置
        @Override
        public Map<String, String> labelMap() {Map<String, String> labelMap = new HashMap();
            labelMap.put("账户名称", name);
            return labelMap;
        }
    }
  • 兼容开源

SchedulerX 反对基于常见开源框架编写的执行器,包含:XXL-Job、ElasticJob,后续调度平台还将打算反对调度 Spring Batch 工作。

案例场景

分布式批处理模型(可视化 MapReduce 模型),在理论企业级利用中是有大量的需要场景存在。一些常见的应用场景如:

  • 针对分库分表数据批量并行处理,将分库或分表信息作为子工作对象在集群节点间散发实现并行处理
  • 按城市区域的物流订单数据处理,将城市和区域作为子工作对象在集群节点间散发实现并行处理
  • 鉴于可视化 MapReduce 子工作可视化能力,可将重点客户 / 订单信息作为子工作解决对象,来进行相应数据报表解决或信息推送,以实现重要子工作的可视化跟踪解决
  • 基金销售业务案例

以下提供一个基金销售业务案例以供参考如果应用分布式批处理模型,以便使用者在本人的业务场景下自由发挥。案例阐明:在基金公司与基金销售公司(如:蚂蚁财产)之间每天会有投资者的账户 / 交易申请数据同步解决,其往往采纳的是文件数据交互,一个基金公司对接着 N 多家销售商(反之亦然),每家销售商提供的数据文件齐全独立;每一个销售商的数据文件都须要通过文件校验、接口文件解析、数据校验、数据导入这么几个固定步骤。基金公司在解决上述固定步骤就非常适合采纳分布式批处理形式以放慢数据文件解决,以每个销售商为子工作对象散发至集群中,所有利用节点参加解析各自调配到的不同销售商数据文件解决。

@Component
public class FileImportJob extends MapReduceJobProcessor {private static final Logger logger = LoggerFactory.getLogger("schedulerx");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {if(isRootTask(context)){
            // ---------------------------------------------------------
            // Step1. 读取对接的销售商列表 Code
            // ---------------------------------------------------------
            logger.info("以销售商为维度构建子工作列表...");

            // 伪代码从数据库读取销售商列表,Agency 类须要实现 BizSubTask 接口并可将
            // 销售商名称 / 编码作为子工作标签,以便控制台可视化跟踪
            List<Agency> agencylist = getAgencyListFromDb();
            return map(agencylist, "fileImport");
        }else {
            // ---------------------------------------------------------
            // Step2. 针对单个销售商进行对应文件数据的解决
            // ---------------------------------------------------------
            Agency agency = (Agency)context.getTask();
            File file = loadFile(agency);
            logger.info("文件加载实现.");
            validFile(file);
            logger.info("文件校验通过.");
            List<Request> request = resolveRequest(file);
            logger.info("文件数据解析实现.");
            List<Request> request = checkRequest(request);
            logger.info("申请数据查看通过.");
            importRequest(request);
            logger.info("申请数据导入实现.");
            return new ProcessResult(true);
        }
    }
}

案例次要是将基金交易清理中的一个业务环节,采纳并行批处理形式来进行解决,其后续每一个解决环节也能够采纳相似形式解决。另外,每一个可视化 MapReduce 工作节点通过 DAG 依赖编排可构建一个残缺的主动业务清理流程。

总结

分布式任务调度平台 SchedulerX 为企业级分布式批处理提供来欠缺的解决方案,为用户提供了疾速易用的接入模式,并反对定时调度、可视化运行跟踪、可管控简运维、高可用的调度服务,同时配套企业级监控大盘、日志服务、监控报警等能力。

参考文献:

Spring Batch Integration:

https://docs.spring.io/spring…

ElasticJob:

https://shardingsphere.apache…

分布式任务调度 SchedulerX 使用手册:

https://help.aliyun.com/docum…

SchedulerX 如何帮忙用户解决分布式任务调度:

https://mp.weixin.qq.com/s/EgyfS1Vuv4itnuxbiT7KwA

退出移动版