导读: 小米团体于 2019 年首次引入了 Apache Doris,目前 Apache Doris 曾经在小米外部数十个业务中失去广泛应用,并且在小米外部曾经造成一套以 Apache Doris 为外围的数据生态。本篇文章转录自 Doris 社区线上 Meetup 主题演讲,旨在分享 Apache Doris 在小米数据场景的落地实际与优化实际。
作者|小米 OLAP 引擎研发工程师 魏祚
业务背景
因增长剖析业务须要,小米团体于 2019 年首次引入了 Apache Doris。通过三年工夫的倒退,目前 Apache Doris 曾经在广告投放、新批发、增长剖析、数据看板、天星数科、小米有品、用户画像等小米外部数十个业务中失去广泛应用 ,并且在小米外部曾经造成一套以 Apache Doris 为外围的数据生态。
以后 Apache Doris 在小米外部曾经具备 数十个 集群、总体达到 数百台 BE 节点的规模,其中单集群最大规模达到 近百台节点 ,领有 数十个 流式数据导入产品线,每日单表最大增量 120 亿 、反对 PB 级别 存储,单集群每天能够反对 2W 次以上 的多维分析查问。
架构演进
小米引入 Apache Doris 的初衷是为了解决外部进行用户行为剖析时所遇到的问题。随着小米互联网业务的倒退,各个产品线利用用户行为数据对业务进行增长剖析的需要越来越迫切。让每个业务产品线都本人搭建一套增长剖析零碎,不仅老本昂扬,也会导致效率低下。因而能有一款产品可能帮忙他们屏蔽底层简单的技术细节,让相干业务人员可能专一于本人的技术畛域,能够极大进步工作效率。基于此,小米大数据和云平台联合开发了增长剖析零碎 Growing Analytics(下文中简称 GA ),旨在提供一个灵便的多维实时查问和剖析平台,对立数据接入和查问计划,帮忙业务线做精细化经营。(此处内容援用自:基于 Apache Doris 的小米增长剖析平台实际)
剖析、决策、执行是一个循环迭代的过程,在对用户进行行为剖析后,针对营销策略是否还有晋升空间、是否须要在前端对用户进行个性化推送等问题进行决策,帮忙小米实现业务的持续增长。这个过程是对用户行为进行 剖析 - 决策 - 优化执行 - 再剖析 - 再决策 - 再优化执行 的迭代过程。
历史架构
增长剖析平台立项于 2018 年年中,过后基于开发工夫和老本,技术栈等因素的思考,小米复用了现有各种大数据根底组件(HDFS, Kudu, SparkSQL 等),搭建了一套基于 Lamda 架构的增长剖析查问零碎。GA 零碎初代版本的架构如下图所示,蕴含了以下几个方面:
- 数据源:数据源是前端的埋点数据以及可能获取到的用户行为数据。
- 数据接入层:对埋点数据进行对立的荡涤后打到小米外部自研的音讯队列 Talos 中,并通过 Spark Streaming 将数据导入存储层 Kudu 中。
- 存储层:在存储层中进行冷热数据拆散。热数据寄存在 Kudu 中,冷数据则会寄存在 HDFS 上。同时在存储层中进行分区,当分区单位为地利,每晚会将一部分数据转冷并存储到 HDFS 上。
- 计算层 / 查问层:在查问层中,应用 SparkSQL 对 Kudu 与 HDFS 上数据进行联结视图查问,最终把查问后果在前端页面上进行显示。
在过后的历史背景下,初代版本的增长剖析平台帮忙咱们解决了一系列用户经营过程中的问题,但同时在历史架构中也存在了两个问题:
第一个问题: 因为历史架构是基于 SparkSQL + Kudu + HDFS 的组合,依赖的组件过多导致运维老本较高。本来的设计是各个组件都应用公共集群的资源,然而实际过程中发现执行查问作业的过程中,查问性能容易受到公共集群其余作业的影响,容易抖动,尤其在读取 HDFS 公共集群的数据时,有时较为迟缓。
第二个问题: 通过 SparkSQL 进行查问时,提早绝对较高。SparkSQL 是基于批处理零碎设计的查问引擎,在每个 Stage 之间替换数据 Shuffle 的过程中仍然须要落盘操作,实现 SQL 查问的时延较高。为了保障 SQL 查问不受资源的影响,咱们通过增加机器来保障查问性能,然而实际过程中发现,性能晋升的空间无限,这套解决方案并不能充沛地利用机器资源来达到高效查问的目标,存在肯定的资源节约。(此处内容援用自:基于 Apache Doris 的小米增长剖析平台实际)
针对上述两个问题,咱们的指标是寻求一款计算存储一体的 MPP 数据库来代替咱们目前的存储计算层的组件,在通过技术选型后,最终咱们决定应用 Apache Doris 替换老一代历史架构。
基于 Apache Doris 的新版架构
以后架构从数据源获取前端埋点数据后,通过数据接入层打入 Apache Doris 后能够间接查问后果并在前端进行显示。
抉择 Doris 起因:
- Doris 具备优良的查问性能,可能满足业务需要。
- Doris 反对规范 SQL,用户应用与学习老本较低。
- Doris 不依赖于其余的内部零碎,运维简略。
- Doris 社区领有很高活跃度,有利于后续零碎的保护降级。
新旧架构性能比照
咱们选取了日均数据量大概 10 亿的业务,别离在不同场景下进行了性能测试,其中蕴含 6 个事件剖析场景,3 个留存剖析场景以及 3 个漏斗剖析场景。通过比照后,得出以下论断:
- 在事件剖析的场景下,均匀查问所耗时间 升高了 85%。
- 在留存剖析和漏斗剖析场景下,均匀查问所耗时间 升高了 50% 。
利用实际
随着接入业务的增多和数据规模的增长,让咱们也遇到不少问题和挑战,上面咱们将介绍在 应用 Apache Doris 过程中积淀进去的一些实践经验。
数据导入
小米外部次要通过 Stream Load 与 Broker Load 以及大量 Insert 形式来进行 Doris 的数据导入。数据个别会先打到 Talos 音讯队列中,并分为实时数据和离线数据两个局部。实时数据写入 Apache Doris 中: 一部分业务在通过 Flink 对数据进行解决后,会通过 Doris 社区提供的 Flink Doris Connector 组件写入到 Doris 中,底层依赖于 Doris Stream Load 数据导入形式。也有一部分会通过 Spark Streaming 封装的 Stream Load 将数据导入到 Doris 中。离线数据写入 Apache Doris 中:
离线数据局部则会先写到 Hive 中,再通过小米的数据工场将数据导入到 Doris 中。用户能够间接在数据工场提交 Broker Load 工作并将数据间接导入 Doris 中,也能够通过 Spark SQL 将数据导入 Doris 中。Spark SQL 形式则是依赖了 Doris 社区提供的 Spark Doris Connector 组件,底层也是对 Doris 的 Stream Load 数据导入形式进行的封装。
数据查问
用户通过数据工场将数据导入至 Doris 后即可进行查问,在小米外部是通过小米自研的数鲸平台来做查问的。用户能够通过数鲸平台对 Doris 进行查问可视化,并实现用户行为剖析(为满足业务的事件剖析、留存剖析、漏斗剖析、路径分析等行为剖析需要,咱们为 Doris 增加了相应的 UDF 和 UDAF)和用户画像剖析。
尽管目前仍然须要将 Hive 的数据导过去,但 Doris 社区也正在反对湖仓一体能力,在后续实现湖仓一体能力后,咱们会思考间接通过 Doris 查问 Hive 与 Iceberg 表面。值得一提的是,Doris 1.1 版本曾经实现反对查问 Iceberg 表面能力。 同时在行将公布的 1.2 版本 中,还将反对 Hudi 表面并减少了 Multi Catalog,能够实现内部表元数据的同步,无论是查问内部表的性能还是接入表面的易用性都有了很大的晋升。
Compaction 调优
Doris 底层采纳相似 LSM-Tree 形式,反对疾速的数据写入。每一次的数据导入都会在底层的 Tablet 下生成一个新的数据版本,每个数据版本内都是一个个小的数据文件。单个文件外部是有序的,然而不同的文件之间又是无序的。为了使数据有序,在 Doris 底层就会存在 Compaction 机制,异步将底层小的数据版本合并成大的文件。Compaction 不及时就会造成版本累积,减少元数据的压力,并影响查问性能。因为 Compaction 工作自身又比拟消耗机器 CPU、内存与磁盘资源,如果 Compaction 开得太大就会占用过多的机器资源并影响到查问性能,同时也可能会造成 OOM。针对以上问题,咱们一方面从业务侧着手,通过以下方面疏导用户:
- 通过疏导业务侧进行正当优化,对表设置 正当的分区和分桶,防止生成过多的数据分片。
- 疏导用户尽量 升高数据的导入频率 , 增大单次数据导入的量,升高 Compaction 压力。
- 疏导用户 防止过多应用会在底层生成 Delete 版本的 Delete 操作。在 Doris 中 Compaction 分为 Base Compaction 与 Cumulative Compaction。Cumulative Compaction 会疾速的把大量新导入的小版本进行疾速的合并,在执行过程中若遇到 Delete 操作就会终止并将以后 Delete 操作版本之前的所有版本进行合并。因为 Cumulative Compaction 无奈解决 Delete 版本,在合并完之后的版本会和以后版本一起放到 Base Compaction 中进行。当 Delete 版本特地多时,Cumulative Compaction 的步长也会相应变短,只能合并大量的文件,导致 Cumulative Compaction 不能很好的施展小文件合并成果。
另一方面咱们从运维侧着手:
- 针对不同的业务集群配置不同的 Compaction 参数。 局部业务是实时写入数据的,须要的查问次数很多,咱们就会将 Compaction 开的大一点以达到疾速合并目标。而另外一部分业务只写明天的分区,然而只对之前的分区进行查问,在这种状况下,咱们会适当的将 Compaction 放的小一点,防止 Compaction 占用过大内存或 CPU 资源。到早晨导入质变少时,之前导入的小版本可能被及时合并,对第二天查问效率不会有很大影响。
- 适当升高 Base Compaction 工作优先级并减少 Cumulative Compaction 优先级。 依据上文提到的内容,Cumulative Compaction 可能疾速合并大量生成的小文件,而 Base Compaction 因为合并的文件较大,执行的工夫也会相应变长,读写放大也会比较严重。所以咱们心愿 Cumulative Compaction 优先、疾速的进行。
- 减少版本积压报警。 当咱们收到版本积压报警时,动静调大 Compaction 参数,尽快耗费积压版本。
- 反对手动触发指定表与分区下数据分片的 Compaction 工作。 因为 Compaction 不及时,局部表在查问时版本累积较多并须要可能疾速进行合并。所以,咱们反对对单个表或单个表下的某个分区进步 Compaction 优先级。
目前 Doris 社区针对以上问题曾经做了 一系列的优化 ,在 1.1 版本中 大幅加强了数据 Compaction 能力,对于新增数据可能疾速实现聚合,防止分片数据中的版本过多导致的 -235 谬误以及带来的查问效率问题。\
首先,在 Doris 1.1 版本中,引入了 QuickCompaction,减少了被动触发式的 Compaction 查看,在数据版本减少的时候被动触发 Compaction。同时通过晋升分片元信息扫描的能力,疾速的发现数据版本多的分片,触发 Compaction。通过主动式触发加被动式扫描的形式,彻底解决数据合并的实时性问题。
同时,针对高频的小文件 Cumulative Compaction,实现了 Compaction 工作的调度隔离,避免重量级的 Base Compaction 对新增数据的合并造成影响。
最初 ,针对小文件合并,优化了小文件合并的策略,采纳梯度合并的形式,每次参加合并的文件都属于同一个数据量级,避免大小差异很大的版本进行合并,逐步有档次的合并,缩小单个文件参加合并的次数,可能大幅的节俭零碎的 CPU 耗费。 在社区 1.1 新版本的测试后果中,不论是 Compaction 的效率、CPU 的资源耗费,还是高频导入时的查问抖动,成果都有了大幅的晋升。
具体能够参考: Apache Doris 1.1 个性揭秘:Flink 实时写入如何兼顾高吞吐和低延时
监控报警
Doris 的监控次要是通过 Prometheus 以及 Grafana 进行。对于 Doris 的报警则是通过 Falcon 进行。
小米外部应用 Minos 进行集群部署。Minos 是小米外部自研并开源的大数据服务过程管理工具。在实现 Doris 集群部署后会更新至小米外部的轻舟数仓中。在轻舟数仓中的节点注册到 ZooKeeper 后,Prometheus 会监听 ZooKeeper 注册的节点,同时拜访对应端口,拉取对应 Metrics。在这之后,Grafana 会在面板上对监控信息进行显示,若有指标超过预设的报警阈值,Falcon 报警零碎就会在报警群内报警,同时针对报警级别较高或某些无奈及时响应的正告,可间接通过电话呼叫值班同学进行报警。\
另外,小米外部针对每一个 Doris 集群都有 Cloud – Doris 的守护过程。Could – Doris 最大性能是能够对 Doris 进行可用性探测。比方咱们每一分钟对 Doris 发送一次 select current timestamp(); 查问,若本次查问 20 秒没有返回,咱们就会判断本次探测不可用。小米外部对每一个集群的可用性进行保障,通过上述探测办法,能够在小米外部输入 Doris 可用性指标。
小米对 Apache Doris 的优化实际
在利用 Apache Doris 解决业务问题的同时,咱们也发现了 Apache Doris 存在的一些优化项,因而在与社区进行沟通后咱们开始深度参加社区开发,解决本身问题的同时也及时将开发的重要 Feature 回馈给社区,具体包含 Stream Load 两阶段提交(2PC)、单正本数据导入、Compaction 内存限度等。
Stream Load 两阶段提交(2PC)
遇到的问题
在 Flink 和 Spark 导入数据进 Doris 的过程中,当某些异样情况产生时可能会导致如下问题:
Flink 数据反复导入 : Flink 通过周期性 Checkpoint 机制解决容错并实现 EOS,通过主键或者两阶段提交实现蕴含内部存储的端到端 EOS。Doris-Flink-Connector 1.1 之前 UNIQUE KEY 表通过惟一键实现了 EOS,非 UNIQUE KEY 表不反对 EOS。
Spark SQL 数据局部导入 : 通过 SparkSQL 从 Hive 表中查出的数据并写入 Doris 表中的过程须要应用到 Spark Doris Connector 组件,会将 Hive 中查问的数据通过多个 Stream Load 工作写入 Doris 中,出现异常时会导致局部数据导入胜利,局部导入失败。
Stream Load 两阶段提交设计
以上两个问题能够通过导入反对两阶段提交解决,第一阶段实现后确保数据不丢且数据不可见,这就能保障第二阶段发动提交时肯定能胜利,也可能保障第二阶段发动勾销时肯定能胜利。
Doris 中的写入事务分为三步:
- 在 FE 上开始事务,状态为 Prepare;
- 数据写入 BE;
- 少数正本写入胜利的状况下,提交事务,状态变成 Committed,并且 FE 向 BE 下发 Publish Version 工作,让数据立刻可见。
引入两阶段提交之后,第 3 步变为状态批改为 Pre Commit,Publish Version 在第二阶段实现。用户在第一阶段实现后(事务状态为 Pre Commit),能够抉择在第二阶段放弃或者提交事务。
反对 Flink Exactly-Once 语义
Doris-Flink-Connector 1.1 应用两阶段 Stream Load 并反对 Flink 两阶段提交实现了 EOS,只有全局的 Checkpoint 实现时,才会发动 Sream Load 的第二阶段提交,否则发动第二阶段放弃。
解决 SparkSQL 数据局部导入
Doris-Spark-Connector 应用两阶段 Stream Load 之后,胜利的 Task 通过 Stream Load 第一阶段将写入数据到 Doris(Pre Commit 状态,不可见),当作业胜利后,发动所有 Stream Load 第二阶段提交,作业失败时,发动所有 Stream Load 第二阶段勾销。这就确保了不会有数据局部导入的问题。
单正本数据导入优化
单正本数据导入设计
Doris 通过多正本机制确保数据的高牢靠以及零碎高可用。 写入工作能够依照应用的资源分为计算和存储两类:排序、聚合、编码、压缩等应用的是 CPU 和内存的计算资源,最初的文件存储应用存储资源,三正本写入时计算和存储资源会占用三份。
那是否只写一份正本数据在内存中,待到单正本写入实现并生成存储文件后,将文件同步到另外两份正本呢?答案是可行的,因而针对三正本写入的场景,咱们做了单正本写入设计。单正本数据在内存中做完排序、聚合、编码以及压缩后,将文件同步至其余两个正本,这样很大水平上能够节俭出 CPU 和内存资源。
性能比照测试
Broker Load 导入 62G 数据性能比照
导入工夫: 三正本导入耗时 33 分钟,单正本导入耗时 31 分钟。
内存应用: 内存应用上优化成果非常显著,三正本数据导入的内存应用是单正本导入的三倍。单正本导入时只须要写一份内存,然而三正本导入时须要写三份内存,内存优化达到了 3 倍。
CPU 耗费比照: 三正本导入的 CPU 耗费差不多是单正本的三倍。
并发场景性能比照
测试中向 100 个表并发导入数据,每个表有 50 个导入工作,工作总数为 5000 个。单个 Stream Load 工作导入的数据行是 200 万行,约为 90M 的数据。测试中开了 128 个并发,将 单正本导入和三正本导入进行了比照:
导入工夫: 3 正本导入耗时 67 分钟,而后单正本耗时 27 分钟实现。导入效率相当晋升两倍以上。
内存应用: 单正本的导入会更低。
CPU 耗费比照: 因为都曾经是开了并发在导入,CPU 开销都比拟高,然而单正本导入吞吐晋升显著。
Compaction 内存限度
之前 Doris 在单机磁盘一次导入超过 2000 个 Segment 的状况下,Compaction 有内存 OOM 的问题。对于当天写入但不查当天数据而是查问之前的数据业务场景,咱们会把 Compaction 略微放的小一点,防止占用太大的内存,导致过程 OOM。Doris 之前每个磁盘有固定的线程做存储在这个盘上的数据的 Compaction,没有方法在全局进行管控。因为咱们要限度单个节点下面内存的应用,所以咱们将该模式改成了生产者 - 消费者模式:
生产者不停的从所有的磁盘下面生产工作,之后将生产工作提交到线程池中。咱们能够很好的把控线程池的入口,达到对 Compaction 的限度。咱们在合并时会把底层的小文件进行归并排序,之后在内存里给每一个文件开拓 Block,所以咱们能够近似认为占用的内存量与文件的数量是相干的,从而能够通过对单节点上同时执行合并的文件数量做限度,来达到管制内存的成果。
咱们减少了对单个 BE Compaction 合并的文件数量的限度。 若正在进行的 Compaction 的文件数量超过或等于以后限度时,后续提交上来的工作就须要期待,等到后面的 Compaction 工作做完并将指标释放出来后,后边提交进来的那些工作才能够进行。
通过这种形式,咱们对某些业务场景做了内存的限度,很好的防止集群负载高时占用过多内存导致 OOM 的问题。
总结
自从 Apache Doris 从 2019 年上线第一个业务至今,目前 Apache Doris 曾经在小米外部服务了数十个业务、集群数量达到数十个、节点规模达到数百台、每天实现数万次用户在线剖析查问,承当了包含增长剖析和报表查问等场景绝大多数在线剖析的需要。\
与此同时,以上所列小米对于 Apache Doris 的优化实际,曾经有局部性能曾经在 Apache Doris 1.0 或 1.1 版本中公布,有局部 PR 曾经合入社区 Master,在不久后公布的 1.2 新版本中应该就会与大家见面。随着社区的疾速倒退,有越来越多小伙伴参加到社区建设中,社区活跃度有了极大的晋升。Apache Doris 曾经变得越来越成熟,并开始从繁多计算存储一体的剖析型 MPP 数据库走向湖仓一体的路线,置信在将来还有更多的数据分析场景期待去摸索和实现。
退出社区
最初,欢送更多的开源技术爱好者退出 Apache Doris 社区,携手成长,共建社区生态。
SelectDB 是一家开源技术公司,致力于为 Apache Doris 社区提供一个由全职工程师、产品经理和反对工程师组成的团队,凋敝开源社区生态,打造实时剖析型数据库畛域的国内工业界规范。基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为用户和客户提供开箱即用的能力。
相干链接:
SelectDB 官方网站:
https://selectdb.com
Apache Doris 官方网站:
http://doris.apache.org
Apache Doris Github:
https://github.com/apache/doris
Apache Doris 开发者邮件组:
dev@doris.apache.org