1. 指标

通过底层零碎数据的拉通,数据治理对立数据口径,实现控制塔KPI体系的线上数字化存储计算和灵便展示,撑持我的项目的顺利落地。

2. 方案设计

2.1 支流ETL工具调研

维度\产品DataPipelinekettleOracle GaodengateInformaticatalendDatax
性能实用场景次要用于各类数据交融、数据交换场景,专为超大数据量、高度简单的数据链路设计的灵便、可扩大的数据交换平台面向数据仓库建模传统ETL工具次要用于数据备份、容灾面向数据仓库建模传统ETL工具面向数据仓库建模传统ETL工具面向数据仓库建模传统ETL工具
应用形式全流程图形化界面,利用端采纳B/S架构,Cloud Native为云而生,所有操作在浏览器内就能够实现,不须要额定的开发和生产公布C/S客户端模式,开发和生产环境须要独立部署,工作的编写、调试、批改都在本地,须要公布到生产环境,线上生产环境没有界面,须要通过日志来调试、debug,效率低,费时费力没有图形化的界面,操作皆为命令行形式,可配置能力差C/S客户端模式,开发和生产环境须要独立部署,工作的编写、调试、批改都在本地,须要公布到生产环境;学习老本较高,个别须要受过专业培训的工程师能力应用;C/S客户端模式,开发和生产环境须要独立部署,工作的编写、调试、批改都在本地,须要公布到生产环境;DataX是以脚本的形式执行工作的,须要齐全吃透源码才能够调用,学习老本高,没有图形开发化界面和监控界面,运维老本绝对高。
底层架构分布式集群高可用架构,能够程度扩大到多节点反对超大数据量,架构容错性高,能够主动调节工作在节点之间调配,实用于大数据场景主从构造非高可用,扩展性差,架构容错性低,不实用大数据场景可做集群部署,躲避单点故障,依赖于外部环境,如Oracle RAC等;schema mapping非主动;可复制性比拟差;更新换代不是很强反对分布式部署反对单机部署和集群部署两种形式
CDC机制基于日志、基于工夫戳和自增序列等多种形式可选基于工夫戳、触发器等次要是基于日志基于日志、基于工夫戳和自增序列等多种形式可选基于触发器、基于工夫戳和自增序列等多种形式可选离线批处理
对数据库的影响基于日志的采集形式对数据库无侵入性对数据库表构造有要求,存在肯定侵入性源端数据库须要预留额定的缓存空间基于日志的采集形式对数据库无侵入性有侵入性通过sql select 采集数据,对数据源没有侵入性
主动断点续传反对不反对反对不反对,依赖ETL设计的合理性(例如T-1),指定续读某个工夫点的数据,非主动不反对,依赖ETL设计的合理性(例如T-1),指定续读某个工夫点的数据,非主动不反对
监控预警可视化的过程监控,提供多样化的图表,辅助运维,故障问题可实时预警依赖日志定位故障问题,往往只能是后处理的形式,短少过程预警无图形化的界面预警monitor能够看到报错信息,信息绝对抽象,定位问题仍需依赖剖析日志有问题预警,定位问题仍需依赖日志依赖工具日志定位故障问题,没有图形化运维界面和预警机制,须要自定义开发。
数据荡涤围绕数据品质做轻量荡涤围绕数据仓库的数据需要进行建模计算,荡涤性能绝对简单,须要手动编程轻量荡涤反对简单逻辑的荡涤和转化反对简单逻辑的荡涤和转化须要依据本身清晰规定编写荡涤脚本,进行调用(DataX3.0 提供的性能)。
数据转换自动化的schema mapping手动配置schema mapping需手动配置异构数据间的映射手动配置schema mapping手动配置schema mapping通过编写json脚本进行schema mapping映射
特色数据实时性实时非实时实时反对实时,然而支流利用都是基于工夫戳等形式做批量解决,实时同步效率未知实时定时
利用难度
是否须要开发
易用性
稳定性
其余施行及售后服务原厂施行和售后服务开源软件,需自客户自行施行、保护原厂和第三方的施行和售后服务次要为第三方的施行和售后服务分为开源版和企业版,企业版可提供相应服务阿里开源代码,须要客户主动施行、开发、保护

2.2 kettle应用体验

根据上述调研,抉择了最风行且收费的kettle作为体验对象。

  1. 长处:
  2. 无需开发,通过界面操作,即可实现数据ETL流程(对非开发人员敌对)。
  3. 毛病:
  4. 对开发人员来说,新增同步时,界面操作的效率,不肯定比本人实现的效率高。
  5. 简单的计算逻辑依然须要写代码,并且是在工具的界面上,按工具要求的标准来写。

根据上述体验后果,联合以后业务“有大量的计算字段”的特点,最终抉择了自主开发计划。

2.3 自主开发计划

  • 数据同步计划
  • ETL繁难流程

3. 计划实现

3.1 设计准则

宽表同步&计算有以下特点:

  • 须要从多个业务表中同步数据,每个表都要执行“查问数据“和”映射字段值到宽表中",同时后续可能会新增映射字段(包含新加原始业务表)。
  • 除了已有的业务字段外,宽表还有依据以后字段计算出来的“计算结果”字段,并且后续也可能会继续新增此类字段。
  • 流程环节可能会新增,也可能会删除。

以上都是可预感的会扩大、批改比拟频繁的点,依据开闭准则(对扩大凋谢,对批改敞开),将业务表的数据查问和映射剥离进去,提供形象对象来反对扩大;“计算结果”字段也是一样;将每个流程环节独立成一个类,多个类按程序串成一个链,新增/删除的时候就在链中插入/删除节点。

3.2 具体流程

3.3 JAVA类图

3.4 重点类介绍

3.4.1 通用对象:

  • BaseMerger:封装了流程通用的“启动流程前”、“执行流程”、“流程失常执行实现(失常执行实现时进入)”、“流程异样解决(异样时进入)”、“流程执行实现(不论是否异样都会进入本办法)”办法,同时提供扩大办法“流程名称”,由子类实现,用于获取子类须要的流程节点列表(从所有的FlowNode列表中筛选出属于该流程的列表)。
  • MergeContext:上下文对象,用于在各个节点中传递数据。
  • BaseFlowNode:流程节点父类,封装了每个节点执行时通用的办法,如日志记录,子类只须要实现外围的“doExecute”办法即可。
  • BaseLoopFlowNode:可循环的流程节点父类,当流程判断(以后节点为BaseLoopFlowNode子类,且上下文中“循环次数”> 1时),会开始进入循环状态,直到(下一个节点不是BaseLoopFlowNode,且循环次数达到最大次数)时,才会跳出循环。

    protected void executeFlow(MergeContext<T> context) {  BaseFlowNode<T> currentNode = flowNodes.get(0);  BaseFlowNode<T> beginLoopNode = null;  do {      if (!context.isRunning()) {          break;      }      currentNode.execute(context);      int totalLoopTimes = context.getTotalLoopTimes();      // 循环节点,且须要循环      if (currentNode instanceof BaseLoopFlowNode && totalLoopTimes > 0) {          // 启动循环          if (beginLoopNode == null) {              beginLoopNode = currentNode;          }          BaseFlowNode<T> nextFlowNode = currentNode.getNext();          // 最初一个节点          if (nextFlowNode == null) {              // 然而还未达到最大循环次数              if (context.getCurrentLoopTimes() < totalLoopTimes - 1) {                  // 重头开始循环                  currentNode = beginLoopNode;                  // 循环次数加1                  context.setCurrentLoopTimes(context.getCurrentLoopTimes() + 1);              } else {                  // 完结循环                  ((BaseLoopFlowNode) currentNode).resetLoop(context);                  beginLoopNode = null;                  currentNode = nextFlowNode;              }          } else {              if (nextFlowNode instanceof BaseLoopFlowNode) {                  // 下一个节点也是循环中的一部分                  currentNode = nextFlowNode;              } else if (context.getCurrentLoopTimes() < totalLoopTimes - 1) {                  // 还未达到最大循环次数                  // 重头开始循环                  currentNode = beginLoopNode;                  // 循环次数加1                  context.setCurrentLoopTimes(context.getCurrentLoopTimes() + 1);              } else {                  // 完结循环                  ((BaseLoopFlowNode) currentNode).resetLoop(context);                  beginLoopNode = null;                  currentNode = nextFlowNode;              }          }      } else {          // 循环完结、或者还没开始、或者不须要循环          currentNode = currentNode.getNext();      }  } while (currentNode != null);}

3.4.2 具体流程对象

  • LqPrDetailMergeConvertor:依据“开闭准则”拆出来的对象,当读取到的业务表数据须要映射到宽表对象时,在此类做批改。
  • BaseDataExtractor:依据“开闭准则”拆出来的形象对象,须要从业务表中“读取数据”、“过滤数据”时,增加此类的实现类即可。
  • BaseCalculator:依据“开闭准则”拆出来的形象对象,须要对宽表的某个字段做计算或者调整时,增加此类的实现类即可。
参考
六种 支流ETL 工具的比拟