关于实时计算:新东方基于Hologres实时离线一体化数仓建设实践

业务介绍

新东方教育科技团体定位于以学生全面成长为外围,以科技为驱动力的综合性教育团体。团体由1993年成立的北京新东方学校发展壮大而来,领有短期培训零碎、基础教育零碎、文化流传零碎等业务。

在互联网大潮中,新东方在IT技术上也一直重构,继续投入大数据建设,研发大数据的相干技术和利用,从而疾速而精准地响应业务需要,并用数据为团体各级领导提供决策依据。新东方的大数据利用次要包含两局部:

  • 企业应用端的业务场景(B端):包含交易,教学,人员等数据,数据规模为TB级。数据会被依照不同的条件和学校层级等,造成营收、教学、客服、财产人事等实时报表,为CRM零碎的成千上万名业务参谋提供线索和商机的明细报表查问,同时也供各级管理人员理解业务的运行状况,辅助业务决策。
  • 互联网间接面向用户场景(C端):次要为招生引流类、云教室等利用,包含网页版,App端,H5等,数据量为PB级。这部分数据记录了用户(学员和潜在用户)在新东方的教学闭环轨迹,C端数据除了生成惯例的经营报表外,还会绘制用户画像,进而开发举荐零碎和圈选等利用,改善C端各种利用的用户体验,进一步精细化经营。

数仓建设和利用痛点

为了满足日益增长的业务需要,团体开始投入数仓建设。在数据仓库建设的初期,以业务驱动为主。通过阿里云的MaxCompute为外围构建数据仓库,间接集成业务库数据以及WEB利用的OSS日志等,而后在数据仓库中剖析业务数据并产生统计分析后果。初期的架构如下:

依据业务须要,将中小型规模的后果导入MySQL并反对数据更新。数据规模较大的只读后果则导入 MongoDB。
而后Web服务层查问MySQL和MongoDB并向用户提供服务接口, Web服务层也能够通过Lightning减速接口间接查问MaxCompute的数据,

Lightning协定是MaxCompute查问减速服务,反对以PostgreSQL协定及语法连贯拜访MaxCompute数据,相比MaxCompute提供的odps jdbc接口速度要快得多。起因是后者把每次拜访作为一个Map-Reduce解决,即便很小的数据量查问响应工夫也要超过10秒,而 Lightning能将延时降到百毫秒内,满足业务后果报表的展现需要。目前Lightning服务进入服务下线阶段,新的减速服务由Hologres减速集群代替。

应用这套架构能够在较短的工夫内满足报表开发、用户画像和举荐服务等需要,为新东方的日常经营和招生引流提供较好的数据反对。然而随着业务的发展,这套架构越来越难以满足用户的需要,次要体现在:

  1. 实时性,业务心愿可能达到1分钟级甚至秒级的实时性,而应用MaxCompute只能实现批量解决,个别只能提供分钟级(个别5分钟以上)的延时
  2. 来自Web服务层的高并发查问,MaxCompute的大数据量查问只能反对到100左右的QPS,满足不了来自C端利用的高并发查问
  3. 简单逻辑的大数据量剖析和Ad-hoc查问,随着剖析数据迅速从数百G上涨到TB级,在多个数亿行以上的数据进行简单报表开发,单实例MySQL难以反对;而MongoDB无奈应用规范的SQL进行简单查问,同时MongoDB自身简单的查问业务,开发效率很低。
  4. Lightning接口尽管反对规范的SQL并且某些场景上速度比拟快,然而Lightning开始逐步下线,须要找到替换的办法。

实时数仓选型

要解决以上的业务痛点,就须要找到能满足实时数仓建设需要的产品。大数据团队调研了多种实时数仓计划,基于新东方的数据和利用特点进行选型,计划比对如下:

产品 Ad-hoc查问 高并发反对(QPS) SQL反对 TP(交易)反对 与MaxCompute/Flink集成 文档和技术支持
ClickHouse 20.1 反对PB级以上 默认反对100的并发查问,qps取决于单个查问的响应工夫 单表查问反对较好,简单报表查问反对较弱 通过mutation反对update,较弱 反对 文档丰盛,社区反对较好
Doris 0.9 反对PB级以上 数百 兼容MySQL 不反对 通过兼容MySQL与MaxCompute集成,与Flink的集成 不明确 文档和社区都较差
Hologres 1.1 反对PB级以上 数万以上 兼容PostgreSQL DDL反对 与MaxCompute间接在存储层集成,并且都兼容PostgreSQL,提供Flink Connector集成 阿里在线文档和技术支持
Tidb 4.x (含Tiflash) 反对PB级以上 数万以上 兼容MySQL 反对 反对 文档丰盛,社区反对较好
Elastic Search 7.x 反对PB级以上 数万以上 不反对规范SQL 不反对 反对与MaxCompute集成,Flink Connector只反对Source 文档丰盛,社区反对较好

从以上的表格能看出,Tidb和Hologres能够较好地解决新东方在大数据方面面临的问题。然而Tidb须要公有云部署并运维,而MaxCompute部署在私有云,两者在不同的云环境。Hologres是阿里云提供的云原生服务,并与MaxCompute都部署在私有云,且在Pangu文件层严密集成,数据交换效率远高于其余内部零碎,两者都兼容PostgreSQL,从离线数据仓库开发迁徙到实时数据仓库开发难度升高。

基于以上的剖析,抉择Hologres作为实时数仓。

实时数仓建设

实时数仓是在离线数仓的根底上,基于Lambda架构构建,离线和实时同时进行建设。无关Lambda的,参阅:[Lambda architecture]

架构的各组件阐明:
1)数据源:

  • Binlog,即各类利用(B端和C端)的数据库Binlog,对于SQL SERVER的数据库则是CT log;
  • App音讯,即App运行时上报的事件;
  • Web日志/埋点日志,即Web服务器所产生的ngix日志,以及Web app/H5运行时埋点服务产生的日志

2)CDC数据总线(简称CDC)

  • CDC数据总线采集数据源,写入Kafka Topic。对于离线数仓和实时数仓, CDC都是间接交互的数据源/
  • CDC包含Source Connector、Kafka 集群、Sink Connector三局部。 Source Connector 负责从数据源采集数据并写入Kafka集群的Topic,而Sink Connector则将Kafka Topic的数据ETL到指标库,包含实时和离线数仓。
  • CDC易于部署和监控,并提供了简略的数据过滤,老本较低,数据ETL工作尽量采纳CDC。

3)离线数据处理

  • 离线数据处理基于MaxCompute搭建,用于计算全量数据,数据源来自于CDC的实时导入。离线数据通过离线数仓计算(ODS->DWD/DWS→ADS)导入Hologres作为存量数据,一部分离线的DWD/DWS数据也导入Hologres作为维表的存量数据。
  • Flink计算工作会将ADS层后果Sink到MaxCompute, 用于数据备份。

4)实时数据处理
实时数据处理基于阿里云托管的 Flink流式计算引擎。与离线数仓解决固定日期的数据(如T+1)不同,实时数仓解决的是流式数据,从工作启动开始,就始终运行,除非异样终止,否则不会完结。数仓的档次与离线数仓相似,依据实时处理的特点做了简化。如下表所示:

数仓档次 形容 数据载体
ODS层 与数据源表构造类似,数据未通过解决 Kafka Topic/cdc Connector
DWD/DWS层 数据仓库层,依据业务线/主题解决数据,可复用 Kafka Topic
DIM层 维度层 holo 维表,Kafka Topic
ADS层 应用层,面向利用创立,存储处理结果 holo实时后果表,Kafka Topic

5)Hologres 数据查问

数据表名称 形容 数仓档次 数据源
维度数据表 维度建模后的数据表,在实时计算时事实表通过JDBC查问 DIM层 初始化数据来自离线数仓dim 层、CDC、Flink维表计算工作
实时后果表 实时数仓的计算结果表 实时数仓DWS/ADS层 实时数仓的DWS/ADS层计算工作
存量后果表 离线数仓的计算结果表 实时数仓DWS/ADS层 离线数仓的DWS/ADS层计算工作
查问view 合并实时和存量后果,对外提供对立的展现View 实时数仓ADS层 存量后果表
实时后果表
表面 来自MaxCompute的数据表援用 各层次 离线数仓
备份表 备份实时计算一段时间内的数据,用于做数据校验和问题诊断 DWD/DWS层 实时数仓

利用场景

通过新的架构,反对了新东方团体内如下利用场景:

  1. 实时报表查问:为CRM零碎的成千上万名业务参谋提供线索和商机的明细报表查问,同时为管理层提供实时流动看板服务,延时秒级,辅助业务决策。
  2. Ad-hoc查问:B端和C端经营人员能够间接通过Hologres定制本人的简单业务查问
  3. 用户轨迹和画像场景:实时处理用户来自B端和C端的数据,生成用户轨迹和标签,为业务疾速决策提供根据。
  4. 举荐零碎和圈选业务:通过Maxcompute训练离线模型,并通过Flink数据调整模型的参数。基于用户的实时轨迹数据圈选出符合条件的用户并推送服务,进一步精细化经营。

应用实际

一个典型的实时工作解决流程如下图所示:

  1. ODS层数据通过CDC数据总线导入MaxCompute, 提供离线计算源数据。 同时也会将数据写入到Hologres,用于做数据验证。 在Hologres中,维表存储全量数据。而其余类型的ODS数据表个别存储工夫>离线的计算周期即可,如离线T+1,则存储2天,无相应的离线计算工作依据验证数据周期而定。
  2. Flink工作读取ODS层数据作为输出,与存储在Hologres中的维表做关联,计算的后果存储到DWD/DWS层的Kafka Topic中,同时将后果写入到Hologres用于数据验证,数据存储工夫与ODS层雷同
  3. Flink工作读取DWD/DWS层数据,与存储在Hologres中的维表做关联, 将结算的后果存储到Hologres。依据利用须要,如果是Lambda架构,存储工夫>离线的计算周期即可,如离线T+1,则存储2天,如果是Kappa架构,保留全副数据, 同时将后果数据写入离线数仓用于离线剖析用(可选)。

上面具体介绍在每一步解决流程中的应用实际与教训优化,以帮忙达到更好的成果。

数据验证

因为实时处理源数据和后果都是动静的,数据验证无奈在工作中进行。能够在Hologres中,对实时数仓的各层落仓后果进行验证。因为实时处理和工夫相干,每一档次的数据都须要带上一个解决工夫戳(Process Time)。在Lambda架构中,将实时后果和离线后果进行比对,假如离线解决周期为T+1, 则实时处理取工夫戳与昨天的数据进行比对,计算出准确率。如果是Kappa架构,须要进行逻辑验证,并与业务人员解决的后果数据进行比对。

全量数据初始化

Kafka Topic个别存储几天内的数据,不能提供全量数据,所以须要从离线数仓进行全量数据初始化,将维表、ADS层后果等导入Hologres。

Hologres维表的Lookup和性能优化

1)Lookup
在Flink计算工作中,流表和Hologres的维度数据表Join,就是Lookup。Lookup须要解决两个问题:

  1. 维表索引:理论处理过程是每条流表的数据,应用Join 条件的列去维表中查问,将后果返回。Hologres的维表的索引须要和Flink SQL的Join key统一。
  2. 维表的提早:因为维表的数据导入是另外的工作(CDC工作或者Flink工作),就会呈现数据不同步的状况,流表数据已到,而关联的维度数据没有入库。

对于问题1, 在创立Hologres的维度表时,须要依据Flink SQL的须要去设置表的各类索引,尤其是Distribution key和Clustering key,使之与Join的关联条件列统一,无关Hologres维表的索引会在前面大节提到。

对于问题2,维表和流表Join中,解决两者数据不同步的问题,通过设置窗口能够解决大部分问题,然而因为watermark触发窗口执行,须要兼顾维表数据提早较多的状况,因此watermark duration设置较高,从而导致了数据处理工作的Latency很高,有时不合乎疾速响应的业务要求,这时能够采纳联结Join,,将双流Join和Lookup联合起来。

维表数据包含两局部: 1. Hologres维表,查问全量数据. 2. 从维表对应的Kafka Topic创立的流表,查问最新的数据。Join时,先取维表对应的流表数据,如果不存在取Hologres维表的数据。

以下是一个例子,t_student(学员表)的流表和t_account(用户表) Join获取学员的user id

combined join
//stream table:stream_uc_account
val streamUcAccount: String =
s"""
CREATE TABLE `stream_t_account` (
`user_id` VARCHAR
,`mobile` VARCHAR
.......(omitted)
,WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND
) WITH (
 'connector' = 'kafka'
 .......(omitted)
)
""".stripMargin
//dim table:t_account
val odsUcAccount: String =
s"""
CREATE TABLE `t_account` WITH (
'connector' = 'jdbc',
.......(omitted)
) LIKE stream_t_account (excluding ALL)
""".stripMargin
//query sql: combined join
val querySql:String =
s"""
select
   coalesce(stm_acc.user_id,acc.user_id) as user_id
from t_student stu
LEFT JOIN stm_acc
ON stu.stu_id  = stm_acc.student_id
AND stu.modified_time 
BETWEEN stm_acc.modified_time - INTERVAL '5' MINUTE 
AND stm_acc.modified_time + INTERVAL '5' SECOND
LEFT JOIN uc_account FOR SYSTEM_TIME AS OF stu.process_time AS acc
ON stu.stu_id = acc.student_id

2)维表性能的优化
Flink SQL在Lookup时,流表每一条数据到来,会对Join的维表执行一次点查,Join的条件就是查问条件,例如对于流表stm_A和维表dim_B,Join条件为stm_A.id = dim.B.id
当 id=id1的stm_A数据到来时,会产生一条查问: select <field list> from dim_B where id=id1,因为维表查问的频率十分高,所以Join的维表列应该有索引。
Hologres索引包含: distribution key,clustering key,bitmap key,segment key(event timestamp) , 无关索引,能够参考 holo表的创立和索引

留神:维表举荐用Hologres行存表,然而在理论状况中,因为维表还用于adhoc一类的剖析查问业务,所以本实际中大部分维表是列存表,以下实际论断是基于列存表和查问状况设定的,仅供参考,请依据业务状况正当设置。

实际论断1:维表的Join列设置成distribution key
因为以后应用列存作为维度表,维表的列数会影响查问性能,对于同一个维表,8个列和16个列的性能相差50%以上,倡议join用到的列都设置成distribution key,不能多也不能少。如果应用行存表,没有这个限度。

实际论断2:尽可能减少维表的属性列
在利用中,维表可能有多个维度列会被用于Join,例如表T1,有两个维度列F1、F2别离用做和流表A,B的Join条件。依据F1和F2之间的关系,如果F1..F2→1..n,就在F1上创立distribution key, 反过来则在F2上创立,即在粒度较大的维度列上创立distribution key。

实际论断3: 一个维度表有多个维度列并且是Hierarchy时,在粒度较大的列上创立distribution key,并用在Join条件中
如果 F1..F2是多对多的关系,阐明一个维表有两个交织的维度,而不是档次维度(hierarchy)上,须要进行拆分。
查问时,不论Lookup是否必须用到distribution key索引列,都要把distribution key索引放在Join条件里
示例: 维表t1有两个维度列:stu_code和roster_code,distribution key加在stu_code上
流表stm_t2须要 Lookup 维表t1,关联条件是两个表的roster_code雷同

select <field list> from  FROM stm_t2 stm JOIN t1 FOR SYSTEM_TIME AS OF stm.process_time AS dim ON stm.stu_code = dim.stu_code and stm.roster_code = dim.roster_code 

业务价值

通过半年的实时数仓建设,并在团体内宽泛应用。为业务的带来的价值如下:

  1. 为经营人员提供了1分钟级/秒级的实时看板服务和实时报表,业务人员能够及时理解到用户的反馈和业务的过程,从而调整经营策略,进步经营效率
  2. 为C端利用提供了秒级的实时用户画像服务和用户圈选服务,从而能够让举荐零碎及时依据用户反馈调整举荐产品列表,进步用户体验
  3. 开发效率大为进步,开发人员从之前的架构迁徙到Hologres+Flink SQL上后,开发效率进步了1-2倍,学习的梯度也升高了很多。
  4. 运维老本降落,之前须要保护MySQL, MongoDB等异构零碎,而Hologres是云原生服务,无需保护一个运维团队。

作者:陈毓林, 新东方互联网技术核心大数据工程师。在新东方从事多年大数据架构研发,数据集成平台开发,以及业务数据分析等,次要技术畛域包含基于flink的实时计算和优化,kafka相干的数据交换和集成等阿里云的云原生技术。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理