乐趣区

关于数据库:日增百亿数据查询结果秒出-Apache-Doris-在-360商业化的统一-OLAP-应用实践

导读: 360 商业化为助力业务团队更好推动商业化增长,实时数仓共经验了三种模式的演进,别离是 Storm + Druid + MySQL 模式、Flink + Druid + TIDB 的模式 以及 Flink + Doris 的模式,基于 Apache Doris 的新一代架构的胜利落地使得 360 商业化团队实现了实时数仓在 OLAP 引擎上的对立,胜利实现宽泛实时场景下的秒级查问响应。本文将为大家进行具体介绍演进过程以及新一代实时数仓在广告业务场景中的具体落地实际。

作者|360 商业化数据团队 窦和雨、王新新

360 公司致力于成为互联网和平安服务提供商,是互联网收费平安的倡导者,先后推出 360 安全卫士、360 手机卫士、360 平安浏览器等平安产品以及 360 导航、360 搜寻等用户产品。

360 商业化依靠 360 产品宏大的用户笼罩能力和超强的用户粘性,通过业余数据处理和算法实现广告精准投放,助力数十万中小企业和 KA 企业实现价值增长。360 商业化数据团队次要是对整个广告投放链路中所产生的数据进行计算解决,为产品经营团队提供策略调整的剖析数据,为算法团队提供模型训练的优化数据,为广告主提供广告投放的成果数据。

业务场景

在正式介绍 Apache Doris 在 360 商业化的利用之前,咱们先对广告业务中的典型应用场景进行简要介绍:

  • 实时大盘: 实时大盘场景是咱们对外出现数据的要害载体,须要从多个维度监控商业化大盘的指标状况,包含流量指标、生产指标、转化指标和变现指标,因而其对数据的准确性要求十分高(保证数据不丢不重),同时对数据的时效性和稳定性要求也很高,要求实现秒级提早、分钟级数据恢复。
  • 广告账户的实时生产数据场景: 通过监控账户粒度下的多维度指标数据,及时发现账户的生产变动,便于产品团队依据实时生产状况推动经营团队对账户估算进行调整。在该场景下数据一旦呈现问题,就可能导致账户估算的谬误调整,从而影响广告的投放,这对公司和广告主将造成不可估量的损失,因而在该场景中,同样对数据准确性有很高的要求。目前在该场景下遇到的艰难是如何在数据量比拟大、查问穿插的粒度比拟细的前提下,实现秒级别查问响应。
  • AB 试验平台: 在广告业务中,算法和策略同学会针对不同的场景进行试验,在该场景下,具备报表维度不固定、多种维度灵便组合、数据分析比较复杂、数据量较大等特点,这就须要能够在百万级 QPS 下保证数据写入存储引擎的性能,因而咱们须要针对业务场景进行特定的模型设计和解决上的优化,进步实时数据处理的性能以及数据查问剖析的效率,只有这样能力满足算法和策略同学对试验报表的查问剖析需要。

实时数仓演进

为晋升各场景下数据服务的效率,助力相干业务团队更好推动商业化增长,截至目前实时数仓共经验了三种模式的演进,别离是 Storm + Druid + MySQL 模式、Flink + Druid + TIDB 的模式 以及 Flink + Doris 的模式,本文将为大家进行具体介绍实时数仓演进过程以及新一代实时数仓在广告业务场景中的具体落地。

第一代架构

该阶段的实时数仓是基于 Storm + Druid + MySQL 来构建的,Storm 为实时处理引擎,数据经 Storm 解决后,将数据写入 Druid,利用 Druid 的预聚合能力对写入数据进行聚合。

架构痛点:

最后咱们试图依附该架构解决业务上所有的实时问题,经由 Druid 对立对外提供数据查问服务,然而在理论的落地过程中咱们发现 Druid 是无奈满足某些分页查问和 Join 场景的,为解决该问题,咱们只能利用 MySQL 定时工作的形式将数据定时从 Druid 写入 MySQL 中(相似于将 MySQL 作为 Druid 的物化视图),再通过 Druid + MySQL 的模式对外提供服务。通过这种形式临时能够满足某些场景需要,但随着业务规模的逐渐扩充,当面对更大规模数据下的查问剖析需要时,该架构已难以为继,架构的缺点也越发显著:

  • 面对数据量的持续增长,数据仓库压力空前剧增,已无奈满足实时数据的时效性要求。
  • MySQL 的分库分表保护难度高、投入老本大,且 MySQL 表之间的数据一致性无奈保障。

第二代架构

基于第一套架构存在的问题,咱们进行了首次降级,这次降级的次要变动是将 Storm 替换成新的实时数据处理引擎 Flink,Flink 相较于 Storm 不仅在许多语义和性能上进行了扩大,还对数据的一致性做了保障,这些个性使得报表的时效性大幅晋升;其次咱们应用 TiDB 替换了 MySQL,利用 TIDB 分布式的个性,肯定水平上解决了 MySQL 分库分表难以保护的问题(TiDB 在肯定水平上比 MySQL 可能承载更大数据量,能够拆分更少表)。在降级实现后,咱们依照不同业务场景的需要,将 Flink 解决完的数据别离写入 Druid 和 TiDB,由 Druid 和 TIDB 对外提供数据查问服务。

架构痛点:

尽管该阶段的实时数仓架构无效晋升了数据的时效性、升高了 MySQL 分库分表保护的难度,但在一段时间的应用之后又暴露出了新的问题,也迫使咱们进行了第二次降级:

  • Flink + TIDB 无奈实现端到端的一致性,起因是当其面对大规模的数据时,开启事务将对 TiDB 写入性能造成很大的影响,该场景下 TiDB 的事务形同虚设,爱莫能助。
  • Druid 不反对规范 SQL,应用有肯定的门槛,相干团队应用数据时非常不便,这也间接导致了工作效率的降落。
  • 保护老本较高,须要保护两套引擎和两套查问逻辑,极大减少了保护和开发成本的投入。

新一代实时数仓架构

第二次降级咱们引入 Apache Doris 联合 Flink 构建了新一代实时数仓架构,借鉴离线数仓分层理念对实时数仓进行分层构建,并对立 Apache Doris 作为数仓 OLAP 引擎,由 Doris 对立对外提供服务。

咱们的数据次要源自于维表物料数据和业务打点日志。维表物料数据会定时全量同步到 Redis 或者 Aerospike(相似于 Redis 的 KV 存储)中,通过 Binlog 变更进行增量同步。业务数据由各个团队将日志收集到 Kafka,外部称为 ODS 原始数据(ODS 原始数据不做任何解决),咱们对 ODS 层的数据进行归一化解决,包含字段命名、字段类型等,并对一些有效字段进行删减,并依据业务场景拆分生成 DWD 层数据,DWD 层的数据通过业务逻辑加工以及关联 Redis 中维表数据或者多流 Join,最初生成面向具体业务的大宽表(即 DWT 层数据),咱们将 DWT 层数据通过聚合、经由 Stream Load 写入 Doris 中,由 Doris 对外提供数据查问服务。在离线数仓局部,同样也有一些场景须要每日将加工完的 DWS 数据经由 Broker Load 写入到 Doris 集群中,并利用 Doris 进行查问减速,以晋升咱们对外提供服务的效率。

抉择 Doris 的起因

基于 Apache Doris 高性能、极繁难用、实时对立等诸多个性,助力 360 商业化胜利构建了新一代实时数仓架构,本次降级不仅晋升了实时数据的复用性、实现了 OLAP 引擎的对立,而且满足了各大业务场景严苛的数据查问剖析需要,使得整体实时数据流程架构变得简略,大大降低了其保护和应用的老本。咱们抉择 Doris 作为对立 OLAP 引擎的重要起因大抵可归结为以下几点:

  • 物化视图: Doris 的物化视图与广告业务场景的特点符合度十分高,比方广告业务中大部分报表的查问维度绝对比拟固定,利用物化视图的个性能够晋升查问的效率,同时 Doris 能够保障物化视图和底层数据的一致性,该个性可帮忙咱们升高保护老本的投入。
  • 数据一致性: Doris 提供了 Stream Load Label 机制,咱们可通过事务的形式与 Flink 二阶段提交进行联合,以保障幂等写入数据,另外咱们通过自研 Flink Sink Doris 组件,实现了数据的端到端的一致性,保障了数据的准确性。
  • SQL 协定兼容:Doris 兼容 MySQL 协定,反对规范 SQL,这无论是对于开发同学,还是数据分析、产品同学,都能够实现无老本连接,相干同学间接应用 SQL 就能够进行查问,应用门槛很低,为公司节俭了大量培训和应用老本,同时也晋升了工作效率。
  • 优良的查问性能: Apache Doris 已全面实现向量化查问引擎,使 Doris 的 OLAP 性能体现更加强悍,在多种查问场景下都有非常明显的性能晋升,可极大优化了报表的询速度。同时依靠列式存储引擎、古代的 MPP 架构、预聚合物化视图、数据索引的实现,在低提早和高吞吐查问上,都达到了极速性能
  • 运维难度低: Doris 对于集群和和数据正本治理上做了很多自动化工作,这些投入使得集群运维起来十分的简略,近乎于实现零门槛运维。

在 AB 试验平台的具体落地

Apache Doris 目前广泛应用于 360 商业化外部的多个业务场景。比方在实时大盘场景中,咱们利用 Doris 的 Aggregate 模型对申请、曝光、点击、转化等多个实时流进行事实表的 Join;依附 Doris 事务个性保证数据的一致性;通过多个物化视图,提前依据报表维度聚合数据、晋升查问速度,因为物化视图和 Base 表的统一关系由 Doris 来保护保障,这也极大的升高了应用复杂度。比方在账户实时生产场景中,咱们次要借助 Doris 优良的查问优化器,通过 Join 来计算同环比 ……

接下来仅以 AB 试验平台这一典型业务场景为例,详尽的为大家介绍 Doris 在该场景下的落地实际,在上述所举场景中的利用将不再赘述。

AB 试验在广告场景中的利用十分宽泛,是掂量设计、算法、模型、策略对产品指标晋升的重要工具,也是精细化经营的重要伎俩,咱们能够通过 AB 试验平台对迭代计划进行测试,并联合数据进行剖析和验证,从而优化产品计划、晋升广告成果。

在文章结尾也有简略介绍,AB 试验场景所承载的业务绝对比较复杂,这里再具体阐明一下:

  • 各维度之间组合灵便度很高,例如须要对从 DSP 到流量类型再到广告地位等十几个维度进行剖析,实现从申请、竞价、曝光、点击、转化等几十个指标的残缺流量漏斗。
  • 数据量微小,日均流量能够达到 百亿级别 ,峰值可达 百万 OPS(Operations Per Second),一条流量可能蕴含 几十个试验标签 ID

基于以上特点,咱们在 AB 试验场景中一方面须要保证数据算的快、数据提早低、用户查问数据快,另一方面也要保证数据的准确性,保障数据不丢不重。

数据落地

当面对一条流量可能蕴含几十个试验标签 ID 的状况时,从剖析角度登程,只须要选中一个试验标签和一个对照试验标签进行剖析;而如果通过 like 的形式在几十个试验标签中去匹配选中的试验标签,实现效率就会非常低。

最后咱们冀望从数据入口处将试验标签打散,将一条蕴含 20 个试验标签的流量拆分为 20 条只蕴含一个试验标签的流量,再导入 Doris 的聚合模型中进行数据分析。而在这个过程中咱们遇到一个显著的问题,当数据被打散之后会收缩数十倍,百亿级数据将收缩为千亿级数据,即使 Doris 聚合模型会对数据再次压缩,但这个过程会对集群造成极大的压力。因而咱们放弃该实现形式,开始尝试将压力摊派一部分到计算引擎,这里须要留神的是,如果将数据间接在 Flink 中打散,当 Job 全局 Hash 的窗口来 Merge 数据时,收缩数十倍的数据也会带来几十倍的网络和 CPU 耗费。

接着咱们开始第三次尝试,这次尝试咱们思考在 Flink 端将数据拆分后立即进行 Local Merge,在同一个算子的内存中开一个窗口,先将拆分的数据进行一层聚合,再通过 Job 全局 Hash 窗口进行第二层聚合,因为 Chain 在一起的两个算子在同一个线程内,因而能够大幅升高收缩后数据在不同算子之间传输的网络耗费。该形式 通过两层窗口的聚合,再联合 Doris 的聚合模型,无效升高了数据的收缩水平,其次咱们也同步推动实业务方定期清理已下线的试验,缩小计算资源的节约。

思考到 AB 试验剖析场景的特点,咱们将试验 ID 作为 Doris 的第一个排序字段,利用前缀索引能够很快定位到指标查问的数据。另外依据罕用的维度组合建设物化视图,进一步放大查问的数据量,Doris 物化视图根本可能笼罩 80% 的查问场景 ,咱们会定期剖析查问 SQL 来调整物化视图。 最终咱们通过模型的设计、前缀索引的利用,联合物化视图能力,使大部分试验查问后果可能实现秒级返回。

数据一致性保障

数据的准确性是 AB 试验平台的根底,当算法团队醉生梦死优化的模型使广告成果晋升了几个百分点,却因数据失落看不出试验成果,这样的后果的确无奈令人承受,同时这也是咱们外部不容许呈现的问题。那么咱们该如何防止数据失落、保障数据的一致性呢?

自研 Flink Sink Doris 组件

咱们外部已有一套 Flink Stream API 脚手架,因而借助 Doris 的幂等写个性和 Flink 的二阶段提交个性,自研了 Sink To Doris 组件,保障了数据端到端的一致性,并在此基础上新增了异常情况的数据保障机制。

在 Doris 0.14 版本中(初期应用的版本),咱们个别通过“同一个 Label ID 只会被写入一次”的机制来保证数据的一致性;在 Doris 1.0 版本之后,通过“Doris 的事务联合 Flink 二阶段提交”的机制来保证数据的一致性。这里具体分享应用 Doris 1.0 版本之后,通过“Doris 的事务联合 Flink 二阶段提交”机制保证数据的一致性的原理与实现。

在 Flink 中做到数据端到端的一致性有两种形式,一种为通过至多一次联合幂等写,一种为通过恰好一次的二阶段事务。

如右图所示,咱们首先在数据写入阶段先将数据写入本地文件,一阶段过程中将数据预提交到 Doris,并保留事务 ID 到状态,如果 Checkpoint 失败,则手动放弃 Doris 事务;如果 Checkpoint 胜利,则在二阶段进行事务提交。对于二阶段提交重试屡次依然失败的数据,将提供数据以及事务 ID 保留到 HDFS 的选项,通过 Broker Load 进行手动复原。为了防止单次提交数据量过大,而导致 Stream Load 时长超过 Flink Checkpoint 工夫的状况,咱们提供了将单次 Checkpoint 拆分为多个事务的选项。最终胜利通过二阶段提交的机制实现了对数据一致性的保障。

利用展现

下图为 Sink To Doris 的具体利用,整体工具屏蔽了 API 调用以及拓扑流的组装,只须要通过简略的配置即可实现 Stream Load 到 Doris 的数据写入。

集群监控

在集群监控层面,咱们采纳了社区提供的监控模板,从集群指标监控、主机指标监控、数据处理监控三个方面登程来搭建 Doris 监控体系。其中集群指标监控和主机指标监控次要依据社区监控阐明文档进行监控,以便咱们查看集群整体运行的状况。除社区提供的模板之外,咱们还新增了无关 Stream Load 的监控指标,比方对以后 Stream Load 数量以及写入数据量的监控,如下图所示:

除此之外,咱们对数据写入 Doris 的时长以及写入的速度也比拟关注,依据本身业务的需要,咱们对工作写入数据速度、解决数据耗时等数据处理相干指标进行监控,帮忙咱们及时发现数据写入和读取的异常情况,借助公司外部的报警平台进行监控告警,报警形式反对电话、短信、推推、邮件等

总结与布局

目前 Apache Doris 次要利用于广告业务场景,已有数十台集群机器,笼罩近 70% 的实时数据分析场景,实现了全量离线试验平台以及局部离线 DWS 层数据查问减速。以后日均新增数据规模能够达到百亿级别,在大部分实时场景中,其查问提早在 1s 内。同时,Apache Doris 的胜利落地使得咱们实现了实时数仓在 OLAP 引擎上的对立。Doris 优异的剖析性能及简略易用的特点,也使得数仓架构更加简洁。

将来咱们将对 Doris 集群进行扩大,依据业务优先级进行资源隔离,欠缺资源管理机制,并打算将 Doris 利用到 360 商业化外部更宽泛的业务场景中,充分发挥 Doris 在 OLAP 场景的劣势。最初咱们将更加深刻的参加到 Doris 社区中来,踊跃回馈社区,与 Doris 并肩同行,共同进步!

退出移动版