共计 6150 个字符,预计需要花费 16 分钟才能阅读完成。
编者荐语:随着领创团体的疾速倒退,为了满足十亿级数据量的实时报表统计与决策分析,领创集团选择了 Flink + Doris 的实时数仓计划。本篇文章详尽了介绍了此计划的实际过程。
以下文章来源于领创团体 Advance Group,作者苏浩
原文链接:https://mp.weixin.qq.com/s/qg…
业务背景
Advance Intelligence Group(领创团体)成立于 2016 年,是一家以 AI 技术驱动的科技团体,致力于通过科技翻新的本地化利用,革新和重塑金融和批发行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。团体旗下蕴含企业业务和消费者业务两大板块,企业业务蕴含 ADVANCE.AI 和 Ginee,别离为银行、金融、金融科技、批发和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包含亚洲当先的先享后付平台 Atome 和数字金融服务。
2021 年 9 月,领创团体发表实现超 4 亿美元 D 轮融资,融资实现后领创团体估值已超 20 亿美元,成为新加坡最大的独立科技守业公司之一。业务笼罩新加坡、印度尼西亚、中国大陆、印度、越南等 17 个国家与地区,服务了 15 万以上的商户和 2000 万消费者。
随着团体业务的疾速倒退,为满足十亿级数据量的实时报表统计与决策分析,咱们抉择基于 Apache Flink + Apache Doris 构建了实时数仓的零碎计划。
Doris 基本原理
Apache Doris 根本架构非常简单,只有 FE(Frontend)、BE(Backend) 两种角色,不依赖任何内部组件,对部署和运维十分敌对。架构图如下:
FE(Frontend)以 Java 语言为主。
次要性能职责:
- 接管用户连贯申请(MySQL 协定层)
- 元数据存储与治理
- 查问语句的解析与执行打算下发
- 集群管控
FE 次要有有两种角色,一个是 Follower,还有一个 Observer,Leader 是通过选举推选出的非凡 Follower。Follower 次要是用来达到元数据的高可用,保障单节点宕机的状况下,元数据可能实时地在线复原,而不影响整个服务。
BE(Backend)以 C++ 语言为主。
次要性能职责:
- 数据存储与治理
- 查问打算的执行
技术架构
整体数据链路如下图:
- 通过 FlinkCDC 采集 MySQL Binlog 到 Kafka 中的 Topic1
- 开发 Flink 工作生产上述 Binlog 生成相干主题的宽表,写入 Topic2
- 配置 Doris Routine Load 工作,将 Topic2 的数据导入 Doris
利用实际
对于步骤 1 和步骤 2 的实际,“基于 Flink-CDC 数据同步⽅案”的文章中已有阐明,本文将对步骤 3 开展具体的阐明。
建表
因业务数据常常随同有 UPDATE,DELETE 等操作,为了放弃实时数仓的数据粒度与业务库统一,所以抉择 Doris Unique 模型(数据模型在下文有重点介绍)具体建表语句如下:
CREATE TABLE IF NOT EXISTS table_1 | |
(key1 varchar(32), | |
key2 varchar(32), | |
key3 varchar(32), | |
value1 int, | |
value2 varchar(128), | |
value3 Decimal(20, 6), | |
data_deal_datetime DateTime COMMENT '数据处理工夫', | |
data_status INT COMMENT '数据是否删除,1 示意失常,- 1 示意数据曾经删除' | |
) | |
ENGINE=OLAP | |
UNIQUE KEY(`key1`,`key2`,`key3`) | |
COMMENT "xxx" | |
DISTRIBUTED BY HASH(`key2`) BUCKETS 32 | |
PROPERTIES ( | |
"storage_type"="column", | |
"replication_num" = "3", | |
"function_column.sequence_type" = 'DateTime' | |
); |
能够看到,表构造中有两个字段别离是 data_deal_datetime,data_status。
- data_deal_datetime 次要是雷同 key 状况下数据笼罩的判断根据
- data_status 用来兼容业务库对数据的删除操作
数据导入工作
Doris 提供了被动拉取 Kafka 数据的性能,配置如下:
CREATE ROUTINE LOAD database.table1 ON table1 | |
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status), | |
ORDER BY data_deal_datetime | |
PROPERTIES | |
( | |
"desired_concurrent_number"="3", | |
"max_batch_interval" = "10", | |
"max_batch_rows" = "500000", | |
"max_batch_size" = "209715200", | |
"format" = "json", | |
"json_root" = "$.data", | |
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\", | |
\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]" | |
)FROM KAFKA | |
( | |
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3", | |
"kafka_topic"="topic_name", | |
"property.group.id"="group_id", | |
"property.kafka_default_offsets"="OFFSET_BEGINNING" | |
); |
导入语句中:
- ORDER BY data_deal_datetime 示意依据 data_deal_datetime 字段去笼罩 key 雷同的数据
- desired_concurrent_number 示意冀望的并发度。
max_batch_interval/max_batch_rows/max_batch_size 这 3 个参数别离示意 :
- 每个子工作最大执行工夫。
- 每个子工作最多读取的行数。
- 每个子工作最多读取的字节数。
工作监控与报警
Doris routine load 如果遇到脏数据会导致工作暂停,所以须要定时监控数据导入工作的状态并且主动复原失败工作。并且将错误信息发至指定的 lark 群。具体脚本如下:
import pymysql #导入 pymysql | |
import requests,json | |
#关上数据库连贯 | |
db= pymysql.connect(host="host",user="user", | |
password="passwd",db="database",port=port) | |
# 应用 cursor() 办法获取操作游标 | |
cur = db.cursor() | |
#1. 查问操作 | |
# 编写 sql 查问语句 | |
sql = "show routine load" | |
cur.execute(sql) #执行 sql 语句 | |
results = cur.fetchall() #获取查问的所有记录 | |
for row in results : | |
name = row[1] | |
state = row[7] | |
if state != 'RUNNING': | |
err_log_urls = row[16] | |
reason_state_changed = row[15] | |
msg = "doris 数据导入工作异样:\n name=%s \n state=%s \n reason_state_changed=%s \n err_log_urls=%s \n 行将主动复原,请查看错误信息" % (name, state, | |
reason_state_changed, err_log_urls) | |
payload_message = { | |
"msg_type": "text", | |
"content": {"text": msg} | |
} | |
url = 'lark 报警 url' | |
s = json.dumps(payload_message) | |
r = requests.post(url, data=s) | |
cur.execute("resume routine load for" + name) | |
cur.close() | |
db.close() |
当初线上配置的监控 1 分钟执行一次,如果遇到工作暂停,会主动复原导入工作,然而导致工作失败的脏数据会跳过,此时须要人工排查失败起因,修复后从新触发该条数据的导入。
数据模型
Doris 外部表中,次要有 3 种数据模型,别离是 Aggregate、Unique、Duplicate。在介绍数据模型之前,先解释一下 Column:在 Doris 中,Column 能够分为两大类:Key 和 Value。从业务角度看,Key 和 Value 别离对应维度列和指标列。
Aggregate
简略来说,Aggregate 模型就是预聚合模型,相似于 MOLAP,通过提前定义 Key 列及 Value 列的聚合形式,在数据导入的时候曾经将 Key 列雷同的数据依照 value 列的聚合形式聚合在一起,即最终表里 Key 雷同的数据只保留一条,Value 依照相应的规定计算。上面举例说明。
表构造如下:
CREATE TABLE tmp_table_1 | |
(user_id varchar(64) COMMENT "用户 id", | |
channel varchar(64) COMMENT "用户起源渠道", | |
city_code varchar(64) COMMENT "用户所在城市编码", | |
last_visit_date DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最初一次拜访工夫", | |
total_cost BIGINT SUM DEFAULT "0" COMMENT "用户总生产" | |
) | |
ENGINE=OLAP | |
AGGREGATE KEY(user_id, channel, city_code) | |
DISTRIBUTED BY HASH(user_id) BUCKETS 6 | |
PROPERTIES("storage_type"="column","replication_num" = "1"): |
表构造中,Key 列别离是 user_id、channel、city_code,Value 列是 last_visit_date、total_cost,他们的聚合形式别离为 REPLACE、SUM。
当初,向该表中插入一批数据:
insert into tmp_table_1 values('suh_001','JD','001','2022-01-01 00:00:01','57'); | |
insert into tmp_table_1 values('suh_001','JD','001','2022-02-01 00:00:01','76'); | |
insert into tmp_table_1 values('suh_001','JD','001','2022-03-01 00:00:01','107'); |
依照咱们的了解,当初 tmp_table_1 中尽管咱们插入了 3 条数据,然而这 3 条数据的 Key 都是统一的,那么最终表中应该只有一条数据,并且 last_visit_date 的值应为 ”2022-03-01 00:00:01″,total_cost 的值应为 240。上面咱们验证一下:
能够看到,后果与咱们预期⼀致。
Unique 模型
正如本次建设的实时数仓那样,咱们更加关注的是如何保障主键的唯⼀性,即如何取得 Primary Key 唯⼀性束缚。⼤家能够参考上⾯建表的例⼦,在这⾥不再举例说明。
Duplicate 模型
在某些多维分析场景下,数据既没有主键,也没有聚合需要。因而引⼊ Duplicate 数据模型来满⾜这类需要。举例说明。
表构造如下:
CREATE TABLE tmp_table_2 | |
(user_id varchar(64) COMMENT "用户 id", | |
channel varchar(64) COMMENT "用户起源渠道", | |
city_code varchar(64) COMMENT "用户所在城市编码", | |
visit_date DATETIME COMMENT "用户登陆工夫", | |
cost BIGINT COMMENT "用户生产金额" | |
) | |
ENGINE=OLAP | |
DUPLICATE KEY(user_id, channel, city_code) | |
DISTRIBUTED BY HASH(user_id) BUCKETS 6 | |
PROPERTIES("storage_type"="column","replication_num" = "1"); |
插入数据:
insert into tmp_table_2 values('suh_001','JD','001','2022-01-01 00:00:01','57'); | |
insert into tmp_table_2 values('suh_001','JD','001','2022-02-01 00:00:01','76'); | |
insert into tmp_table_2 values('suh_001','JD','001','2022-03-01 00:00:01','107'); |
因为此时数据是 Duplicate 模型,不会进行任何解决,查问应该能查到 3 条数据
数据模型的抉择倡议
因为数据模型在建表时就曾经确定,且无奈批改。所以,抉择一个适合的数据模型十分重要。
Aggregate 模型能够通过预聚合,极大地升高聚合查问时所需扫描的数据量和查问的计算量,非常适合有固定模式的报表类查问场景。然而该模型对 count(*) 查问很不敌对。同时因为固定了 Value 列上的聚合形式,在进行其余类型的聚合查问时,须要思考语意正确性。
Unique 模型针对须要惟一主键束缚的场景,能够保障主键唯一性束缚,然而无奈利用 ROLLUP 等预聚合带来的查问劣势。
Duplicate 适宜任意维度的 Ad-hoc 查问,尽管同样无奈利用预聚合的个性,然而不受聚合模型的束缚,能够施展列存模型的劣势。
总结
Flink + Doris 构建的实时数仓上线后,报表接口相应速度失去了明显提高,单表 10 亿级聚合查问响应速度 TP95 为 0.79 秒,TP99 为 5.03 秒。到目前为止,整套数仓体系已安稳运行 8 个多月。
欢送更多的开源技术爱好者退出 Apache Doris 社区,携手成长,共建社区生态。
SelectDB 是一家开源技术公司,致力于为 Apache Doris 社区提供一个由全职工程师、产品经理和反对工程师组成的团队,凋敝开源社区生态,打造实时剖析型数据库畛域的国内工业界规范。基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为用户和客户提供开箱即用的能力。
相干链接:
SelectDB 官方网站:
https://selectdb.com (We Are Coming Soon)
Apache Doris 官方网站:
http://doris.apache.org
Apache Doris Github:
https://github.com/apache/doris
Apache Doris 开发者邮件组:
dev@doris.apache.org