乐趣区

关于数据库:如何基于-Apache-Doris-与-Apache-Flink-快速构建极速易用的实时数仓

随着大数据利用的不断深入,企业不再满足离线数据加工计算的时效,实时数据需要已成为数据利用新常态。随同着实时剖析需要的一直收缩,传统的数据架构面临的老本高、实时性无奈保障、组件繁冗、运维难度低等问题日益凸显。为了适应业务疾速迭代的特点,帮忙企业晋升数据生产和利用的时效性、进一步开掘实时数据价值,实时数仓的构建至关重要。

本文将分享如何基于 Apache Doris 和 Apache Flink 疾速构建一个极速易用的实时数仓,包含数据同步、数据集成、数仓分层、数据更新、性能晋升等方面的具体利用计划,在这之前,咱们先能够先理解一下传统的数据架构如何设计的、又存在哪些痛点问题。

#  实时数仓的需要与挑战

上图所示为传统的数据架构,如果咱们 从数据流的⻆度剖析传统的数据处理架构,会发现从源端采集到的业务数据和日志数据次要会分为实时和离线两条链路:

  • 在实时数据局部,通过 Binlog 的⽅式,将业务数据库中的数据变更(CDC,Change Data Capture)采集到实时数仓。同时,通过 Flume-Kafka-Sink 对日志数据进⾏实时采集。当不同起源的数据都采集到实时存储系统后,便能够基于实时存储系统来构建实时数仓。在实时数仓的外部,咱们依然会恪守传统数仓分层实践,将数据分为 ODS 层、DWD 层、DWS 层、ADS 层以实现最大水平的模型复用。
  • 在离线数据局部,通过 DataX 定时同步的⽅式,批量同步业务库 RDS 中的数据。当不同起源的数据进⼊到离线数仓后,便能够在离线数仓外部,依赖 Spark SQL 或 Hive SQL 对数据进⾏定时解决,拆散出不同层级 (ODS、DWD、ADS 等)的数据,并将这些数据存在⼀个存储介质上,⼀般会采纳如 HDFS 的分布式文件系统或者 S3 对象存储上。通过这样的⽅式,离线数仓便构建起来了。与此同时,为了保障数据的⼀致性,通常须要数据荡涤工作使⽤离线数据对实时数据进⾏荡涤或定期笼罩,保障数据最终的⼀致性。

从技术架构的⻆度对传统数据技术栈进行剖析,咱们同样会发现,为了投合不同场景的需要,往往会采纳多种技术栈,例如在湖仓局部通常应用的是 Hive、Iceberg、Hudi 等数据湖;面向湖上数据的 Ad-hoc 查问个别抉择 Impala 或 Presto;对于 OLAP 场景的多维分析,个别使⽤ Doris 或 Kylin、Druid。除此之外,为应答半结构化数据的剖析需要,例如日志剖析与检索场景,通常会使⽤ ES 进行剖析;面对高并发点查问的 Data Serving 场景会使⽤ HBase;在某些场景下可能还须要对外提供统⼀的数据服务,这时可能会使⽤基于 Presto/Trino 的查问⽹关,对⽤户提供对立查问服务。其中波及到的数据组件有数十种,昂扬的应用老本和组件间兼容、保护及扩大带来的沉重压力成为企业必须要面临的问题。

从上述介绍即可晓得,传统的数据架构存在几个外围的痛点问题:

  • 传统数据架构组件繁多,保护简单,运维难度十分高。
  • 计算、存储和研发老本都较高,与行业降本提效的趋势南辕北辙。
  • 同时保护两套数据仓库(实时数仓和离线数仓)和两套计算(实时数据量和实时计算工作),数据时效性与一致性无奈保障。

在此背景下,咱们亟需⼀个“极速、易用、对立、实时”的数据架构来解决这些问题

  • 极速:更快的查问速度,最大化晋升业务剖析人员的效率;
  • 易用:对于用户侧的应用和运维侧的管控都提供了极简的应用体验;
  • 统⼀:异构数据与剖析场景的对立,半结构化和结构化数据能够统⼀存储,多剖析场景能够对立技术栈;
  • 实时:端到端的高时效性保障,施展实时数据的价值。

#  如何构建极速易用的实时数仓架构

基于以上的需要,咱们采取 Apache Doris 和 Apache Flink 来构建极速易用的实时数仓,具体架构如下图所示。多种数据源的数据通过 Flink CDC 集成或 Flink Job 加⼯解决后,⼊库到 Doris 或者 Hive/Iceberg 等湖仓中,最终基于 Doris 提供统⼀的查问服务。

在数据同步上,通过 Flink CDC 将 RDS 的数据实时同步到 Doris;通过 Routine Load 将 Kafka 等音讯零碎中的数据实时同步到 Doris。在数仓分层上,ODS 层通常抉择应用明细模型构建,DWD 层能够通过 SQL 调度工作对 ODS 数据抽取并获取,DWS 和 ADS 层则能够通过 Rollup 和物化视图进行构建。在数据湖上,Doris ⽀持为 Hive、Iceberg、Hudi 以及 Delta Lake(todo)提供联邦剖析和湖仓减速的能⼒。在数据利用上,Apache Doris 既能够承载批量数据加工解决的需要,也能够承载高吞吐的 Adhoc 和高并发点查问等多种应⽤场景。

#  解决方案

如何实现数据的增量与全量同步

1. 增量及全量数据同步

在全量数据和增量的同步上,咱们采取了 Flink CDC 来实现。其原理非常简单,Flink CDC 实现了基于 Snapshot 的全量数据同步、基于 BinLog 的实时增量数据同步,全量数据同步和增量数据同步能够⾃动切换,因而咱们在数据迁徙的过程中,只须要配置好同步的表即可。当 Flink 工作启动时,优先进⾏历史表的数据同步,同步完后⾃动切换成实时同步。

2. 数据一致性保障

如何保证数据一致性是大家重点关注的问题之一,那么在新架构是如何实现的呢?

数据⼀致性⼀般分为“最多⼀次”、“⾄少⼀次”和“准确⼀次”三种模型。

  • 最多⼀次(At-Most-Once):发送⽅仅发送音讯,不期待任何回复。在这种模型中,数据的⽣产和生产过程中可能呈现数据失落的问题。
  • ⾄少⼀次(At-Least-Once):发送⽅一直重试,直到对⽅收到为⽌。在这个模型中,⽣产和生产过程都可能呈现数据反复。
  • 准确⼀次(Exactly-Once):可能保障音讯只被严格发送⼀次,并且只被严格解决⼀次。这种数据模型可能严格保证数据⽣产和生产过程中的精确⼀致性。
  • Flink CDC 通过 Flink Checkpoint 机制联合 Doris 两阶段提交能够实现端到端的 Exactly Once 语义。具体过程分为四步:
  • 事务开启(Flink Job 启动及 Doris 事务开启):当 Flink 工作启动后,Doris 的 Sink 会发动 Precommit 申请,随后开启写⼊事务。
  • 数据传输(Flink Job 的运⾏和数据传输):在 Flink Job 运⾏过程中,Doris Sink 一直从上游算⼦获取数据,并通过 HTTP Chunked 的⽅式继续将数据传输到 Doris。
  • 事务预提交:当 Flink 开始进⾏ Checkpoint 时,Flink 会发动 Checkpoint 申请,此时 Flink 各个算⼦会进⾏ Barrier 对⻬和快照保留,Doris Sink 收回停⽌ Stream Load 写⼊的申请,并发动⼀个事务提交申请到 Doris。这步实现后,这批数据曾经齐全写⼊ Doris BE 中,但在 BE 没有进⾏数据公布前对⽤户是不可⻅的。
  • 事务提交:当 Flink 的 Checkpoint 实现之后,将告诉各个算⼦,Doris 发动⼀次事务提交到 Doris BE,BE 对此次写⼊的数据进⾏公布,最终实现数据流的写⼊。

综上可知,咱们利用 Flink CDC 联合 Doris 两阶段事务提交保障了数据写入一致性。须要留神的是,在该过程中可能遇到一个问题:如果事务预提交胜利、但 Flink Checkpoint 失败了该怎么办?针对该问题,Doris 外部反对对写⼊数据进⾏回滚(Rollback),从⽽保证数据最终的⼀致性。

3. DDL 和 DML 同步

随着业务的倒退,局部⽤户可能存在 RDS Schema 的变更需要。当 RDS 表构造变更时,⽤户冀望 Flink CDC 岂但可能将数据变动同步到 Doris,也心愿将 RDS 表构造的变更同步到 Doris,⽤户则无需担⼼ RDS 表构造和 Doris 表构造不⼀致的问题。

Light Schema Change

目前,Apache Doris 1.2.0 曾经实现了  Light Schema Change 性能,可满⾜ DDL 同步需要,疾速⽀持 Schema 的变更。

Light Schema Change 的实现原理也比较简单,对数据表的加减列操作,不再须要同步更改数据文件,仅需在 FE 中更新元数据即可,从而实现毫秒级的 Schema Change 操作,且存在导入工作时效率的晋升更为显著。在这个过程中,因为 Light Schema Change 只批改了 FE 的元数据,并没有同步给 BE。因而会产⽣ BE 和 FE Schema 不⼀致的问题。为了解决这种问题,咱们对 BE 的写出流程进⾏了批改,具体蕴含三个⽅⾯。

  • 数据写⼊:FE 会将 Schema 长久化到元数据中,当 FE 发动导⼊工作时,会把最新的 Schema 一起发给 Doris BE,BE 依据最新的 Schema 对数据进⾏写⼊,并与 RowSet 进⾏绑定。将该 Schema 长久化到 RowSet 的元数据中,实现了数据的各⾃解析,解决了写⼊过程中 Schema 不⼀致的问题。
  • 数据读取:FE ⽣成查问打算时,会把最新的 Schema 附在其中⼀起发送给 BE,BE 拿到最新的 Schema 后对数据进⾏读取,解决读取过程中 Schema 发⽣不⼀致的问题。
  • 数据 Compaction:当数据进⾏ Compaction 时,咱们选取须要进⾏ Compaction 的 RowSet 中最新的 Schema 作为之后 RowSet 对应的 Schema,以此解决不同 Schema 上 RowSet 的合并问题。

通过对 Light Schema Change 写出流程的优化后,单个 Schema Chang 从 310 毫秒升高到了 7 毫秒 ,整体性能有 近百倍的晋升,彻底的解决了海量数据的 Schema Change 变动难的问题。

Flink CDC DML 和 DDL 同步

有了 Light Schema Change 的保障,Flink CDC 可能同时⽀持 DML 和 DDL 的数据同步。那么是如何实现的呢?

  • 开启 DDL 变更配置:在 Flink CDC 的 MySQL Source 侧开启同步 MySQL DDL 的变更配置,在 Doris 侧辨认 DDL 的数据变更,并对其进⾏解析。
  • 辨认及校验:当 Doris Sink 发现 DDL 语句后,Doris Sink 会对表构造进⾏验证,验证其是否⽀持 Light Schema Change。
  • 发动 Schema Change:当表构造验证通过后,Doris Sink 发动 Schema Change 申请到 Doris,从⽽实现此次 Schema Change 的变动。

解决了数据同步过程中源数据⼀致性的保障、全量数据和增量数据的同步以及 DDL 数据的变更后,一个残缺的数据同步⽅案就根本造成了。

如何基于 Flink 实现多种数据集成

除了上文中所提及的基于 Flink CDC 进行数据增量 / 全量同步外,咱们还能够基于 Flink Job 和 Doris 来构建多种不同的数据集成形式:

  • 将 MySQL 中两个表的数据同步到 Flink 后,在 Flink 外部进⾏多流 Join 实现数据打宽,后将⼤宽表同步到 Doris 中。
  • 对上游的 Kafka 数据进⾏荡涤,在 Flink Job 实现荡涤后通过 Doris-Sink 写⼊ Doris 中。
  • 将 MySQL 数据和 Kafka 数据在 Flink 外部进⾏多流 Join,将 Join 后的宽表后果写⼊ Doris 中。
  • 在 Doris 侧事后创立宽表,将上游 RDS 中的数据依据 Key 写入,使⽤ Doris 的局部列更新将多列数据别离写⼊到 Doris 的⼤宽表中。

如何抉择数据模型

Apache Doris 针对不同场景,提供了不同的数据模型,别离为聚合模型、主键模型、明细模型。

AGGREGATE 聚合模型

在企业理论业务中有很多须要对数据进行统计和汇总操作的场景,如须要剖析网站和 APP 拜访流量、统计用户的拜访总时长、拜访总次数,或者像厂商须要为广告主提供广告点击的总流量、展现总量、生产统计等指标。在这些不须要召回明细数据的场景,通常能够应用聚合模型,比方上图中须要依据门店 ID 和工夫对每个门店的销售额实时进行统计。

UNIQUE KEY 主键模型

在某些场景下用户对数据更新和数据全局唯一性有去重的需要,通常应用 UNIQUE KEY 模型。在 UNIQUE 模型中,会依据表中的主键进⾏ Upsert 操作:对于已有的主键做 Update 操作,更新 value 列,没有的主键做 Insert 操作,比方图中咱们以订单 id 为惟一主键,对订单上的其余数据(工夫和状态)进行更新。

DUPLICATE 明细模型

在某些多维分析场景下,数据既没有主键,也没有聚合需要,Duplicate 数据模型能够满足这类需要。明细模型次要用于须要保留原始数据的场景,如日志剖析,用户行为剖析等场景。明细模型适宜任意维度的 Ad-hoc 查问。尽管同样无奈利用预聚合的个性,然而不受聚合模型的束缚,能够施展列存模型的劣势(只读取相干列,而不须要读取所有 Key 列)。

如何构建数仓分层

因为数据量级广泛较大,如果间接查问数仓中的原始数据,须要拜访的表数量和底层文件的数量都较多,体现在日常工作中就是 SQL 异样简单、计算耗时增高。而分层要做的就是对原始数据从新做演绎整顿,在不同层级对数据或者指标做不同粒度的形象,通过复用数据模型来简化数据管理压力,利用血缘关系来定位数据链路的异样,同时进一步晋升数据分析的效率。在 Apache Doris 能够通过以下多种思路来构建数据仓库分层:

微批调度

通过 INSERT INTO SELECT 能够将原始表的数据进行解决和过滤并写入到指标表中,这种 SQL 抽取数据的行为个别是以微批模式进行(例如 15 分钟一次的 ETL 计算工作),通常产生在从 ODS 到 DWD 层数据的抽取过程中,因而须要借助内部的调度工具例如 DolphinScheduler 或 Airflow 等来对 ETL SQL 进行调度。

Rollup 与物化视图

物化视图实质是一个事后计算的过程。咱们能够在 Base 表上,创立不同的 Rollup 或者物化视图来对 Base 表进行聚合计算。通常在明细层到汇总层(例如 DWD 层到 DWS 层或从 DWS 层到 ADS 层)的汇聚过程中能够应用物化视图,以此实现指标的高度聚合。同时物化视图的计算是实时进行的,因而站在计算的角度也能够将物化视图了解为一个单表上的实时计算过程。

多表物化视图

Apache Doris 2.0 将实现多表物化视图这一性能,能够将带有 Join 的查问后果固化以供用户间接查问,反对定时主动或手动触发的形式进行全量更新查问后果,将来还将进一步反对更加欠缺的主动增量刷新。基于多表物化视图这一性能的实现,咱们能够做更简单的数据流解决,比方数据源侧有 TableA、TableB、TableC,在多表物化视图的状况下,用户就能够将 TableA 和 TableB 的数据进行实时 Join 计算后物化到 MV1 中。在这个角度上来看,多表物化视图更像一个多流数据实时 Join 的过程。

如何应答数据更新

在实时数据仓库构建的过程中,还须要面临高并发写入和实时更新的挑战。如何在亿级数据中疾速找到须要更新的数据,并对其进⾏更新,⼀直都是⼤数据畛域一直追寻的答案。

1. 高并发数据更新

在 Apache Doris 中通过 Unique Key 模型来满足数据更新的需要,同时通过 MVCC 多版本并发机制来实现数据的读写隔离。当新数据写入时,如果不存在雷同 Key 的数据则会间接写⼊;如果有雷同 Key 的数据则减少版本,此时数据将以多个版本的模式存在。后盾会启动异步的 Compaction 过程对历史版本数据进⾏清理,当⽤户在查问时 Doris 会将最新版本对应的数据返回给⽤户,这种设计解决了海量数据的更新问题。

在 Doris 中提供了 Merge-on-Read 和 Merge-on-Write 两种数据更新模式。

在此咱们以订单数据的写入为例介绍 Merge-on-Read 的数据写入与查问流程,三条订单数据均以 Append 的模式写⼊ Doris 表中:

  • 数据 Insert:首先咱们写入 ID 为 1,2,3 的三条数据;
  • 数据 Update:当咱们将订单 1 的 Cost 更新为 30 时,其实是写⼊⼀条 ID 为 1,Cost 为 30 的新版本数据,数据通过 Append 的模式写⼊ Doris;
  • 数据 Delete:当咱们对订单 2 的数据进⾏删除时,依然通过 Append ⽅式,将数据多版本写⼊ Doris,并将 _DORIS_DELETE_SIGN 字段变为 1,则示意这条数据被删除了。当 Doris 读取数据时,发现最新版本的数据被标记删除,就会将该数据从查问后果中进⾏过滤。

Merge-on-Read 的特点是写⼊速度比拟快,然而在数据读取过程中因为须要进⾏多路归并排序,存在着大量非必要的 CPU 计算资源耗费和 IO 开销。

因而在 1.2.0 版本中,Apache Doris 在原有的 Unique Key 数据模型上减少了 Merge-on-Write 的数据更新模式。Merge-on-Write 兼顾了写入和查问性能。在写⼊的过程中引⼊了 Delete Bitmap 数据结构,使⽤ Delete Bitmap 标记 RowSet 中某⼀⾏是否被删除,为了放弃 Unique Key 原有的语义,Delete Bitmap 也⽀持多版本。另外使⽤了兼顾性能和存储空间的 Row Bitmap,将 Bitmap 中的 MemTable ⼀起存储在 BE 中,每个 Segment 会对应⼀个 Bitmap。

  • 写入流程:
  • DeltaWriter 先将数据 Flush 到磁盘
  • 批量查看所有 Key,在点查过程中通过区间树,查找到对应的 RowSet。
  • 在 RowSet 外部通过 BloomFilter 和 index 进行⾼效查问。

当查问到 Key 对应的 RowSet 后,便会笼罩 RowSet Key 对应的 Bitmap,接着在 Publish 阶段更新 Bitmap,从⽽保障批量点查 Key 和更新 Bitmap 期间不会有新的可⻅ RowSet,以保障 Bitmap 在更新过程中数据的正确性。除此之外,如果某个 Segment 没有被批改,则不会有对应版本的 Bitmap 记录。

  • 查问流程:
  • 当咱们查问某⼀版本数据时,Doris 会从 LRU Cache Delete Bitmap 中查找该版本对应的缓存。
  • 如果缓存不存在,再去 RowSet 中读取对应的 Bitmap。
  • 使⽤ Delete Bitmap 对 RowSet 中的数据进⾏过滤,将后果返回。

该模式不须要在读取的时候通过归并排序来对主键进行去重,这对于高频写入的场景来说,大大减少了查问执行时的额定耗费。此外还可能反对谓词下推,并可能很好利用 Doris 丰盛的索引,在数据 IO 层面就可能进行充沛的数据裁剪,大大减少数据的读取量和计算量,因而在很多场景的查问中都有非常明显的性能晋升。在实在场景的测试中,通过 Merge-on-Write 能够在保障数万 QPS 的高频 Upset 操作的同时实现性能 3-10 倍的晋升。

2. 局部列更新

局部列更新是一个比拟广泛的需要,例如广告业务中须要在不同的工夫点对同一个广告行为(展现、点击、转换等)数据的更新。这时能够通过 Aggregate Key 模型的 replace_if_not_null 实现。具体建表语句如下:

CREATE TABLE IF NOT EXISTS request_log
(
    `session_id` LARGEINT NOT NULL COMMENT "id",

    `imp_time` DATE REPLACE_IF_NOT_NULL COMMENT "展现",  #展现数据更新
    `imp_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT "",

    `click_time` DATE REPLACE_IF_NOT_NULL COMMENT "点击",# 点击数据更新
    `click_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT "",

    `conv_time` DATE REPLACE_IF_NOT_NULL COMMENT "转化",# 转换数据更新
    `conv_data` VARCHAR(20)  REPLACE_IF_NOT_NULL COMMENT ""
)
AGGREGATE KEY(`session_id`)
DISTRIBUTED BY HASH(`session_id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");

具体更新过程如下:

(1)更新展现数据

mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query OK, 1 row affected (0.05 sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | NULL       | NULL       | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

(2)更新点击数据

mysql> insert into request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query OK, 1 row affected (0.05 sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa', 'status':'VISIBLE', 'txnId':'385642'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | NULL       | NULL       | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

(3)更新转化数据

ysql> insert into request_log(session_id,click_time,click_data)VALUES(1,'2022-12-21','click');
Query OK, 1 row affected (0.03 sec)
{'label':'insert_2649087d8dc046bd_a39d367af1f93ab0', 'status':'VISIBLE', 'txnId':'385667'}

mysql> select * from request_log;
+------------+------------+----------+------------+------------+-----------+-----------+
| session_id | imp_time   | imp_data | click_time | click_data | conv_time | conv_data |
+------------+------------+----------+------------+------------+-----------+-----------+
| 1          | 2022-12-20 | imp      | 2022-12-21 | click      | NULL      | NULL      |
+------------+------------+----------+------------+------------+-----------+-----------+
1 row in set (0.01 sec)

mysql>

同时局部列更新还可用于反对画像场景的宽表列实时更新。

另外值得期待的是 Apache  Doris 的 Unique Key 模型也行将实现局部列更新的性能,能够通过 Apache Doris GitHub 代码仓库及官网,关注新版本或新性能的公布(相干地址可下滑至文章底部获取)。

如何进一步晋升查问性能

1. 智能物化视图

物化视图除了能够作为高度聚合的汇总层外,其更宽泛的定位是减速绝对固定的聚合剖析场景。物化视图是指依据预约义的 SQL 剖析语句执⾏预计算,并将计算结果长久化到另一张对用户通明但有理论存储的表中,在须要同时查问聚合数据和明细数据以及匹配不同前缀索引的场景,命中物化视图时能够取得更快的查问性能。在应用物化视图时须要建⽴ Base 表并基于此建⽴物化视图,同⼀张 Base 表能够构建多个不同的物化视图,从不同的维度进⾏统计。物化视图在查问过程中提供了智能路由抉择的能力,如果数据在物化视图中存在会间接查问物化视图,如果在物化视图中不存在才会查问 Base 表。对于数据写入或更新时,数据会在写入 Base 表的同时写入物化视图,从⽽让 Doris 保障物化视图和 Base 表数据的齐全⼀致性。

智能路由抉择遵循最⼩匹配准则,只有查问的数据集⽐物化视图汇合⼩时,才可能⾛物化视图。如上图所示智能抉择过程包含抉择最优和查问改写两个局部:

抉择最优

  • 在过滤候选集过程中,被执行的 SQL 语句通过 Where 条件进⾏判断,Where 条件为 advertiser=1。由此可⻅,物化视图和 Base 表都有该字段,这时的全集是物化视图和 Base 表。
  • Group By 计算,Group By 字段是 advertiser 和 channel,这两个字段同时在物化视图和 Base 表中。这时过滤的候选集依然是物化视图和 Base 表。
  • 过滤计算函数,⽐如执⾏ count(distinctuser_id),而后对数据进⾏计算,因为 Count Distinct 的字段 user_id 在物化视图和 Base 表中都存在,因而过滤后果仍是物化视图和 Base 表。
  • 抉择最优,通过⼀系列计算,咱们发现查问条件⽆论是 Where、Group By 还是 Agg Function 关联的字段,后果都有 Base 表和物化视图,因而须要进⾏最优抉择。Doris 通过计算发现 Base 表的数据远⼤于物化视图,即物化视图的数据更⼩。

由此过程可⻅,如果通过物化视图进行查问,查问效率更⾼。当咱们找到最优查问打算,就能够进⾏⼦查问改写,将 Count Distinct 改写成 Bitmap,从⽽实现物化视图的智能路由。实现智能路由之后,咱们会将 Doris ⽣成的查问 SQL 发送到 BE 进⾏分布式查问计算。

2. 分辨别桶裁剪

Doris 数据分为两级分区存储,第一层为分区(Partition),目前反对 RANGE 分区和 LIST 分区两种类型, 第二层为 HASH 分桶(Bucket)。咱们能够依照工夫对数据进⾏分区,再依照分桶列将⼀个分区的数据进行 Hash 分到不同的桶⾥。在查问时则能够通过分辨别桶裁剪来疾速定位数据,减速查问性能的同时实现高并发。

3. 索引查问减速

除了分辨别桶裁剪,还能够通过存储层索引来裁剪须要读取的数据量,仅以减速查问:

  • 前缀索引:在排序的根底上疾速定位数据
  • Zone Map 索引:保护列中 min/max/null 信息
  • Bitmap 索引:通过 Bitmap 减速去重、交并查问
  • Bloom Filter 索引:疾速判断元素是否属于汇合;
  • Invert 倒排索引:反对字符串类型的全文检索;

4. 执行层查问减速

同时 Apache Doris 的 MPP 查问框架、向量化执行引擎以及查问优化器也提供了许多性能优化形式,在此仅列出局部、不做具体开展:

  • 算子下推:Limit、谓词过滤等算子下推到存储层;
  • 向量化引擎:基于 SIMD 指令集优化,充沛开释 CPU 计算能力;
  • Join 优化:Bucket Shuffle Join、Colocate Join 以及 Runtime Filter 等;

行业最佳实际

截止目前,Apache Doris 在寰球范畴内企业用户规模已超过 1500 家,广泛应用于数十个行业中。在用户行为剖析、AB 试验平台、日志检索剖析、用户画像剖析、订单剖析等方向均有着丰盛的利用。在此咱们列出了几个基于 Doris 构建实时数据仓库的实在案例作为参考:

第 1 个案例是较为典型的基于 Doris 构建实时数仓,上层数据源来自 RDS 业务库、⽂件零碎数据以及埋点日志数据。在数据接⼊过程中通过 DataX 进⾏离线数据同步以及通过 Flink CDC 进⾏实时数据同步,在 Doris 外部构建不同的数据分层;最初在下层构建不同的数据应⽤,⽐如⾃助报表、⾃助数据抽取、数据⼤屏。除此之外,它还联合了⾃⼰的应⽤平台构建了数据开发与治理平台,实现了源数据管理、数据分析等操作。

应用收益:

  • 业务计算耗时从之前的两⼩时升高到三分钟。
  • 全链路的更新报表的工夫从周级别更新到⼗分钟级别。
  • Doris ⾼度兼容 MySQL,报表迁徙无压力,开发周期从周级别降至⾄天级别。

第 2 个案例是在某经营服务商的利用,其架构是通过 Flink CDC 将 RDS 的数据同步到 Doris 中,同时通过 Routine Load 间接订阅 Kafka 中接入的日志数据,而后在 Doris 外部构建实时数仓。在数据调度时,通过开源 DolphinScheduler 实现数据调度;使⽤ Prometheus+Grafana 进⾏数据监控。

应用收益: 采⽤ Flink+Doris 架构体系后,架构简洁、组件缩小,解决了多架构下的数据的冗余存储,服务器资源节俭了 30%,数据存储磁盘占⽤节俭了 60%,经营老本⼤幅升高。该案例每天在⽤户的业务场景上,⽀持数万次的⽤户的在线查问和剖析。

第 3 个利用是在供应链企业,在过来该企业采取了 Hadoop 体系,应用组件⽐较繁多,有 RDS、HBase、Hive、HDFS、Yarn、Kafka 等多个技术栈,在该架构下,查问性能无奈失去无效疾速的晋升,保护和开发成本始终居高不下。

应用收益: 引入 Doris 之后,将 RDS 的数据通过 Flink CDC 实时同步到 Doris ⾥,服务器资源老本失去了很⼤的升高。数据的查问工夫从 Spark 的 2~5 ⼩时,缩短到⼗分钟,查问效率也⼤⼤晋升。在数据的同步过程中,使⽤了 Flink CDC+MySQL 全量加增量的数据同步⽅式,同时还利⽤ Doris 的 Light Schema Change 个性实时同步 Binlog ⾥的 DDL 表构造变更,实现数据接⼊数仓零开发成本。

#  总结

凭借 Apache Doris 丰盛的剖析性能和 Apache Flink 弱小的实时计算能力,曾经有越来越多的企业抉择基于 Apache Doris 和 Flink 构建极速易用的实时数仓架构,更多案例欢送关注 SelectDB 公众号以及相干技术博客。后续咱们仍会继续晋升 Apache Doris 在实时数据处理场景的能力和性能,包含 Unique 模型上的局部列更新、单表物化视图上的计算加强、主动增量刷新的多表物化视图等,后续研发停顿也将在社区及时同步。在构建实时数据仓库架构中遇到任何问题,欢送分割社区进行反对。同时也欢送退出 Apache Doris 社区,一起将 Apache Doris 建设地更加弱小!

作者介绍:

王磊,SelectDB 资深大数据研发专家、Apache Doris Contributor、阿里云 MVP,具备超 10 年大数据畛域工作教训,对数据治理、数据湖和实时数仓有深刻了解和实际,人气技术畅销书《图解 Spark 大数据疾速剖析实战》、《offer 来了:Java 面试外围知识点精讲(原理篇 & 架构篇)》作者。

# 相干链接:

SelectDB 官网

https://selectdb.com 

Apache Doris 官网

http://doris.apache.org

Apache Doris Github

https://github.com/apache/doris

退出移动版