乐趣区

关于大数据:ELT-in-ByteHouse-实践与展望

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

谈到数据仓库,肯定离不开应用 Extract-Transform-Load (ETL)或 Extract-Load-Transform (ELT)。将起源不同、格局各异的数据提取到数据仓库中,并进行解决加工。

传统的数据转换过程个别采纳 Extract-Transform-Load (ETL)来将业务数据转换为适宜数仓的数据模型,然而,这依赖于独立于数仓外的 ETL 零碎,因此保护老本较高。当初,以火山引擎 ByteHouse 为例的云原生数据仓库,凭借其弱小的计算能力、可扩展性,开始全面反对 Extract-Load-Transform (ELT)的能力,从而使用户免于保护多套异构零碎。具体而言,用户能够将数据导入后,通过自定义的 SQL 语句,在 ByteHouse 外部进行数据转换,而无需依赖独立的 ETL 零碎及资源。

火山引擎 ByteHouse 是一款基于开源 ClickHouse 推出的云原生数据仓库,本篇文章将介绍 ByteHouse 团队如何在 ClickHouse 的根底上,构建并优化 ELT 能力,具体包含四局部:ByteHouse 在字节的利用、ByteHouse 团队做 ELT 的初衷、ELT in ByteHouse 实现计划、将来布局。

ByteHouse 在字节的利用

对于 ByteHouse

ByteHouse 的倒退

从 2017 年开始,字节外部的整体数据量一直上涨,为了撑持实时剖析的业务,字节外部开始了对各种数据库的选型。通过屡次试验,在实时剖析版块,字节外部决定开始试水 ClickHouse。

2018 年到 2019 年,字节外部的 ClickHouse 业务从繁多业务,逐渐倒退到了多个不同业务,实用到更多的场景,包含 BI 剖析、A/ B 测试、模型预估等。

在上述这些业务场景的一直实际之下,研发团队基于原生 ClickHouse 做了大量的优化,同时又开发了十分多的个性。

2020 年,ByteHouse 正式在字节跳动外部立项,2021 年通过火山引擎对外服务。

截止 2022 年 3 月,ByteHouse 在字节外部总节点数达到 18000 个,而繁多集群的最大规模是 2400 个节点。

ByteHouse 产品

在火山引擎官网的产品页中,咱们能够搜到 ByteHouse 产品(如下图):

ByteHouse 产品能够分为两个状态:

  1. 企业版:PaaS 模式、全托管、租户专属资源。
  2. 数仓版:SaaS 模式,在这个模式中,使用者能够免运维。用户通过控制台建表、导数据以及应用查问性能。

在数据量较小、应用较为简单的状况下,用户能够先试用企业版本,如果之后集群规模变大、运维压力较大,亦或是扩大能力要求变高,那么就能够转用到纯算拆散、运维能力更强的 CDW 上来,也就是咱们刚刚提及的数仓版。

利用场景

数据洞察

数据洞察是反对千亿级别数据自助剖析的一站式数据分析及合作平台,包含数据导入以及整合查问剖析,最终以数据门户、数字大屏、治理驾驶舱的可视化状态出现给业务用户,为一个比拟典型的场景。

增长剖析

用户行为剖析,即多场景决策的数据分析平台。而在增长剖析当中,分为了以下三个内容:

  1. 数据采集:采集用户行为、经营剖析以及平台的数据, 全埋点与可视化圈选,广告及其他触点数据接入。
  2. 数据分析

    1. 行为剖析:包含一个行为的单点事件、路径分析以及热图等
    2. 用户剖析:对用户的客户群体、用户画像以及用户的具体查问等
    3. 内容分析:包含抖音视频、电商商品等
  3. 智能利用:对于一些异样的检测与诊断、资源位归因以及推送经营与广告策略的利用。

一站式指标剖析平台

以懂车帝为例,懂车帝次要给用户提供实在、业余汽车的内容分享和高效的选车服务,同时基于营销需要,他们会依据用户增长的模型以及销售方法论,收集用户在端内的操作行为,进行后盾的查问剖析。

而这种查问剖析底层对接了 ByteHouse 的大数据引擎,最初实现秒级甚至是亚秒级剖析的决策。整个过程包含智能诊断、智能布局以及策略到投放成果评估闭环,最终实现智能营销和精细化经营。

ETL 场景

ELT 与 ETL 的区别
  • ETL 是用来形容将材料从起源端通过抽取、转置、加载至目标端(数据仓库)的过程。Transform 通常形容在数据仓库中的前置数据加工过程。
  • ELT 专一于将最小解决的数据加载到数据仓库中,而把大部分的转换操作留给分析阶段。相比起 ETL,它不须要过多的数据建模,而给剖析者提供更灵便的选项。ELT 曾经成为当今大数据的解决常态,它对数据仓库也提出了很多新的要求。

上面表述上会有一些两个词语混用的场景,大家不用过分关注区别。

典型场景

一站式报表

传统大数据解决的计划有两大难点:慢和难。别离体现在传统大数据计划在及时性上达不到要求以及传统数仓 ETL 对人员要求高、定位难和链路简单。

然而 ByteHouse 能够轻松的解决上述问题:将 hive 数据间接导入到 ByteHouse,造成大宽表,后续所有解决都在 ByteHouse 进行。

现有挑战

资源重复

典型的数据链路如下:咱们将行为数据、日志、点击流等通过 MQ/Kafka/Flink 将其接入存储系统当中,存储系统又可分为域内的 HDFS 和云上的 OSS&S3 这种近程贮存零碎,而后进行一系列的数仓的 ETL 操作,提供给 OLAP 零碎实现剖析查问。

但有些业务须要从上述的存储中做一个分支,因而会在数据分析的某一阶段,从整体链路中将数据导出,做一些不同于主链路的 ETL 操作,会呈现两份数据存储。其次在这过程中也会呈现两套不同的 ETL 逻辑。

当数据质变大,计算冗余以及存储冗余所带来的老本压力也会愈发变大,同时,存储空间的收缩也会让弹性扩容变得不便当。

简单场景

从 OLAP 场景扩大进来,随着数据量的增长和业务复杂度的晋升,ClickHouse 慢慢不能满足要求,体现在以下几点:

  • 业务变简单后,单纯大宽表不能满足业务需要。
  • 数据量逐步增多,进步性能的同时,须要进行一些数仓转换操作

在 ByteHouse 下来做简单查问或 ELT 工作,能够扩大 ClickHouse 的能力,加强它的可用性、稳定性以及性能,同时还反对不同类型的混合负载。

业界解决思路

在业界中,为了解决以上问题,有以下几类流派:

  • 数据预计算流派:如 Kylin 等。如果 Hadoop 零碎中出报表较慢或聚合能力较差,能够去做一个数据的预计算,提前将配的指标的 cube 或一些视图算好。理论 SQL 查问时,能够间接用外面的 cube 或视图做替换,之后间接返回。
  • 流批一体 :如 Flink、Risingwave。在数据流进时,针对一些须要出报表或者须要做大屏的数据间接内存中做聚合。聚合实现后,将后果写入 HBase 或 MySQL 中再去取数据,将数据取出后作展现。Flink 还会去间接裸露中间状态的接口,即 queryable state,让用户更好的应用状态数据。然而最初还会与批计算的后果实现对数,如果不统一,须要进行回查操作,整个过程考验运维 / 开发同学的功力。
  • 湖仓 一体 &HxxP: 将数据湖与数据仓库联合起来。

ELT in ByteHouse

整体流程

ELT 工作对系统的要求:

  1. 整体易扩大:导入和转换通常须要大量的资源,零碎须要通过程度扩大的形式来满足数据量的快速增长。
  2. 可靠性和容错能力:大量的 job 能有序调度;呈现 task 偶尔失败(OOM)、container 失败时,可能拉起重试;能解决肯定的数据歪斜
  3. 效率 & 性能:无效利用多核多机并发能力;数据疾速导入;内存应用无效(内存治理);CPU 优化(向量化、codegen)
  4. 生态 & 可观测性:可对接多种工具;工作状态感知;工作进度感知;失败日志查问;有肯定可视化能力

ByteHouse 针对 ELT 工作的要求,以及以后场景遇到的艰难,做了如下个性和改良。

存储服务化

计划:

  1. ETL 后先贮存为 Parquet
  2. 通过存储服务化对外提供查问服务
  3. Parque 转 Part 文件
  4. 删掉 Parquet 文件
  5. 对立通过 Part 提供服务
val df = spark.read.format("CnchPart")
.options(Map("table" -> "cnch_db.c1")).load()
val spark = SparkSession.builder() 
 .appName("CNCH-Reader") 
 .config("spark.sql.extensions", "CnchAutoConvertExtension") 
 .enableHiveSupport() .getOrCreate() 
val df = spark.sql("select * from cnch_db.c1")

收益:

  1. ETL 简化为一套逻辑,节俭运维老本
  2. 文件对立存储为 Part,占用空间与 Parquet 大体雷同。整体存储缩小 1 /2。

stage by stage schedule

整体介绍

  • 以后 ClickHouse 的 sql 执行过程:

    • 第一阶段,Coordinator 收到分布式表查问后将申请转换为对 local 表查问发送给每个 shard 节点。
    • 第二阶段,Coordinator 收到各个节点的后果后汇聚起来解决后返回给客户端。
  • ClickHouse 将 Join 操作中的右表转换为子查问,带来如下几个问题:

    • 简单的 query 有多个子查问,转换复杂度高
    • join 表较大时容易造成 worker 节点的 OOM
    • 聚合阶段在 Cooridnator,压力大,容易成为瓶颈

不同于 ClickHouse,咱们在 ByteHouse 中实现了对简单查问的执行优化。通过对执行打算的切分,将之前的两阶段执行模型转换为分阶段执行。在逻辑打算阶段,依据算子类型插入 exchange 算子。执行阶段依据 exchange 算子将整个执行打算进行 DAG 切分,并且分 stage 进行调度。stage 之间的 exchange 算子负责实现数据传输和替换。

关键点:

  1. exchange 节点插入
  2. 切分 stage
  3. stage scheduler
  4. segment executer
  5. exchange manager

这里重点来讲一下 exchange 的眼帘。上图能够看到,最顶层的是 query plan。上面转换成物理打算的时候,咱们会依据不同的数据分布的要求转换成不同的算子。source 层是接收数据的节点,根本都是对立的,叫做 ExchangeSource。Sink 则有不同的实现,BroadcastSink、Local、PartitionSink 等,他们是作为 map task 的一部分去运行的。如果是跨节点的数据操作,咱们在底层应用对立的 brpc 流式数据传输,如果是本地,则应用内存队列来实现。针对不同的点,咱们进行了十分粗疏的优化。

  • 数据传输层

    • 过程内通过内存队列,无序列化,zero copy
    • 过程间应用 brpc stream rpc,保序、连贯复用、状态码传输、压缩等
  • 算子层

    • 批量发送
    • 线程复用,缩小线程数量

带来的收益

  • Cooridnator 更稳固、更高效

    • 聚合等算子拆分到 worker 节点执行
    • Cooridnator 节点只须要聚合最终后果
  • Worker OOM 缩小

    • 进行了 stage 切分,每个 stage 的计算绝对简略
    • 减少了 exchange 算子,缩小内存压力
  • 网络连接更加稳固、高效

    • exchange 算子无效传输
    • 复用连接池

adaptive scheduler

这是在稳定性方面所做的个性。在 OLAP 场景中可能会发现局部数据不全或数据查问超时等,起因是每个计算节点是所有的 query 共用的,这样一旦有一个节点较慢就会导致整个 query 的执行受到影响。

计算节点共用存在的问题:

  • scan 节点负载和 workload 相干,做不到齐全均匀
  • 各 plan segment 所需资源差别大

这就导致 worker 节点之间的负载重大不平衡。负载较重的 worker 节点就会影响 query 整体的过程。

解决措施:

  • 建设 worker 衰弱度机制。Server 端建设 worker 衰弱度治理类,能够疾速获取 worker group 的衰弱度信息。包含 cpu、内存、运行 query 数量等信息。
  • 自适应调度。每个 sql 依据 worker 衰弱度动静的进行 worker 抉择以及计算节点并发度管制

query queue

咱们的集群也会呈现满载状况,即所有的 worker 都是不衰弱的或者满载 / 超载的,就会用查问队列来进行优化。

咱们间接在 server 端做了一个 manager。每次查问的时候 manager 会去 check 集群的资源,并且持有一个锁。如果资源不够用,则期待资源开释后去唤醒这个锁。这就防止了 Server 端不限度的下发计算工作,导致 worker 节点超载,而后崩掉的状况。

以后实现绝对简略。server 是多实例,每个 server 实例中都有 queue,所持有的是一个部分视角,不足全局的资源视角。除此之外,每个 queue 中的查问状态没有长久化,只是简略的缓存在内存中。

后续,咱们会减少 server 之间的协调,在一个全局的视角上对查问并发做限度。也会对 server 实例中 query 做长久化,减少一些 failover 的场景反对。

async execution

ELT 工作的一个典型特色就是绝对即时剖析,他们的运行工夫会绝对较长。个别为分钟级,甚至达到小时级。目前 ClickHouse 的客户端查问都采纳阻塞的形式进行返回。这样就造成了客户端长期处于期待的状况,而在这个期待过程中还须要放弃和服务端的连贯。在不稳固的网络状况下,客户端和服务端的连贯会断开,从而导致服务端的工作失败。

为了缩小这种不必要的失败,以及缩小客户端为了维持连贯的减少的复杂度。咱们开发了异步执行的性能,它的实现如下:

  1. 用户指定异步执行。用户能够通过 settings enable_async_query = 1 的形式进行 per query 的指定。也能够通过 set enable_async_query = 1 的形式进行 session 级别的指定。
  2. 如果是异步 query,则将其放到后盾线程池中运行
  3. 静默 io。当异步 query 执行时,则须要切断它和客户端的交互逻辑,比方输入日志等。

针对 query 的初始化还是在 session 的同步线程中进行。一旦实现初始化,则将 query 状态写入到 metastore,并向客户端返回 async query id。客户端能够用这个 id 查问 query 的状态。async query id 返回后,则示意实现此次查问的交互。这种模式下,如果语句是 select,那么后续后果则无奈回传给客户端。这种状况下咱们举荐用户应用 async query + select…into outfile 的组合来满足需要。

将来布局

针对 ELT 混合负载,目前只是牛刀小试。后续的版本中咱们会继续补齐布局中的能力,包含但不限于以下:

导入优化

  • spark part writer 转换到域内执行,进步性能
  • 细粒度导入工作的事务处理
  • 细粒度导入工作事务锁优化

故障恢复能力

  • 算子 spill

    • sort、agg、join 社区已有局部能力,咱们在同步的同时,会针对性的做性能优化和 bug 修复。也会摸索一些自动化 spill 的可能。
    • exchange 减少 spill 能力
  • recoverability

    • 算子执行复原。ELT 工作运行时长较长时,两头 task 的偶发失败会导致整个 query 失败。整体重试的话会造成时长的节约。task 原地失败重试能够防止环境起因导致的偶发失败。
    • stage 重试。当节点失败时,能够进行 stage 级别的重试
    • 队列作业状态保留
  • remote shuffle service:以后业界开源的 shuffle service 通常为 Spark 定制,没有通用的客户端,比方 c ++ 客户端。须要一起适配。

资源

  • 计算资源可指定:用户可指定 query 须要的计算资源。
  • 计算资源预估 / 预占:可动静预估 query 须要的计算资源,并通过预占的形式进行调配。
  • 动静申请资源:以后 worker 均为常驻过程 / 节点。动静申请资源能够进步利用率。
  • 更细粒度的资源隔离:通过 worker group 或者过程级别的隔离,缩小各 query 之间相互影响

生态

  • 反对更多 ETL 编排、调度工具

    • dbt、AirFlow 已反对
    • Kettle、Dolphin、SeaTunnel 陆续反对中 …
  • 数据湖格局对接

    • Hudi、Iceberg external table reader
    • JNI reader to accelerate

点击跳转火山引擎 ByteHouse 理解更多

退出移动版