乐趣区

关于实时计算:新东方基于Hologres实时离线一体化数仓建设实践

业务介绍

新东方教育科技团体定位于以学生全面成长为外围,以科技为驱动力的综合性教育团体。团体由 1993 年成立的北京新东方学校发展壮大而来,领有短期培训零碎、基础教育零碎、文化流传零碎等业务。

在互联网大潮中,新东方在 IT 技术上也一直重构,继续投入大数据建设,研发大数据的相干技术和利用,从而疾速而精准地响应业务需要,并用数据为团体各级领导提供决策依据。新东方的大数据利用次要包含两局部:

  • 企业应用端的业务场景(B 端):包含交易,教学,人员等数据,数据规模为 TB 级。数据会被依照不同的条件和学校层级等,造成营收、教学、客服、财产人事等实时报表,为 CRM 零碎的成千上万名业务参谋提供线索和商机的明细报表查问,同时也供各级管理人员理解业务的运行状况,辅助业务决策。
  • 互联网间接面向用户场景(C 端):次要为招生引流类、云教室等利用,包含网页版,App 端,H5 等,数据量为 PB 级。这部分数据记录了用户(学员和潜在用户)在新东方的教学闭环轨迹,C 端数据除了生成惯例的经营报表外,还会绘制用户画像,进而开发举荐零碎和圈选等利用,改善 C 端各种利用的用户体验,进一步精细化经营。

数仓建设和利用痛点

为了满足日益增长的业务需要,团体开始投入数仓建设。在数据仓库建设的初期,以业务驱动为主。通过阿里云的 MaxCompute 为外围构建数据仓库,间接集成业务库数据以及 WEB 利用的 OSS 日志等,而后在数据仓库中剖析业务数据并产生统计分析后果。初期的架构如下:

依据业务须要,将中小型规模的后果导入 MySQL 并反对数据更新。数据规模较大的只读后果则导入 MongoDB。
而后 Web 服务层查问 MySQL 和 MongoDB 并向用户提供服务接口,Web 服务层也能够通过 Lightning 减速接口间接查问 MaxCompute 的数据,

Lightning 协定是 MaxCompute 查问减速服务,反对以 PostgreSQL 协定及语法连贯拜访 MaxCompute 数据,相比 MaxCompute 提供的 odps jdbc 接口速度要快得多。起因是后者把每次拜访作为一个 Map-Reduce 解决,即便很小的数据量查问响应工夫也要超过 10 秒,而 Lightning 能将延时降到百毫秒内,满足业务后果报表的展现需要。目前 Lightning 服务进入服务下线阶段,新的减速服务由 Hologres 减速集群代替。

应用这套架构能够在较短的工夫内满足报表开发、用户画像和举荐服务等需要,为新东方的日常经营和招生引流提供较好的数据反对。然而随着业务的发展,这套架构越来越难以满足用户的需要,次要体现在:

  1. 实时性,业务心愿可能达到 1 分钟级甚至秒级的实时性,而应用 MaxCompute 只能实现批量解决,个别只能提供分钟级(个别 5 分钟以上)的延时
  2. 来自 Web 服务层的 高并发查问,MaxCompute 的大数据量查问只能反对到 100 左右的 QPS,满足不了来自 C 端利用的高并发查问
  3. 简单逻辑的 大数据量剖析 和 Ad-hoc 查问,随着剖析数据迅速从数百 G 上涨到 TB 级,在多个数亿行以上的数据进行简单报表开发,单实例 MySQL 难以反对;而 MongoDB 无奈应用规范的 SQL 进行简单查问,同时 MongoDB 自身简单的查问业务,开发效率很低。
  4. Lightning 接口尽管反对规范的 SQL 并且某些场景上速度比拟快,然而 Lightning 开始逐步下线,须要找到替换的办法。

实时数仓选型

要解决以上的业务痛点,就须要找到能满足实时数仓建设需要的产品。大数据团队调研了多种实时数仓计划,基于新东方的数据和利用特点进行选型,计划比对如下:

产品 Ad-hoc 查问 高并发反对(QPS) SQL 反对 TP(交易)反对 与 MaxCompute/Flink 集成 文档和技术支持
ClickHouse 20.1 反对 PB 级以上 默认反对 100 的并发查问,qps 取决于单个查问的响应工夫 单表查问反对较好,简单报表查问反对较弱 通过 mutation 反对 update,较弱 反对 文档丰盛,社区反对较好
Doris 0.9 反对 PB 级以上 数百 兼容 MySQL 不反对 通过兼容 MySQL 与 MaxCompute 集成, 与 Flink 的集成 不明确 文档和社区都较差
Hologres 1.1 反对 PB 级以上 数万以上 兼容 PostgreSQL DDL 反对 与 MaxCompute 间接在存储层集成,并且都兼容 PostgreSQL,提供 Flink Connector 集成 阿里在线文档和技术支持
Tidb 4.x(含 Tiflash) 反对 PB 级以上 数万以上 兼容 MySQL 反对 反对 文档丰盛,社区反对较好
Elastic Search 7.x 反对 PB 级以上 数万以上 不反对规范 SQL 不反对 反对与 MaxCompute 集成,Flink Connector 只反对 Source 文档丰盛,社区反对较好

从以上的表格能看出,Tidb 和 Hologres 能够较好地解决新东方在大数据方面面临的问题。然而 Tidb 须要公有云部署并运维,而 MaxCompute 部署在私有云,两者在不同的云环境。Hologres 是阿里云提供的云原生服务,并与 MaxCompute 都部署在私有云,且在 Pangu 文件层严密集成,数据交换效率远高于其余内部零碎,两者都兼容 PostgreSQL,从离线数据仓库开发迁徙到实时数据仓库开发难度升高。

基于以上的剖析,抉择 Hologres 作为实时数仓。

实时数仓建设

实时数仓是在离线数仓的根底上,基于 Lambda 架构构建,离线和实时同时进行建设。无关 Lambda 的,参阅:[Lambda architecture]

架构的各组件阐明:
1)数据源:

  • Binlog,即各类利用(B 端和 C 端)的数据库 Binlog,对于 SQL SERVER 的数据库则是 CT log;
  • App 音讯,即 App 运行时上报的事件;
  • Web 日志 / 埋点日志,即 Web 服务器所产生的 ngix 日志,以及 Web app/H5 运行时埋点服务产生的日志

2)CDC 数据总线(简称 CDC)

  • CDC 数据总线采集数据源,写入 Kafka Topic。对于离线数仓和实时数仓, CDC 都是间接交互的数据源 /
  • CDC 包含 Source Connector、Kafka 集群、Sink Connector 三局部。Source Connector 负责从数据源采集数据并写入 Kafka 集群的 Topic,而 Sink Connector 则将 Kafka Topic 的数据 ETL 到指标库,包含实时和离线数仓。
  • CDC 易于部署和监控,并提供了简略的数据过滤,老本较低,数据 ETL 工作尽量采纳 CDC。

3)离线数据处理

  • 离线数据处理基于 MaxCompute 搭建,用于计算全量数据,数据源来自于 CDC 的实时导入。离线数据通过离线数仓计算(ODS->DWD/DWS→ADS)导入 Hologres 作为存量数据,一部分离线的 DWD/DWS 数据也导入 Hologres 作为维表的存量数据。
  • Flink 计算工作会将 ADS 层后果 Sink 到 MaxCompute,用于数据备份。

4)实时数据处理
实时数据处理基于阿里云托管的 Flink 流式计算引擎。与离线数仓解决固定日期的数据(如 T +1)不同,实时数仓解决的是流式数据,从工作启动开始,就始终运行,除非异样终止,否则不会完结。数仓的档次与离线数仓相似,依据实时处理的特点做了简化。如下表所示:

数仓档次 形容 数据载体
ODS 层 与数据源表构造类似,数据未通过解决 Kafka Topic/cdc Connector
DWD/DWS 层 数据仓库层,依据业务线 / 主题解决数据,可复用 Kafka Topic
DIM 层 维度层 holo 维表,Kafka Topic
ADS 层 应用层,面向利用创立,存储处理结果 holo 实时后果表,Kafka Topic

5)Hologres 数据查问

数据表名称 形容 数仓档次 数据源
维度数据表 维度建模后的数据表,在实时计算时事实表通过 JDBC 查问 DIM 层 初始化数据来自离线数仓 dim 层、CDC、Flink 维表计算工作
实时后果表 实时数仓的计算结果表 实时数仓 DWS/ADS 层 实时数仓的 DWS/ADS 层计算工作
存量后果表 离线数仓的计算结果表 实时数仓 DWS/ADS 层 离线数仓的 DWS/ADS 层计算工作
查问 view 合并实时和存量后果,对外提供对立的展现 View 实时数仓 ADS 层 存量后果表
实时后果表
表面 来自 MaxCompute 的数据表援用 各层次 离线数仓
备份表 备份实时计算一段时间内的数据,用于做数据校验和问题诊断 DWD/DWS 层 实时数仓

利用场景

通过新的架构,反对了新东方团体内如下利用场景:

  1. 实时报表查问:为 CRM 零碎的成千上万名业务参谋提供线索和商机的明细报表查问,同时为管理层提供实时流动看板服务,延时秒级,辅助业务决策。
  2. Ad-hoc 查问:B 端和 C 端经营人员能够间接通过 Hologres 定制本人的简单业务查问
  3. 用户轨迹和画像场景:实时处理用户来自 B 端和 C 端的数据,生成用户轨迹和标签,为业务疾速决策提供根据。
  4. 举荐零碎和圈选业务:通过 Maxcompute 训练离线模型,并通过 Flink 数据调整模型的参数。基于用户的实时轨迹数据圈选出符合条件的用户并推送服务,进一步精细化经营。

应用实际

一个典型的实时工作解决流程如下图所示:

  1. ODS 层数据通过 CDC 数据总线导入 MaxCompute,提供离线计算源数据。同时也会将数据写入到 Hologres, 用于做数据验证。在 Hologres 中,维表存储全量数据。而其余类型的 ODS 数据表个别存储工夫 > 离线的计算周期即可,如离线 T +1,则存储 2 天,无相应的离线计算工作依据验证数据周期而定。
  2. Flink 工作读取 ODS 层数据作为输出,与存储在 Hologres 中的维表做关联,计算的后果存储到 DWD/DWS 层的 Kafka Topic 中,同时将后果写入到 Hologres 用于数据验证,数据存储工夫与 ODS 层雷同
  3. Flink 工作读取 DWD/DWS 层数据,与存储在 Hologres 中的维表做关联,将结算的后果存储到 Hologres。依据利用须要,如果是 Lambda 架构,存储工夫 > 离线的计算周期即可,如离线 T +1,则存储 2 天,如果是 Kappa 架构,保留全副数据,同时将后果数据写入离线数仓用于离线剖析用(可选)。

上面具体介绍在每一步解决流程中的应用实际与教训优化,以帮忙达到更好的成果。

数据验证

因为实时处理源数据和后果都是动静的,数据验证无奈在工作中进行。能够在 Hologres 中,对实时数仓的各层落仓后果进行验证。因为实时处理和工夫相干,每一档次的数据都须要带上一个解决工夫戳(Process Time)。在 Lambda 架构中,将实时后果和离线后果进行比对,假如离线解决周期为 T +1, 则实时处理取工夫戳与昨天的数据进行比对,计算出准确率。如果是 Kappa 架构,须要进行逻辑验证,并与业务人员解决的后果数据进行比对。

全量数据初始化

Kafka Topic 个别存储几天内的数据,不能提供全量数据,所以须要从离线数仓进行全量数据初始化,将维表、ADS 层后果等导入 Hologres。

Hologres 维表的 Lookup 和性能优化

1)Lookup
在 Flink 计算工作中,流表和 Hologres 的维度数据表 Join,就是 Lookup。Lookup 须要解决两个问题:

  1. 维表索引:理论处理过程是每条流表的数据,应用 Join 条件的列去维表中查问,将后果返回。Hologres 的维表的索引须要和 Flink SQL 的 Join key 统一。
  2. 维表的提早:因为维表的数据导入是另外的工作(CDC 工作或者 Flink 工作),就会呈现数据不同步的状况,流表数据已到,而关联的维度数据没有入库。

对于问题 1,在创立 Hologres 的维度表时,须要依据 Flink SQL 的须要去设置表的各类索引,尤其是 Distribution key 和 Clustering key,使之与 Join 的关联条件列统一,无关 Hologres 维表的索引会在前面大节提到。

对于问题 2,维表和流表 Join 中,解决两者数据不同步的问题,通过设置窗口能够解决大部分问题,然而因为 watermark 触发窗口执行,须要兼顾维表数据提早较多的状况,因此 watermark duration 设置较高,从而导致了数据处理工作的 Latency 很高,有时不合乎疾速响应的业务要求,这时能够采纳联结 Join,,将双流 Join 和 Lookup 联合起来。

维表数据包含两局部:1. Hologres 维表,查问全量数据. 2. 从维表对应的 Kafka Topic 创立的流表,查问最新的数据。Join 时,先取维表对应的流表数据,如果不存在取 Hologres 维表的数据。

以下是一个例子,t_student(学员表)的流表和 t_account(用户表) Join 获取学员的 user id

combined join
//stream table:stream_uc_account
val streamUcAccount: String =
s"""
CREATE TABLE `stream_t_account` (
`user_id` VARCHAR
,`mobile` VARCHAR
.......(omitted)
,WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND
) WITH (
 'connector' = 'kafka'
 .......(omitted)
)
""".stripMargin
//dim table:t_account
val odsUcAccount: String =
s"""CREATE TABLE `t_account` WITH ('connector'='jdbc',
.......(omitted)
) LIKE stream_t_account (excluding ALL)
""".stripMargin
//query sql: combined join
val querySql:String =
s"""
select
   coalesce(stm_acc.user_id,acc.user_id) as user_id
from t_student stu
LEFT JOIN stm_acc
ON stu.stu_id  = stm_acc.student_id
AND stu.modified_time 
BETWEEN stm_acc.modified_time - INTERVAL '5' MINUTE 
AND stm_acc.modified_time + INTERVAL '5' SECOND
LEFT JOIN uc_account FOR SYSTEM_TIME AS OF stu.process_time AS acc
ON stu.stu_id = acc.student_id

2)维表性能的优化
Flink SQL 在 Lookup 时,流表每一条数据到来,会对 Join 的维表执行一次点查,Join 的条件就是查问条件,例如对于流表 stm_A 和维表 dim_B,Join 条件为 stm_A.id = dim.B.id
当 id=id1 的 stm_A 数据到来时,会产生一条查问:select <field list> from dim_B where id=id1,因为维表查问的频率十分高,所以 Join 的维表列应该有索引。
Hologres 索引包含:distribution key,clustering key,bitmap key,segment key(event timestamp) , 无关索引,能够参考 holo 表的创立和索引

留神:维表举荐用 Hologres 行存表,然而在理论状况中,因为维表还用于 adhoc 一类的剖析查问业务,所以本实际中大部分维表是列存表,以下实际论断是基于列存表和查问状况设定的,仅供参考,请依据业务状况正当设置。

实际论断 1:维表的 Join 列设置成 distribution key
因为以后应用列存作为维度表,维表的列数会影响查问性能,对于同一个维表,8 个列和 16 个列的性能相差 50% 以上,倡议 join 用到的列都设置成 distribution key,不能多也不能少。如果应用行存表,没有这个限度。

实际论断 2:尽可能减少维表的属性列
在利用中,维表可能有多个维度列会被用于 Join,例如表 T1,有两个维度列 F1、F2 别离用做和流表 A,B 的 Join 条件。依据 F1 和 F2 之间的关系,如果 F1..F2→1..n,就在 F1 上创立 distribution key, 反过来则在 F2 上创立,即在粒度较大的维度列上创立 distribution key。

实际论断 3: 一个维度表有多个维度列并且是 Hierarchy 时,在粒度较大的列上创立 distribution key, 并用在 Join 条件中
如果 F1..F2 是多对多的关系,阐明一个维表有两个交织的维度,而不是档次维度 (hierarchy) 上,须要进行拆分。
查问时,不论 Lookup 是否必须用到 distribution key 索引列,都要把 distribution key 索引放在 Join 条件里
示例:维表 t1 有两个维度列:stu_code 和 roster_code,distribution key 加在 stu_code 上
流表 stm_t2 须要 Lookup 维表 t1,关联条件是两个表的 roster_code 雷同

select <field list> from  FROM stm_t2 stm JOIN t1 FOR SYSTEM_TIME AS OF stm.process_time AS dim ON stm.stu_code = dim.stu_code and stm.roster_code = dim.roster_code 

业务价值

通过半年的实时数仓建设,并在团体内宽泛应用。为业务的带来的价值如下:

  1. 为经营人员提供了 1 分钟级 / 秒级的实时看板服务和实时报表,业务人员能够及时理解到用户的反馈和业务的过程,从而调整经营策略,进步经营效率
  2. 为 C 端利用提供了 秒级的实时用户画像服务和用户圈选服务,从而能够让举荐零碎及时依据用户反馈调整举荐产品列表,进步用户体验
  3. 开发效率大为进步,开发人员从之前的架构迁徙到 Hologres+Flink SQL 上后,开发效率进步了 1 - 2 倍,学习的梯度也升高了很多。
  4. 运维老本降落,之前须要保护 MySQL, MongoDB 等异构零碎,而 Hologres 是云原生服务,无需保护一个运维团队。

作者:陈毓林,新东方互联网技术核心大数据工程师。在新东方从事多年大数据架构研发,数据集成平台开发,以及业务数据分析等,次要技术畛域包含基于 flink 的实时计算和优化,kafka 相干的数据交换和集成等阿里云的云原生技术。

退出移动版