更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群
谈到数据仓库, 肯定离不开应用Extract-Transform-Load (ETL)或 Extract-Load-Transform (ELT)。 将起源不同、格局各异的数据提取到数据仓库中,并进行解决加工。
传统的数据转换过程个别采纳Extract-Transform-Load (ETL)来将业务数据转换为适宜数仓的数据模型,然而,这依赖于独立于数仓外的ETL零碎,因此保护老本较高。当初,以火山引擎ByteHouse为例的云原生数据仓库,凭借其弱小的计算能力、可扩展性,开始全面反对Extract-Load-Transform (ELT)的能力,从而使用户免于保护多套异构零碎。具体而言,用户能够将数据导入后,通过自定义的SQL语句,在ByteHouse外部进行数据转换,而无需依赖独立的ETL零碎及资源。
火山引擎ByteHouse是一款基于开源ClickHouse推出的云原生数据仓库,本篇文章将介绍ByteHouse团队如何在ClickHouse的根底上,构建并优化ELT能力,具体包含四局部:ByteHouse在字节的利用、ByteHouse团队做ELT的初衷、ELT in ByteHouse实现计划、将来布局。
ByteHouse在字节的利用
对于ByteHouse
ByteHouse的倒退
从2017年开始,字节外部的整体数据量一直上涨,为了撑持实时剖析的业务,字节外部开始了对各种数据库的选型。通过屡次试验,在实时剖析版块,字节外部决定开始试水ClickHouse。
2018年到2019年,字节外部的ClickHouse业务从繁多业务,逐渐倒退到了多个不同业务,实用到更多的场景,包含BI 剖析、A/B测试、模型预估等。
在上述这些业务场景的一直实际之下,研发团队基于原生ClickHouse做了大量的优化,同时又开发了十分多的个性。
2020年, ByteHouse正式在字节跳动外部立项,2021年通过火山引擎对外服务。
截止2022年3月,ByteHouse在字节外部总节点数达到18000个,而繁多集群的最大规模是2400个节点。
ByteHouse产品
在火山引擎官网的产品页中,咱们能够搜到ByteHouse产品(如下图):
ByteHouse产品能够分为两个状态:
- 企业版:PaaS模式、全托管、租户专属资源。
- 数仓版:SaaS模式,在这个模式中,使用者能够免运维。用户通过控制台建表、导数据以及应用查问性能。
在数据量较小、应用较为简单的状况下,用户能够先试用企业版本,如果之后集群规模变大、运维压力较大,亦或是扩大能力要求变高,那么就能够转用到纯算拆散、运维能力更强的CDW上来,也就是咱们刚刚提及的数仓版。
利用场景
数据洞察
数据洞察是反对千亿级别数据自助剖析的一站式数据分析及合作平台,包含数据导入以及整合查问剖析,最终以数据门户、数字大屏、治理驾驶舱的可视化状态出现给业务用户,为一个比拟典型的场景。
增长剖析
用户行为剖析,即多场景决策的数据分析平台。而在增长剖析当中,分为了以下三个内容:
- 数据采集:采集用户行为、经营剖析以及平台的数据,全埋点与可视化圈选,广告及其他触点数据接入。
数据分析:
- 行为剖析:包含一个行为的单点事件、路径分析以及热图等
- 用户剖析:对用户的客户群体、用户画像以及用户的具体查问等
- 内容分析:包含抖音视频、电商商品等
- 智能利用:对于一些异样的检测与诊断、资源位归因以及推送经营与广告策略的利用。
一站式指标剖析平台
以懂车帝为例,懂车帝次要给用户提供实在、业余汽车的内容分享和高效的选车服务,同时基于营销需要,他们会依据用户增长的模型以及销售方法论,收集用户在端内的操作行为,进行后盾的查问剖析。
而这种查问剖析底层对接了ByteHouse的大数据引擎,最初实现秒级甚至是亚秒级剖析的决策。整个过程包含智能诊断、智能布局以及策略到投放成果评估闭环,最终实现智能营销和精细化经营。
ETL场景
ELT与ETL的区别
- ETL是用来形容将材料从起源端通过抽取、转置、加载至目标端(数据仓库)的过程。Transform通常形容在数据仓库中的前置数据加工过程。
- ELT专一于将最小解决的数据加载到数据仓库中,而把大部分的转换操作留给分析阶段。相比起ETL,它不须要过多的数据建模,而给剖析者提供更灵便的选项。ELT曾经成为当今大数据的解决常态,它对数据仓库也提出了很多新的要求。
上面表述上会有一些两个词语混用的场景,大家不用过分关注区别。
典型场景
一站式报表
传统大数据解决的计划有两大难点:慢和难。别离体现在传统大数据计划在及时性上达不到要求以及传统数仓ETL对人员要求高、定位难和链路简单。
然而ByteHouse能够轻松的解决上述问题:将hive数据间接导入到ByteHouse,造成大宽表,后续所有解决都在ByteHouse进行。
现有挑战
资源重复
典型的数据链路如下:咱们将行为数据、日志、点击流等通过MQ/Kafka/Flink将其接入存储系统当中,存储系统又可分为域内的HDFS和云上的OSS&S3这种近程贮存零碎,而后进行一系列的数仓的ETL操作,提供给OLAP零碎实现剖析查问。
但有些业务须要从上述的存储中做一个分支,因而会在数据分析的某一阶段,从整体链路中将数据导出,做一些不同于主链路的ETL操作,会呈现两份数据存储。其次在这过程中也会呈现两套不同的ETL逻辑。
当数据质变大,计算冗余以及存储冗余所带来的老本压力也会愈发变大,同时,存储空间的收缩也会让弹性扩容变得不便当。
简单场景
从OLAP场景扩大进来,随着数据量的增长和业务复杂度的晋升,ClickHouse慢慢不能满足要求,体现在以下几点:
- 业务变简单后,单纯大宽表不能满足业务需要。
- 数据量逐步增多,进步性能的同时,须要进行一些数仓转换操作
在ByteHouse下来做简单查问或ELT工作,能够扩大ClickHouse的能力,加强它的可用性、稳定性以及性能,同时还反对不同类型的混合负载。
业界解决思路
在业界中,为了解决以上问题,有以下几类流派:
- 数据预计算流派:如Kylin等。如果Hadoop零碎中出报表较慢或聚合能力较差,能够去做一个数据的预计算,提前将配的指标的cube或一些视图算好。理论SQL查问时,能够间接用外面的cube或视图做替换,之后间接返回。
- 流批一体 派:如Flink、Risingwave。在数据流进时,针对一些须要出报表或者须要做大屏的数据间接内存中做聚合。聚合实现后,将后果写入HBase或MySQL中再去取数据,将数据取出后作展现。Flink还会去间接裸露中间状态的接口,即queryable state,让用户更好的应用状态数据。然而最初还会与批计算的后果实现对数,如果不统一,须要进行回查操作,整个过程考验运维/开发同学的功力。
- 湖仓 一体&HxxP:将数据湖与数据仓库联合起来。
ELT in ByteHouse
整体流程
ELT工作对系统的要求:
- 整体易扩大:导入和转换通常须要大量的资源,零碎须要通过程度扩大的形式来满足数据量的快速增长。
- 可靠性和容错能力:大量的job能有序调度;呈现task偶尔失败(OOM)、container失败时,可能拉起重试;能解决肯定的数据歪斜
- 效率&性能:无效利用多核多机并发能力;数据疾速导入;内存应用无效(内存治理);CPU优化(向量化、codegen)
- 生态& 可观测性:可对接多种工具;工作状态感知;工作进度感知;失败日志查问;有肯定可视化能力
ByteHouse针对ELT工作的要求,以及以后场景遇到的艰难,做了如下个性和改良。
存储服务化
计划:
- ETL后先贮存为Parquet
- 通过存储服务化对外提供查问服务
- Parque转Part文件
- 删掉Parquet文件
- 对立通过Part提供服务
val df = spark.read.format("CnchPart").options(Map("table" -> "cnch_db.c1")).load()
val spark = SparkSession.builder() .appName("CNCH-Reader") .config("spark.sql.extensions", "CnchAutoConvertExtension") .enableHiveSupport() .getOrCreate() val df = spark.sql("select * from cnch_db.c1")
收益:
- ETL简化为一套逻辑,节俭运维老本
- 文件对立存储为Part,占用空间与Parquet大体雷同。整体存储缩小1/2。
stage by stage schedule
整体介绍
以后ClickHouse的sql执行过程:
- 第一阶段,Coordinator 收到分布式表查问后将申请转换为对 local 表查问发送给每个 shard 节点。
- 第二阶段,Coordinator 收到各个节点的后果后汇聚起来解决后返回给客户端。
ClickHouse将Join操作中的右表转换为子查问,带来如下几个问题:
- 简单的query有多个子查问,转换复杂度高
- join表较大时容易造成worker节点的OOM
- 聚合阶段在Cooridnator,压力大,容易成为瓶颈
不同于ClickHouse,咱们在ByteHouse中实现了对简单查问的执行优化。通过对执行打算的切分,将之前的两阶段执行模型转换为分阶段执行。在逻辑打算阶段,依据算子类型插入exchange算子。执行阶段依据exchange算子将整个执行打算进行DAG切分,并且分stage进行调度。stage之间的exchange算子负责实现数据传输和替换。
关键点:
- exchange节点插入
- 切分stage
- stage scheduler
- segment executer
- exchange manager
这里重点来讲一下exchange的眼帘。上图能够看到,最顶层的是query plan。上面转换成物理打算的时候,咱们会依据不同的数据分布的要求转换成不同的算子。source层是接收数据的节点,根本都是对立的,叫做ExchangeSource。Sink则有不同的实现,BroadcastSink、Local、PartitionSink等,他们是作为map task的一部分去运行的。如果是跨节点的数据操作,咱们在底层应用对立的brpc流式数据传输,如果是本地,则应用内存队列来实现。针对不同的点,咱们进行了十分粗疏的优化。
数据传输层
- 过程内通过内存队列,无序列化,zero copy
- 过程间应用brpc stream rpc,保序、连贯复用、状态码传输、压缩等
算子层
- 批量发送
- 线程复用,缩小线程数量
带来的收益
Cooridnator更稳固、更高效
- 聚合等算子拆分到worker节点执行
- Cooridnator节点只须要聚合最终后果
Worker OOM缩小
- 进行了stage切分,每个stage的计算绝对简略
- 减少了exchange算子,缩小内存压力
网络连接更加稳固、高效
- exchange算子无效传输
- 复用连接池
adaptive scheduler
这是在稳定性方面所做的个性。在OLAP场景中可能会发现局部数据不全或数据查问超时等,起因是每个计算节点是所有的query共用的,这样一旦有一个节点较慢就会导致整个query的执行受到影响。
计算节点共用存在的问题:
- scan 节点负载和 workload 相干,做不到齐全均匀
- 各 plan segment 所需资源差别大
这就导致worker节点之间的负载重大不平衡。负载较重的worker节点就会影响query整体的过程。
解决措施:
- 建设 worker 衰弱度机制。Server 端建设worker 衰弱度治理类,能够疾速获取worker group 的衰弱度信息。包含cpu、内存、运行query数量等信息。
- 自适应调度。每个sql 依据 worker 衰弱度动静的进行worker 抉择以及计算节点并发度管制
query queue
咱们的集群也会呈现满载状况,即所有的worker都是不衰弱的或者满载/超载的,就会用查问队列来进行优化。
咱们间接在server端做了一个manager。每次查问的时候manager会去check集群的资源,并且持有一个锁。如果资源不够用,则期待资源开释后去唤醒这个锁。这就防止了Server端不限度的下发计算工作,导致worker节点超载,而后崩掉的状况。
以后实现绝对简略。server是多实例,每个server实例中都有queue,所持有的是一个部分视角,不足全局的资源视角。除此之外,每个queue中的查问状态没有长久化,只是简略的缓存在内存中。
后续,咱们会减少server之间的协调,在一个全局的视角上对查问并发做限度。也会对server实例中query做长久化,减少一些failover的场景反对。
async execution
ELT工作的一个典型特色就是绝对即时剖析,他们的运行工夫会绝对较长。个别为分钟级,甚至达到小时级。目前ClickHouse的客户端查问都采纳阻塞的形式进行返回。这样就造成了客户端长期处于期待的状况,而在这个期待过程中还须要放弃和服务端的连贯。在不稳固的网络状况下,客户端和服务端的连贯会断开,从而导致服务端的工作失败。
为了缩小这种不必要的失败,以及缩小客户端为了维持连贯的减少的复杂度。咱们开发了异步执行的性能,它的实现如下:
- 用户指定异步执行。用户能够通过settings enable_async_query = 1的形式进行per query的指定。也能够通过set enable_async_query = 1的形式进行session级别的指定。
- 如果是异步query,则将其放到后盾线程池中运行
- 静默io。当异步query执行时,则须要切断它和客户端的交互逻辑,比方输入日志等。
针对query的初始化还是在session的同步线程中进行。一旦实现初始化,则将query状态写入到metastore,并向客户端返回async query id。客户端能够用这个id查问query的状态。async query id返回后,则示意实现此次查问的交互。这种模式下,如果语句是select,那么后续后果则无奈回传给客户端。这种状况下咱们举荐用户应用async query + select...into outfile的组合来满足需要。
将来布局
针对ELT混合负载,目前只是牛刀小试。后续的版本中咱们会继续补齐布局中的能力,包含但不限于以下:
导入优化
- spark part writer转换到域内执行,进步性能
- 细粒度导入工作的事务处理
- 细粒度导入工作事务锁优化
故障恢复能力
算子spill
- sort、agg、join社区已有局部能力,咱们在同步的同时,会针对性的做性能优化和bug修复。也会摸索一些自动化spill的可能。
- exchange减少spill能力
recoverability
- 算子执行复原。ELT工作运行时长较长时,两头task的偶发失败会导致整个query失败。整体重试的话会造成时长的节约。task原地失败重试能够防止环境起因导致的偶发失败。
- stage重试。当节点失败时,能够进行stage级别的重试
- 队列作业状态保留
- remote shuffle service:以后业界开源的shuffle service通常为Spark定制,没有通用的客户端,比方c++客户端。须要一起适配。
资源
- 计算资源可指定:用户可指定query须要的计算资源。
- 计算资源预估/预占:可动静预估query须要的计算资源,并通过预占的形式进行调配。
- 动静申请资源:以后worker均为常驻过程/节点。动静申请资源能够进步利用率。
- 更细粒度的资源隔离:通过worker group或者过程级别的隔离,缩小各query之间相互影响
生态
反对更多ETL编排、调度工具
- dbt、AirFlow已反对
- Kettle、Dolphin、SeaTunnel陆续反对中...
数据湖格局对接
- Hudi、Iceberg external table reader
- JNI reader to accelerate
点击跳转火山引擎ByteHouse理解更多