共计 9029 个字符,预计需要花费 23 分钟才能阅读完成。
导读: 随着叮咚买菜业务的倒退,不同的业务场景对数据分析提出了不同的需要,他们心愿引入一款实时 OLAP 数据库,构建一个灵便的多维实时查问和剖析的平台,对立数据的接入和查问计划,解决各业务线对数据高效实时查问和精细化经营的需要。通过调研选型,最终引入 Apache Doris 作为最终的 OLAP 剖析引擎,Doris 作为外围的 OLAP 引擎反对简单地剖析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。
作者 | 叮咚买菜资深数据工程师 韩青
叮咚买菜创建于 2017 年 5 月,是一家专一美妙食物的守业公司。叮咚买菜专一吃的事业,为满足更多人“想吃什么”而致力,通过美妙食材的供给、美妙味道的开发以及美食品牌的孵化,一直为人们提供美好生活的解决方案,致力让更多人吃得陈腐、吃得省心、吃得丰盛、吃得衰弱 …… 以更美妙的舌尖体验,为古代家庭发明美味与幸福感。
业务需要
随着叮咚买菜业务的倒退,不同的业务场景对数据分析提出了不同的需要,这些需要最终被数据看板、实时 Ad-Hoc、行为剖析、B/C 端业务平台和标签平台等零碎利用所承载,为实现这些零碎利用,叮咚大数据心愿引入一款实时 OLAP 数据库,旨在提供一个灵便的多维实时查问和剖析的平台,对立数据的接入和查问计划,解决各业务线对数据高效实时查问和精细化经营的需要。基于上述诉求,咱们心愿所引入的数据库具备以下能力:
- 能够实时高效地剖析和应用数据;
- 能够反对明细数据、汇总数据两种不同的数据查问形式;
- 能够对入库后的数据即席抉择维度和条件查问,实时 / 近实时返回后果
选型和比照
咱们次要比照了 Apache Doris 和 ClickHouse 两款市面上最常见的开源 OLAP 引擎,在选型过程中咱们次要思考以下几个方面:
- 反对规范 SQL,无需投入额定的工夫适应和学习新的 SQL 方言、间接用规范 SQL 即可间接查问,最大化升高应用门槛;
- 反对 Join 操作,不便事实表与维度表进行关联查问,在应答维度更新时具备更高的灵活性、无需对解决后的宽表进行重刷;
- 反对高并发查问,零碎面临多条业务线的同时应用,因而须要有比拟强的并行查问能力,以更好满足业务需要;
- 反对离线和实时导入,可与已有技术栈轻松对接,反对多个数据源或大数据组件的离线和实时导入,以更好适配不同应用场景;
- 反对大数据量的明细数据查问,以满足不同业务用户灵便多变的剖析需要;
通过详尽的技术调研,Apache Doris 各项能力都比拟优异,在咱们的大多数业务场景中都须要明细数据级别的查问、高并发的点查和大数据量的 Join,而这几个方面 Apache Doris 相较于 ClickHouse 均更胜一筹,因而咱们决定应用 Apache Doris 来搭建新的架构体系。
架构体系
在整体架构中,各个组件承载着特定的性能,Elasticsearch 次要负责存储标签零碎的数据,HBase 是实时数仓的维表层,MySQL 用于存储业务零碎的数据存储,Kafka 次要存储实时数据,Spark 次要提供 Ad-Hoc 查问的计算集群服务,而 Apache Doris 作为外围的 OLAP 引擎反对简单地剖析操作、提供多维的数据视图。
- 离线局部: 数据从业务库通过 DataX 导入到数据仓库 ODS 层,通过层层解决输入到 Doris 中,下层 BI 零碎链接到 Doris 进行报表展现。
- 实时局部: 数据通过 Flink 生产 Kafka 中的数据,进行相应的逻辑解决后,输入到 Doris 或者 HDFS 中,供应用层应用。
在数据利用的 OLAP 层中,Doris 利用计划如下图所示:
- 模型创立规范化: 采纳流程审批的形式进行数据建模,依据具体的业务场景来搭建 Duplicate,Unique Key 和 Aggregate 模型,并依照用户提供的数据量设置适合的 Bucket 数目,做好模型归属关系。
- 数据入口的对立: 数据的流入次要有实时和离线两种,实时数据用 Flink 工作从 Kafka 生产数据,逻辑解决流入 Doris;离线数据通过 Broker Load 形式从 Hive 中将数据灌入 Doris 中。
- 服务对外进口的对立: 对外服务次要通过两种形式裸露接口,一种是应用 JDBC 直连,下层系统配置 Doris 集群的 FE 的连贯信息直连 Doris;另一种是业务通过外部的 One API 服务配置业务接口应用 Doris。
- 业务 SQL 的优化治理: 通过采集 Doris FE 的审计日志,以此来对 SQL 的性能进行剖析优化,以及对 Doris 服务进行治理。
利用实际
叮咚买菜目前曾经将 OLAP 引擎对立为 Apache Doris 广泛应用于业务场景中,咱们将 Doris 集群拆分成了四个集群,别离反对外围报表、行为剖析与算法利用、B/C 端业务和实时数据,依据不同业务场景的业务量及数据规模,集群的资源配置也不尽相同。目前总的机器规模达数十台,以行为剖析场景为例,单个集群近 20 个节点、 存储数据量超过百 TB,每日新增数据量达数十亿条 。
接下来分享 Apache Doris 在叮咚买菜常见业务场景中的利用实际及应用教训。
实时数据分析
从下方数仓模型图可知,数据通过 Flink 作业进行逻辑解决,在不同层级 Kafka 中进行流转加工,通过数据汇总层之后,应用层须要一个组件来存储后果数据,该组件个别是从 MySQL 数据库、KV 存储和 OLAP 引擎三者中抉择其一。
思考到咱们的后果数据大多以计算指标数据居多,不足维度数据,因而应用层的组件须要具备高效、低提早的数据 Join 能力。基于以上因素,咱们最终抉择 Apache Doris 作为实时数仓和实时业务的数据应用层,Doris 能够无效升高数据处理延时,进步查问效率。
比方在销量打算我的项目中,该我的项目须要每日实时写入大量预测数据,并且这些数据须要较低时延提供给分析师进行及时比照剖析、批改该预测值,并提供到供应链端。因批改预测值会影响到零碎调拨,所以选用的存储必须是要具备高吞吐低提早个性,Doris 完全符合该要求。从销量打算的整个数据生产及解决链路来看,应用 Doris 后,最慢 2 秒内 能够看到最新的数据。
以后公司曾经有数十个实时业务需要接入到 Doris 中,随着业务的一直倒退,这个数字也在缓缓减少。
B 端业务查问取数
在理论的应用中,通常会有 B 端业务或零碎须要从数据仓库中取数的需要,比方自研 Pylon 零碎(次要用来基于用户偏好的数据查问)会依据 UID 查问用户画像信息。在这种场景下,通常须要进行简单的模型关联,同时要求在秒级或者毫秒级返回查问后果。
- 应用前:咱们最后应用 Spark 的 JDBC 形式来间接查问数据仓库 Hive 表数据,因为寄存用户标签数据的 Hive 表的数据量有几千万体量,通过 Spark JDBC 形式要消耗几分钟能力查出后果,同时还存在 Yarn 调度耗时较高的问题,有时会因为队列资源缓和产生提早,导致一个一般的查问甚至须要十几分钟能力跑出后果,用户的体验度十分不好。
- 应用后:通过咱们对数据链路的革新,将 Hive 的用户标签数据离线灌入 Doris 中,再用同样的 SQL 查问,Doris 的性能在绝大多数场景下比 Spark 要好很多,能够在 秒级别 失去返回后果。
标签零碎
最后咱们的标签数据寄存在 ES 中,然而随着业务的扩大、上游业务越来越多,标签数据规模急速收缩,策略规定一直减少变动,标签零碎遭逢重大的性能瓶颈。
- 聚合和 Join 查问的性能低
- 人群圈选破费工夫近 20 分钟
- ES 导入慢、查问性能低
为解决以上问题,咱们目前正在尝试应用 Doris 来替换 ES,心愿通过 Doris 解决上述问题,抉择 Doris 次要思考以下三点:
1、分布式 Join 大大晋升查问效率
原有商品 ID 和仓库 ID 通过嵌套类型存储在 ES 中,替换为 Doris 存储之后,须要将简单的嵌套类型拆解为两张表来做表级关联,同时能够试用 Doris 的多种分布式的 Join 进步查问得性能。Doris 的分布式 Join 次要有 Shuffle Join、Broadcast Join 和 Colocate Join。
其中 Colocate Join 查问性能是最好的,旨在为某些 Join 查问提供本地性优化,来缩小数据在节点间的传输耗时、减速查问,另外咱们在该场景下根本均为千万级的表。综合来看,Colocate Join 比拟合乎场景与需要,最终决定应用 Colocate Join 形式晋升 Join 性能。
如何应用: 标签数据的应用次要波及到两张大表的 Join,建表时须要设置雷同的 Distributed Key、雷同的 Bucket 数、雷同的正本数,还要将两个表通过 colocate_with
属性划分到一个组 Colocation Group(CG)。
CREATE TABLE `profile_table` (
`pdate` date NULL COMMENT "null",
`product_mongo_id` varchar(4000) NULL COMMENT "商品 ID",
`station_id` varchar(4000) NULL COMMENT "仓 id",
......
) ENGINE=OLAP
UNIQUE KEY(`pdate`,
`product_mongo_id`, `station_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`pdate`)()
DISTRIBUTED BY
HASH(`product_mongo_id`) BUCKETS 7
PROPERTIES ("colocate_with" = "profile_table","in_memory" = "false","storage_format" = "V2")
<!—->
CREATE TABLE
`station_info_table` (`product_mongo_id` varchar(4000) NULL COMMENT "商品 id", `station_id` varchar(4000)NULL
COMMENT "站点 id",
`snapshot` date NULL COMMENT "日期",
`product_id` bigint(20) NULL COMMENT "商品 id", ......)
ENGINE=OLAPUNIQUE KEY(`product_mongo_id`, `station_id`, `snapshot`)
COMMENT "OLAP"
PARTITION BY RANGE(`snapshot`)()
DISTRIBUTED BY
HASH(`product_mongo_id`) BUCKETS 7
PROPERTIES ("colocate_with" = "profile_table","in_memory" = "false","storage_format" = "V2")
比方咱们有这样一条查问语句:
select count(psp.product_mongo_id) from profile_table psp
left join station_info_table psi on psp.product_mongo_id=psi.product_mongo_id and psp.station_id=psi.station_id
where psp.pdate='2023-03-16' and psp.four_category='特色醋' and psp.brand_name='宝鼎天鱼' and psp.weight_unit='ml' and psp.pmg_name='粮油调味组';
通过应用 Colocate Join 形式优化后,能够达到 毫秒级 的查问成果。接下来咱们介绍一下 Colocate Join 的查问性能高的起因有哪些呢?
A. 数据导入时保证数据本地性
Doris 的分区形式如下所示,先依据分区字段 Range 分区,再依据指定的 Distributed Key Hash 分桶。
所以咱们在数据导入时保障本地性的核心思想就是 两次映射,对于 Colocate Tables,咱们保障雷同 Distributed Key 的数据映射到雷同的 Bucket Seq,再保障雷同 Bucket Seq 的 Buckets 映射到雷同的 BE。能够同查看执行打算查看是否应用了 Colocate Join:
对于 HashJoinFragment,因为 Join 的多张表有了数据本地性保障,所以能够去掉 Exchange Node,防止网络传输,将 ScanNode 间接设置为 Hash Join Node 的 Child。
B. 查问调度时保证数据本地性
- 查问调度的指标:一个 Colocate Join 中所有 ScanNode 中所有 Bucket Seq 雷同的 Buckets 被调度到同一个 BE。
- 查问调度的策略:第一个 ScanNode 的 Buckets 随机抉择 BE,其余的 ScanNode 和第一个 ScanNode 保持一致。
C. 数据 Balance 后保证数据本地性
新增一个 Daemon 线程专门解决 Colocate Table 的 Balance,并让失常的 Balance 线程不解决 Colocate Table 的 Balance。失常 Balance 的粒度是 Bucket,然而对于 Colocate Table,必须保障同一个 Colocate Group 下所有 Bucket 的数据本地性,所以 Balance 的单位是 Colocate Group。
2、高效繁难的 array_contains
函数
在做人群圈选时,有以下相似的 Json 构造 [{"K1":"V1","K2":200},{"k1":"v2","k2":300}]
,当配置 k1=v1,k2=200
,只有该 Value 里的 Json 项有一项满足全副条件就会被圈进去,咱们能够借助 Doris 1.2 版本中的 array_contains
数组函数解决,将 Json 转化为 Array 数组解决。
3、Broker Load 减速数据导入效率
Doris Broker Load 是一种高效、稳固的数据导入形式,它能够将数据分成多个分片,而后将每个分片调配给不同的 Broker 节点进行解决,咱们应用 Broker Load 将标签数据从 Hive 导入 Doris 中,无效进步了数据导入的效率和稳定性。
BI 数据看板
咱们商业智能剖析应用的 BI 剖析平台次要是帆软和自研的阿尔法 BI,底层应用 Doris 来存储数据,目前用来做报表的 Doris 表数量已达到了 3000 多张 ,四个 Doris 集群的日 UV 1000+,PV 达到十几万,因为 Doris 能够达到 毫秒级响应速度 、 反对高并发查问,因而单集群的 QPS 能够达到达到 120 次 / 秒,合乎咱们的要求。
OLAP 多维分析
随着业务的增长,咱们在经营的过程中咱们经常有一些疑难:最近三个月哪个品类的下单量最高?变化趋势如何?各个时段人均下单是多少?某个区域,产生购买行为的年龄段散布是怎么的?…… 而想要取得后果,必须依据用户行为日志进行事件剖析。
目前咱们的用户行为数据日均增量为 20 亿 +,高峰期 100 亿 +,为了更好的进行事件剖析,咱们须要保留半年的数据,也就是几千亿的数据量。 咱们应用 Doris 来存储如此宏大的数据量,在应答简单的剖析场景时能够达到 分钟级的响应。在多维分析的过程中,往往也随同着大数据量的简单查问,接下来分享如何应用 Doris 应答:
1、Bitmap 去重
业务应用过程中须要剖析用户参加状况以及沉闷水平,考查进行初始行为后的用户中,有多少人会进行后续行为,这时候个别都是应用留存分析模型实现此类需要。该模型应用中有去重操作,计算周期有某天 / 某周 / 某月 / 最近三个月等,因为每天的埋点数据量都能达到几十亿,高峰期 100 亿,在这个状况下,应用 count(distinct)
性能太差、甚至查问超时(或超过设置的工夫),而如果应用 Bitmap 来能够成倍的缩短查问工夫。
select
event_id,
date,
count(distinct uid) as count
from event
where
dt>='2022-06-01' and dt<'2022-06-06' and event_id in (......) group by event_id, str_to_date(dt,'%Y-%m-%d');
应用 Bitmap 优化 SQL 后
select
event_id,
date,
bitmap_count(uid) as count
from event
where
dt>='2022-06-01' and dt<'2022-06-06' and event_id in (......) group by event_id, str_to_date(dt,'%Y-%m-%d');
应用中须要留神 Bitmap 函数在 Apache Doris 中依然须要先把数据汇聚到一个 FE 节点能力执行计算,并不能充分发挥分布式计算的劣势,在数据量大到肯定的状况下,Bitmap 函数并不能取得比 COUNT(DISTINCT)
更好的性能,上述实例之所以能达到预期后果是因为做了分组计算。
如果解决大数据量的全量去重,在建表时将 Bitmap 列的值依照 Range 划分,不同 Range 的值存储在不同的分桶中,保障了不同分桶的 Bitmap 值是正交的。当查问时,先别离对不同分桶中的正交 Bitmap 进行聚合计算,而后顶层节点间接将聚合计算后的值合并汇总并输入,从而解决顶层单节点计算瓶颈问题。
2、前缀索引和 Bloom Filter 索引
Doris 次要反对两类索引:内建的智能索引(包含前缀索引)和创立的二级索引(包含 Bloom Filter 索引和 Bitmap 倒排索引)。理论应用时咱们会用到前缀索引和 Bloom Filter 索引来进步查问效率。
前缀索引
Aggregate、Unique 和 Duplicate 三种数据模型中,底层的数据存储是依照各自建表语句中 AGGREGATE KEY、UNIQUE KEY 和 DUPLICATE KEY 指定的列进行排序存储的。前缀索引即在排序的根底上实现的一种依据给定前缀列、疾速查问数据的索引形式,实现形式是将一行数据的前 36 个字节作为这行数据的前缀索引,当遇到 VARCHAR 类型时,前缀索引会间接截断。
比方咱们要查问依照日期和 event_id
分组的去重人数,建表语句如下:
CREATE TABLE ubs_event_log_small_event (event_id int(11) NULL COMMENT "事件 id",
dt datetime NOT NULL COMMENT "事件工夫",
uid char(128) NULL COMMENT "用户 id",
dict_id int(11) NULL COMMENT "用户 id 字典值",
os varchar(24) NULL COMMENT "操作系统",
......
dict_id_bitmap bitmap BITMAP_UNION NULL COMMENT "bitmap 用户 id"
) ENGINE=OLAP
AGGREGATE KEY(event_id, dt, uid, dict_id, os, ......)
COMMENT "用户行为事件表"
PARTITION BY RANGE(dt)
()
DISTRIBUTED BY HASH(dt, event_id, uid) BUCKETS 64
SQL 查问的 Where 条件个别遵循建表的 AGGREGATE 模型的 KEY 的程序,这样能够命中 Doris 内置的前缀索引。
SELECT
CONCAT(TO_DATE(dt),
'00:00:00'
) AS tm,
event_id,
BITMAP_UNION_COUNT(dict_id_bitmap) AS UNIQ_1908_1
FROM
kepler.ubs_event_log_small_event
WHERE event_id = 1908 AND
dt >= '2023-03-26'
AND dt < '2023-04-05'
AND
os IN (1, 2)
GROUP BY
1,
2;
Bloom Filter 索引
针对大数据量的事件表进行查问时咱们会设置 bloom_filter_columns
,放慢查问效率:
alter table datasets set("bloom_filter_columns" = "area_name, is_booking, user_source, source_first_order......");
查问语句中 where
条件有以上设置的字段就会命中该索引。
SELECT * FROM datasets WHERE area_name="****" AND is_booking=0
3、物化视图
为了取得更粗粒度的聚合数据,Doris 容许在建表语句创立进去的 Base 表的根底上,创立若干 Rollup 表。
例如上表 ubs_event_log_small_event
,咱们能够对 dt
,event_id
,dict_id_bitmap
建设 Rollup 物化视图,这样 Rollup 只蕴含三列:dt
,event_id
,dict_id_bitmap
。
这时再进行上述查问就会命中这个 Rollup,从而只扫描极少的数据量就能够实现此次聚合查问。
优化教训
Broker Load 导数工作流程化
为了 Doris 应用更加便捷,我司在外部自研的叮咚大数据平台上对整个过程进行流程化;从建模到配置 Broker Load 导数工作再到导数任务调度均进行了调整,具体优化如下所述:
建模过程: 须要用户发动建模流程申请,填写需要内容、具体建模语句、预估数据量大小、数据保留时长、所需相干权限账号等信息,足够残缺的信息能够在审批时取得建模过程中的元数据信息以及抉择更适合的数据模型。
Broker Load 导数工作配置: 为了进步用户应用效率、升高应用门槛,咱们通过 Mapping 映射和自动化配置形式,主动生成导数工作。
导数任务调度: 配置完 Broker Load 导数工作,就能够由用户依据需要配置小时或者天级别的调度,这样整个 Doris 数据导入流程,就能够由用户配置主动实现。
总结与瞻望
Apache Doris 作为叮咚买菜整体架构体系中的外围 OLAP 剖析引擎,不论是作为数仓数据看板类的应用层、还是作为实时数据汇总后果接入层、或是作为 B/C 端服务数据提供方,均可能很好的满足业务需要。除此之外,Doris 使得咱们无需在存储选型上消耗过多工夫,大大缩短了开发的周期;同时,Doris 反对 MySQL 协定和规范 SQL,大大降低内部人员的应用老本和门槛。将来,咱们心愿应用 Doris 撑持外部更多的业务场景,更大范畴了晋升工作效率。咱们也会紧跟社区步调,踊跃应用推出的新版本个性,以更好解决场景需要,晋升业务成果。
最初,非常感谢 SelectDB 团队对咱们在 Doris 应用上的技术支持,祝福 SelectDB 和 Apache Doris 倒退越来越好!