乐趣区

关于开源:ByConity-技术详解之-ELT

谈到数据仓库,肯定离不开应用 Extract-Transform-Load (ETL)或 Extract-Load-Transform (ELT)。将起源不同、格局各异的数据提取到数据仓库中,并进行解决加工。传统的数据转换过程个别采纳 Extract-Transform-Load (ETL)来将业务数据转换为适宜数仓的数据模型,然而,这依赖于独立于数仓外的 ETL 零碎,因此保护老本较高。

ByConity 作为云原生数据仓库,从 0.2.0 版本开始逐渐反对 Extract-Load-Transform (ELT),使用户免于保护多套异构数据系统。本文将介绍 ByConity 在 ELT 方面的能力布局,实现原理和应用形式等。

ETL 场景和计划

ELT 与 ETL 的区别

  • ETL:是用来形容将数据从起源端通过抽取、转置、加载至目标端(数据仓库)的过程。Transform 通常形容在数据仓库中的前置数据加工过程。
  • ELT 专一于将最小解决的数据加载到数据仓库中,而把大部分的转换操作留给分析阶段。相比起前者(ETL),它不须要过多的数据建模,而给剖析者提供更灵便的选项。ELT 曾经成为当今大数据的解决常态,它对数据仓库也提出了很多新的要求。

资源重复的挑战

典型的数据链路如下:咱们将行为数据、日志、点击流等通过 MQ/ Kafka/ Flink 将其接入存储系统当中,存储系统又可分为域内的 HDFS 和云上的 OSS& S3 这种近程贮存零碎,而后进行一系列的数仓的 ETL 操作,提供给 OLAP 零碎实现剖析查问。

但有些业务须要从上述的存储中做一个分支,因而会在数据分析的某一阶段,从整体链路中将数据导出,做一些不同于主链路的 ETL 操作,会呈现两份数据存储。其次在这过程中也会呈现两套不同的 ETL 逻辑。

当数据质变大,计算冗余以及存储冗余所带来的老本压力也会愈发变大,同时,存储空间的收缩也会让弹性扩容变得不便当。

业界解决思路

在业界中,为了解决以上问题,有以下几类流派:

  • 数据预计算流派:如 Kylin 等。如果 Hadoop 零碎中出报表较慢或聚合能力较差,能够去做一个数据的预计算,提前将配的指标的 cube 或一些视图算好。理论 SQL 查问时,能够间接用外面的 cube 或视图做替换,之后间接返回。
  • 流批一体 :如 Flink、Risingwave。在数据流进时,针对一些须要出报表或者须要做大屏的数据间接内存中做聚合。聚合实现后,将后果写入 HBase 或 MySQL 中再去取数据,将数据取出后作展现。Flink 还会去间接裸露中间状态的接口,即 queryable state,让用户更好的应用状态数据。然而最初还会与批计算的后果实现对数,如果不统一,须要进行回查操作,整个过程考验运维 / 开发同学的功力。
  • 湖仓 一体 &HxxP: 将数据湖与数据仓库联合起来。

ELT in ByConity

整体执行流程

ELT 工作对系统的要求:

  1. 整体易扩大:导入和转换通常须要大量的资源,零碎须要通过程度扩大的形式来满足数据量的快速增长。
  2. 可靠性和容错能力:大量的 job 能有序调度;呈现 task 偶尔失败(OOM)、container 失败时,可能拉起重试;能解决肯定的数据歪斜
  3. 效率 & 性能:无效利用多核多机并发能力;数据疾速导入;内存应用无效(内存治理);CPU 优化(向量化、codegen)
  4. 生态 & 可观测性:可对接多种工具;工作状态感知;工作进度感知;失败日志查问;有肯定可视化能力

ByConity 针对 ELT 工作的要求,以及以后场景遇到的艰难,新增了以下个性和优化改良。

分阶段执行(Stage-level Scheduling)

原理解析

  • 以后 ClickHouse 的 SQL 执行过程如下:

    • 第一阶段,Coordinator 收到分布式表查问后将申请转换为对 local 表查问发送给每个 shard 节点;
    • 第二阶段,Coordinator 收到各个节点的后果后汇聚起来解决后返回给客户端;
  • ClickHouse 将 Join 操作中的右表转换为子查问,带来如下几个问题都很难以解决:

    • 简单的 query 有多个子查问,转换复杂度高;
    • Join 表较大时,容易造成 worker 节点的 OOM;
    • 聚合阶段在 Cooridnator,压力大,容易成为性能瓶颈;

不同于 ClickHouse,咱们在 ByConity 中实现了对简单查问的执行优化。通过对执行打算的切分,将之前的两阶段执行模型转换为分阶段执行。在逻辑打算阶段,依据算子类型插入 exchange 算子。执行阶段依据 exchange 算子将整个执行打算进行 DAG 切分,并且分 stage 进行调度。stage 之间的 exchange 算子负责实现数据传输和替换。

要害节点:

  1. exchange 节点插入
  2. 切分 stage
  3. stage scheduler
  4. segment executer
  5. exchange manager

这里重点来讲一下 exchange 的眼帘。上图能够看到,最顶层的是 query plan。上面转换成物理打算的时候,咱们会依据不同的数据分布的要求转换成不同的算子。source 层是接收数据的节点,根本都是对立的,叫做 ExchangeSource。Sink 则有不同的实现,BroadcastSink、Local、PartitionSink 等,他们是作为 map task 的一部分去运行的。如果是跨节点的数据操作,咱们在底层应用对立的 brpc 流式数据传输,如果是本地,则应用内存队列来实现。针对不同的点,咱们进行了十分粗疏的优化:

  • 数据传输层

    • 过程内通过内存队列,无序列化,zero copy
    • 过程间应用 brpc stream rpc,保序、连贯复用、状态码传输、压缩等
  • 算子层

    • 批量发送
    • 线程复用,缩小线程数量

带来的收益

因为 ByConity 彻底采纳了多阶段的查问执行形式,整体有很大的收益:

  • Cooridnator 更稳固、更高效

    • 聚合等算子拆分到 worker 节点执行
    • Cooridnator 节点只须要聚合最终后果
  • Worker OOM 缩小

    • 进行了 stage 切分,每个 stage 的计算绝对简略
    • 减少了 exchange 算子,缩小内存压力
  • 网络连接更加稳固、高效

    • exchange 算子无效传输
    • 复用连接池

自适应的调度器(Adaptive Scheduler)

Adaptive Scheduler 属于咱们在稳定性方面所做的个性。在 OLAP 场景中可能会发现局部数据不全或数据查问超时等,起因是每个 worker 是所有的 query 共用的,这样一旦有一个 worker 较慢就会导致整个 query 的执行受到影响。

计算节点共用存在的问题:

  • Scan 所在的节点负载和不同查问所需的扫描数据量相干,做不到齐全均匀;
  • 各 Plan Segment 所需资源差别大;

这就导致 worker 节点之间的负载重大不平衡。负载较重的 worker 节点就会影响 query 整体的过程。因而咱们做了以下的优化计划:

  • 建设 Worker 衰弱度机制。Server 端建设 Worker 衰弱度治理类,能够疾速获取 Worker Group 的衰弱度信息,包含 CPU、内存、运行 Query 数量等信息。
  • 自适应调度:每个 SQL 依据 Worker 衰弱度动静的进行抉择以及计算节点并发度管制。

查问的队列机制(Query Queue)

咱们的集群也会呈现满载状况,即所有的 worker 都是不衰弱的或者满载 / 超载的,就会用查问队列来进行优化。

咱们间接在 server 端做了一个 manager。每次查问的时候 manager 会去 check 集群的资源,并且持有一个锁。如果资源不够用,则期待资源开释后去唤醒这个锁。这就防止了 Server 端不限度的下发计算工作,导致 worker 节点超载,而后崩掉的状况。

以后实现绝对简略。server 是多实例,每个 server 实例中都有 queue,所持有的是一个部分视角,不足全局的资源视角。除此之外,每个 queue 中的查问状态没有长久化,只是简略的缓存在内存中。

后续,咱们会减少 server 之间的协调,在一个全局的视角上对查问并发做限度。也会对 server 实例中 query 做长久化,减少一些 failover 的场景反对。

异步执行(Async Execution)

ELT 工作的一个典型特色就是:绝对于即时剖析,他们的运行工夫会绝对较长。个别 ELT 工作执行时长为分钟级,甚至达到小时级。

目前 ClickHouse 的客户端查问都采纳阻塞的形式进行返回。这样就造成了客户端长期处于期待的状况,而在这个期待过程中还须要放弃和服务端的连贯。在不稳固的网络状况下,客户端和服务端的连贯会断开,从而导致服务端的工作失败。

为了缩小这种不必要的失败,以及缩小客户端为了维持连贯的减少的复杂度。咱们开发了异步执行的性能,它的实现如下:

  1. 用户指定异步执行。用户能够通过 settings enable_async_query = 1 的形式进行 per query 的指定。也能够通过 set enable_async_query = 1 的形式进行 session 级别的指定。
  2. 如果是异步 query,则将其放到后盾线程池中运行
  3. 静默 io。当异步 query 执行时,则须要切断它和客户端的交互逻辑,比方输入日志等。

针对 query 的初始化还是在 session 的同步线程中进行。一旦实现初始化,则将 query 状态写入到 metastore,并向客户端返回 async query id。客户端能够用这个 id 查问 query 的状态。async query id 返回后,则示意实现此次查问的交互。这种模式下,如果语句是 select,那么后续后果则无奈回传给客户端。这种状况下咱们举荐用户应用 async query + select…into outfile 的组合来满足需要。

将来布局

针对 ELT 混合负载,ByConity 0.2.0 版本目前只是牛刀小试。后续的版本中咱们会继续优化查问相干的能力,ELT 为外围的布局如下:

故障恢复能力

  • 算子 Spill

    • Sort、Agg、Join 算子 Spill;
    • Exchange Spill 能力;
  • Recoverability 容错复原

    • 算子执行复原:ELT 工作运行时长较长时,两头 Task 的偶发失败会导致整个 Query 失败,反对 Task 级别重试能够极大地升高环境起因导致的偶发失败;
    • Stage 重试:当节点失败时,能够进行 Stage 级别的重试;
    • 保留队列作业状态的能力;
  • Remote Shuffle Service:以后业界开源的 shuffle service 通常为 Spark 定制,没有通用的客户端,比方 c ++ 客户端。后续咱们会补充这部分能力。

资源

  • 计算资源可指定:用户可指定 query 须要的计算资源;
  • 计算资源预估 / 预占:可动静预估 query 须要的计算资源,并通过预占的形式进行调配;
  • 动静申请资源:以后 worker 均为常驻过程 / 节点。动静申请资源能够进步利用率;
  • 更细粒度的资源隔离:通过 worker group 或者过程级别的隔离,缩小各 query 之间相互影响;

欢送退出社区,与咱们共建

ByConity 我的项目 GitHub 地址:

https://github.com/ByConity

用户手册:

https://byconity.github.io/zh-cn/docs/introduction/background…

扫码增加小助手

退出移动版