业务介绍
新东方教育科技团体定位于以学生全面成长为外围,以科技为驱动力的综合性教育团体。团体由1993年成立的北京新东方学校发展壮大而来,领有短期培训零碎、基础教育零碎、文化流传零碎等业务。
在互联网大潮中,新东方在IT技术上也一直重构,继续投入大数据建设,研发大数据的相干技术和利用,从而疾速而精准地响应业务需要,并用数据为团体各级领导提供决策依据。新东方的大数据利用次要包含两局部:
- 企业应用端的业务场景(B端):包含交易,教学,人员等数据,数据规模为TB级。数据会被依照不同的条件和学校层级等,造成营收、教学、客服、财产人事等实时报表,为CRM零碎的成千上万名业务参谋提供线索和商机的明细报表查问,同时也供各级管理人员理解业务的运行状况,辅助业务决策。
- 互联网间接面向用户场景(C端):次要为招生引流类、云教室等利用,包含网页版,App端,H5等,数据量为PB级。这部分数据记录了用户(学员和潜在用户)在新东方的教学闭环轨迹,C端数据除了生成惯例的经营报表外,还会绘制用户画像,进而开发举荐零碎和圈选等利用,改善C端各种利用的用户体验,进一步精细化经营。
数仓建设和利用痛点
为了满足日益增长的业务需要,团体开始投入数仓建设。在数据仓库建设的初期,以业务驱动为主。通过阿里云的MaxCompute为外围构建数据仓库,间接集成业务库数据以及WEB利用的OSS日志等,而后在数据仓库中剖析业务数据并产生统计分析后果。初期的架构如下:
依据业务须要,将中小型规模的后果导入MySQL并反对数据更新。数据规模较大的只读后果则导入 MongoDB。
而后Web服务层查问MySQL和MongoDB并向用户提供服务接口, Web服务层也能够通过Lightning减速接口间接查问MaxCompute的数据,
Lightning协定是MaxCompute查问减速服务,反对以PostgreSQL协定及语法连贯拜访MaxCompute数据,相比MaxCompute提供的odps jdbc接口速度要快得多。起因是后者把每次拜访作为一个Map-Reduce解决,即便很小的数据量查问响应工夫也要超过10秒,而 Lightning能将延时降到百毫秒内,满足业务后果报表的展现需要。目前Lightning服务进入服务下线阶段,新的减速服务由Hologres减速集群代替。
应用这套架构能够在较短的工夫内满足报表开发、用户画像和举荐服务等需要,为新东方的日常经营和招生引流提供较好的数据反对。然而随着业务的发展,这套架构越来越难以满足用户的需要,次要体现在:
- 实时性,业务心愿可能达到1分钟级甚至秒级的实时性,而应用MaxCompute只能实现批量解决,个别只能提供分钟级(个别5分钟以上)的延时
- 来自Web服务层的高并发查问,MaxCompute的大数据量查问只能反对到100左右的QPS,满足不了来自C端利用的高并发查问
- 简单逻辑的大数据量剖析和Ad-hoc查问,随着剖析数据迅速从数百G上涨到TB级,在多个数亿行以上的数据进行简单报表开发,单实例MySQL难以反对;而MongoDB无奈应用规范的SQL进行简单查问,同时MongoDB自身简单的查问业务,开发效率很低。
- Lightning接口尽管反对规范的SQL并且某些场景上速度比拟快,然而Lightning开始逐步下线,须要找到替换的办法。
实时数仓选型
要解决以上的业务痛点,就须要找到能满足实时数仓建设需要的产品。大数据团队调研了多种实时数仓计划,基于新东方的数据和利用特点进行选型,计划比对如下:
产品 | Ad-hoc查问 | 高并发反对(QPS) | SQL反对 | TP(交易)反对 | 与MaxCompute/Flink集成 | 文档和技术支持 |
---|---|---|---|---|---|---|
ClickHouse 20.1 | 反对PB级以上 | 默认反对100的并发查问,qps取决于单个查问的响应工夫 | 单表查问反对较好,简单报表查问反对较弱 | 通过mutation反对update,较弱 | 反对 | 文档丰盛,社区反对较好 |
Doris 0.9 | 反对PB级以上 | 数百 | 兼容MySQL | 不反对 | 通过兼容MySQL与MaxCompute集成,与Flink的集成 不明确 | 文档和社区都较差 |
Hologres 1.1 | 反对PB级以上 | 数万以上 | 兼容PostgreSQL | DDL反对 | 与MaxCompute间接在存储层集成,并且都兼容PostgreSQL,提供Flink Connector集成 | 阿里在线文档和技术支持 |
Tidb 4.x (含Tiflash) | 反对PB级以上 | 数万以上 | 兼容MySQL | 反对 | 反对 | 文档丰盛,社区反对较好 |
Elastic Search 7.x | 反对PB级以上 | 数万以上 | 不反对规范SQL | 不反对 | 反对与MaxCompute集成,Flink Connector只反对Source | 文档丰盛,社区反对较好 |
从以上的表格能看出,Tidb和Hologres能够较好地解决新东方在大数据方面面临的问题。然而Tidb须要公有云部署并运维,而MaxCompute部署在私有云,两者在不同的云环境。Hologres是阿里云提供的云原生服务,并与MaxCompute都部署在私有云,且在Pangu文件层严密集成,数据交换效率远高于其余内部零碎,两者都兼容PostgreSQL,从离线数据仓库开发迁徙到实时数据仓库开发难度升高。
基于以上的剖析,抉择Hologres作为实时数仓。
实时数仓建设
实时数仓是在离线数仓的根底上,基于Lambda架构构建,离线和实时同时进行建设。无关Lambda的,参阅:[Lambda architecture]
架构的各组件阐明:
1)数据源:
- Binlog,即各类利用(B端和C端)的数据库Binlog,对于SQL SERVER的数据库则是CT log;
- App音讯,即App运行时上报的事件;
- Web日志/埋点日志,即Web服务器所产生的ngix日志,以及Web app/H5运行时埋点服务产生的日志
2)CDC数据总线(简称CDC)
- CDC数据总线采集数据源,写入Kafka Topic。对于离线数仓和实时数仓, CDC都是间接交互的数据源/
- CDC包含Source Connector、Kafka 集群、Sink Connector三局部。 Source Connector 负责从数据源采集数据并写入Kafka集群的Topic,而Sink Connector则将Kafka Topic的数据ETL到指标库,包含实时和离线数仓。
- CDC易于部署和监控,并提供了简略的数据过滤,老本较低,数据ETL工作尽量采纳CDC。
3)离线数据处理
- 离线数据处理基于MaxCompute搭建,用于计算全量数据,数据源来自于CDC的实时导入。离线数据通过离线数仓计算(ODS->DWD/DWS→ADS)导入Hologres作为存量数据,一部分离线的DWD/DWS数据也导入Hologres作为维表的存量数据。
- Flink计算工作会将ADS层后果Sink到MaxCompute, 用于数据备份。
4)实时数据处理
实时数据处理基于阿里云托管的 Flink流式计算引擎。与离线数仓解决固定日期的数据(如T+1)不同,实时数仓解决的是流式数据,从工作启动开始,就始终运行,除非异样终止,否则不会完结。数仓的档次与离线数仓相似,依据实时处理的特点做了简化。如下表所示:
数仓档次 | 形容 | 数据载体 |
---|---|---|
ODS层 | 与数据源表构造类似,数据未通过解决 | Kafka Topic/cdc Connector |
DWD/DWS层 | 数据仓库层,依据业务线/主题解决数据,可复用 | Kafka Topic |
DIM层 | 维度层 | holo 维表,Kafka Topic |
ADS层 | 应用层,面向利用创立,存储处理结果 | holo实时后果表,Kafka Topic |
5)Hologres 数据查问
数据表名称 | 形容 | 数仓档次 | 数据源 |
---|---|---|---|
维度数据表 | 维度建模后的数据表,在实时计算时事实表通过JDBC查问 | DIM层 | 初始化数据来自离线数仓dim 层、CDC、Flink维表计算工作 |
实时后果表 | 实时数仓的计算结果表 | 实时数仓DWS/ADS层 | 实时数仓的DWS/ADS层计算工作 |
存量后果表 | 离线数仓的计算结果表 | 实时数仓DWS/ADS层 | 离线数仓的DWS/ADS层计算工作 |
查问view | 合并实时和存量后果,对外提供对立的展现View | 实时数仓ADS层 | 存量后果表 |
实时后果表 | |||
表面 | 来自MaxCompute的数据表援用 | 各层次 | 离线数仓 |
备份表 | 备份实时计算一段时间内的数据,用于做数据校验和问题诊断 | DWD/DWS层 | 实时数仓 |
利用场景
通过新的架构,反对了新东方团体内如下利用场景:
- 实时报表查问:为CRM零碎的成千上万名业务参谋提供线索和商机的明细报表查问,同时为管理层提供实时流动看板服务,延时秒级,辅助业务决策。
- Ad-hoc查问:B端和C端经营人员能够间接通过Hologres定制本人的简单业务查问
- 用户轨迹和画像场景:实时处理用户来自B端和C端的数据,生成用户轨迹和标签,为业务疾速决策提供根据。
- 举荐零碎和圈选业务:通过Maxcompute训练离线模型,并通过Flink数据调整模型的参数。基于用户的实时轨迹数据圈选出符合条件的用户并推送服务,进一步精细化经营。
应用实际
一个典型的实时工作解决流程如下图所示:
- ODS层数据通过CDC数据总线导入MaxCompute, 提供离线计算源数据。 同时也会将数据写入到Hologres,用于做数据验证。 在Hologres中,维表存储全量数据。而其余类型的ODS数据表个别存储工夫>离线的计算周期即可,如离线T+1,则存储2天,无相应的离线计算工作依据验证数据周期而定。
- Flink工作读取ODS层数据作为输出,与存储在Hologres中的维表做关联,计算的后果存储到DWD/DWS层的Kafka Topic中,同时将后果写入到Hologres用于数据验证,数据存储工夫与ODS层雷同
- Flink工作读取DWD/DWS层数据,与存储在Hologres中的维表做关联, 将结算的后果存储到Hologres。依据利用须要,如果是Lambda架构,存储工夫>离线的计算周期即可,如离线T+1,则存储2天,如果是Kappa架构,保留全副数据, 同时将后果数据写入离线数仓用于离线剖析用(可选)。
上面具体介绍在每一步解决流程中的应用实际与教训优化,以帮忙达到更好的成果。
数据验证
因为实时处理源数据和后果都是动静的,数据验证无奈在工作中进行。能够在Hologres中,对实时数仓的各层落仓后果进行验证。因为实时处理和工夫相干,每一档次的数据都须要带上一个解决工夫戳(Process Time)。在Lambda架构中,将实时后果和离线后果进行比对,假如离线解决周期为T+1, 则实时处理取工夫戳与昨天的数据进行比对,计算出准确率。如果是Kappa架构,须要进行逻辑验证,并与业务人员解决的后果数据进行比对。
全量数据初始化
Kafka Topic个别存储几天内的数据,不能提供全量数据,所以须要从离线数仓进行全量数据初始化,将维表、ADS层后果等导入Hologres。
Hologres维表的Lookup和性能优化
1)Lookup
在Flink计算工作中,流表和Hologres的维度数据表Join,就是Lookup。Lookup须要解决两个问题:
- 维表索引:理论处理过程是每条流表的数据,应用Join 条件的列去维表中查问,将后果返回。Hologres的维表的索引须要和Flink SQL的Join key统一。
- 维表的提早:因为维表的数据导入是另外的工作(CDC工作或者Flink工作),就会呈现数据不同步的状况,流表数据已到,而关联的维度数据没有入库。
对于问题1, 在创立Hologres的维度表时,须要依据Flink SQL的须要去设置表的各类索引,尤其是Distribution key和Clustering key,使之与Join的关联条件列统一,无关Hologres维表的索引会在前面大节提到。
对于问题2,维表和流表Join中,解决两者数据不同步的问题,通过设置窗口能够解决大部分问题,然而因为watermark触发窗口执行,须要兼顾维表数据提早较多的状况,因此watermark duration设置较高,从而导致了数据处理工作的Latency很高,有时不合乎疾速响应的业务要求,这时能够采纳联结Join,,将双流Join和Lookup联合起来。
维表数据包含两局部: 1. Hologres维表,查问全量数据. 2. 从维表对应的Kafka Topic创立的流表,查问最新的数据。Join时,先取维表对应的流表数据,如果不存在取Hologres维表的数据。
以下是一个例子,t_student(学员表)的流表和t_account(用户表) Join获取学员的user id
combined join//stream table:stream_uc_accountval streamUcAccount: String =s"""CREATE TABLE `stream_t_account` (`user_id` VARCHAR,`mobile` VARCHAR.......(omitted),WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND) WITH ( 'connector' = 'kafka' .......(omitted))""".stripMargin//dim table:t_accountval odsUcAccount: String =s"""CREATE TABLE `t_account` WITH ('connector' = 'jdbc',.......(omitted)) LIKE stream_t_account (excluding ALL)""".stripMargin//query sql: combined joinval querySql:String =s"""select coalesce(stm_acc.user_id,acc.user_id) as user_idfrom t_student stuLEFT JOIN stm_accON stu.stu_id = stm_acc.student_idAND stu.modified_time BETWEEN stm_acc.modified_time - INTERVAL '5' MINUTE AND stm_acc.modified_time + INTERVAL '5' SECONDLEFT JOIN uc_account FOR SYSTEM_TIME AS OF stu.process_time AS accON stu.stu_id = acc.student_id
2)维表性能的优化
Flink SQL在Lookup时,流表每一条数据到来,会对Join的维表执行一次点查,Join的条件就是查问条件,例如对于流表stm_A和维表dim_B,Join条件为stm_A.id = dim.B.id
当 id=id1的stm_A数据到来时,会产生一条查问: select <field list> from dim_B where id=id1,因为维表查问的频率十分高,所以Join的维表列应该有索引。
Hologres索引包含: distribution key,clustering key,bitmap key,segment key(event timestamp) , 无关索引,能够参考 holo表的创立和索引
留神:维表举荐用Hologres行存表,然而在理论状况中,因为维表还用于adhoc一类的剖析查问业务,所以本实际中大部分维表是列存表,以下实际论断是基于列存表和查问状况设定的,仅供参考,请依据业务状况正当设置。
实际论断1:维表的Join列设置成distribution key
因为以后应用列存作为维度表,维表的列数会影响查问性能,对于同一个维表,8个列和16个列的性能相差50%以上,倡议join用到的列都设置成distribution key,不能多也不能少。如果应用行存表,没有这个限度。
实际论断2:尽可能减少维表的属性列
在利用中,维表可能有多个维度列会被用于Join,例如表T1,有两个维度列F1、F2别离用做和流表A,B的Join条件。依据F1和F2之间的关系,如果F1..F2→1..n,就在F1上创立distribution key, 反过来则在F2上创立,即在粒度较大的维度列上创立distribution key。
实际论断3: 一个维度表有多个维度列并且是Hierarchy时,在粒度较大的列上创立distribution key,并用在Join条件中
如果 F1..F2是多对多的关系,阐明一个维表有两个交织的维度,而不是档次维度(hierarchy)上,须要进行拆分。
查问时,不论Lookup是否必须用到distribution key索引列,都要把distribution key索引放在Join条件里
示例: 维表t1有两个维度列:stu_code和roster_code,distribution key加在stu_code上
流表stm_t2须要 Lookup 维表t1,关联条件是两个表的roster_code雷同
select <field list> from FROM stm_t2 stm JOIN t1 FOR SYSTEM_TIME AS OF stm.process_time AS dim ON stm.stu_code = dim.stu_code and stm.roster_code = dim.roster_code
业务价值
通过半年的实时数仓建设,并在团体内宽泛应用。为业务的带来的价值如下:
- 为经营人员提供了1分钟级/秒级的实时看板服务和实时报表,业务人员能够及时理解到用户的反馈和业务的过程,从而调整经营策略,进步经营效率。
- 为C端利用提供了秒级的实时用户画像服务和用户圈选服务,从而能够让举荐零碎及时依据用户反馈调整举荐产品列表,进步用户体验
- 开发效率大为进步,开发人员从之前的架构迁徙到Hologres+Flink SQL上后,开发效率进步了1-2倍,学习的梯度也升高了很多。
- 运维老本降落,之前须要保护MySQL, MongoDB等异构零碎,而Hologres是云原生服务,无需保护一个运维团队。
作者:陈毓林, 新东方互联网技术核心大数据工程师。在新东方从事多年大数据架构研发,数据集成平台开发,以及业务数据分析等,次要技术畛域包含基于flink的实时计算和优化,kafka相干的数据交换和集成等阿里云的云原生技术。