乐趣区

关于sql:基于MaxCompute-SQL-的半结构化数据处理实践

简介:MaxCompute 作为企业级数据仓库服务,集中存储和治理企业数据资产、面向数据利用解决和剖析数据,将数据转换为业务洞察。通过与阿里云内、内部服务灵便组合,可构建丰盛的数据利用。全托管的数据与剖析解决方案,可简化平台运维、治理投入,晋升面向业务的服务能力,减速价值实现。
本文作者 孔亮 阿里云智能 产品专家

一、MaxCompute 根底介绍

阿里云数据与剖析产品解决方案

MaxCompute 作为企业级数据仓库服务,集中存储和治理企业数据资产、面向数据利用解决和剖析数据,将数据转换为业务洞察。通过与阿里云内、内部服务灵便组合,可构建丰盛的数据利用。全托管的数据与剖析解决方案,可简化平台运维、治理投入,晋升面向业务的服务能力,减速价值实现。

从下图能够看出 MaxCompute 是处于一个外围地位,首先 MaxCompute 是一个数据仓库,而且是一个基于 Serverless 架构超大规模集群产品,具备平安治理能力等企业级能力。之前是偏离线数据处理平台,以后曾经具备 BI 剖析能力的企业级数据仓库。

个别企业的离线数据链路,从数据源蕴含关系型数据库、非结构化存储、大数据存储、音讯队列等等,都能够通过数据集成离线的形式,批的形式,进入到数据仓库中。前端的各种剖析利用,也能够有一些实时的剖析,通过 MaxCompute-Hologres 做查问减速,这是离线的场景。实时链路是从音讯队列数据源,通过 Datahub 数据总线,到实时计算 Flink,对接到实时数仓 Hologres,再对接到前台。两头的数据是通过 Hologres 和 MaxCompute 还有 Flink,做流批一体。MaxCompute 在数据仓库的根底上,扩大了数据库的一些联邦查问能力,包含 Data Lake、Mysql、Hbase 等,数据的集成,包含元数据的同步和查问去同步数据数据湖数据处理的能力。还蕴含基于数据仓库数据之后,人工智能 PAI 机器学习的能力。形成了残缺的大数据底座。在这之上,DataWorks 提供了,一站式的开发治理平台,能够做任务调度、元数据数据数据品质的血统治理、数据开发等能力。

MaxCompute 大数据计算服务简介

MaxCompute(大数据计算服务)是一款多功能、高性能、易于应用的数据仓库服务。

MaxCompute 内建欠缺的企业级平安及治理性能、反对凋谢数据生态,以对立平台满足多应用场景(数据仓库 /BI、数据湖剖析、机器学习)须要,被宽泛用于数据化经营、画像及举荐、智能预测等利用场景。

MaxCompute 底层有对立 Iass 层的存储和计算调度,存储是盘古,调度是伏羲,把存储跟计算资源做了一层封装,用资源池的形式对下层利用无感知的提供应用,下层利用只须要应用这个资源组,而不须要晓得具体的工作运行在什么资源下面。MaxCompute 能够提供结构化数仓的存储能力,也能够提供数据湖包含凋谢格局、半结构化、非结构化等数据处理的能力。对于用户来说,所有应用场景都在我的项目外面,每个我的项目之间租户隔离,能够有本我的项目的资源,也能够共享资源。我的项目间接通过平安共享的形式,能够同步数据。再下层用对立的拜访认证、治理、平安、监控、元数据等能力。

能够总结出以下几点:

  • 简略易用的 SQL 端到端开发方式,反对 Spark,分布式 Python(Mars)等开源技术栈,内置欠缺的企业治理性能和凋谢的接口,简略易用凋谢
  • 对立元数据、对立数据存储,一份对立的企业数据资产,云原生的多租户零碎,最高效的跨业务 & 跨组织数据连贯与共享
  • 自适应的按需弹性资源,精准匹配业务规模的变动,防止资源节约或有余,业务负载隔离,打消业务间资源争抢
  • 存储与计算独立伸缩,反对 TB 到 EB 级的存储扩大;连贯宽泛内部数据源,发展联邦计算
  • 深度优化,主动数仓,集成多年双 11 优化能力,智能调优 + 专家服务反对

MaxCompute 性能介绍

MaxCompute 性能能够分为下图几类。最外围的存储计算能力是不对外开发的。存储次要用的是数据库表,计算资源会在 SQL 工作或者其余计算模型上应用时体现进去。看 SQL 计算能力,能够端到端实现整个数仓的数据处理和数据模型治理等能力,包含一些根底的数据类型,外部表做一些分区,内部表处理非机构化数据等。反对流式写入,流式 upsert 插入数据,删改数据等能力。查问时能够用非常复杂的查询方法,能够看解析打算 Explain。UDF 侧,MaxCompute 反对 Java UDF 和 Python UDF,还包含内容平安 UDF。

治理能力,下图深色局部是在专有云里会有独立的加强包来对企业提供额定的计算能力。公共云上是间接 Serverless 形式提供给用户,只收资源费用。治理能力也蕴含计量计费的能力,包含预付费和按量计费。工作有工作治理,查问有查问减速,专有云部署时,大规模跨域计算等企业级能力。MaxCompute 除了 SQL 引擎外,还有向量检索,TensorFlow,Spark,Mars,Hologres,这些都能够基于 MaxCompute 的底层存储计算资源,用不同的引擎,提供对应场景的能力。合规治理局部,灰色局部有一些是 DataWorks 提供,包含自身数仓平安治理能力,元数据管理能力,审计能力,数据加密,数据脱敏,数据品质等性能。再通过 SDK/API 和 MaxCompute 提供的配套工具来实现数据开发,数据上传下载,还有一些三方利用,二方利用来实现整个数仓的生态构建。

MaxCompute 次要解决方案

企业数据仓库 / 数据中台

将原始数据整合为可被宽泛应用的常识,用于后续生产应用,包含:

•集成存储:收集、存储和集中管理企业内外数据;

•解决剖析:荡涤、加工、整合多方数据;面向业务需要统计、开掘;对立的存储和解决能够提供弹性伸缩的存储计算能力,缩小老本

•标准化:建设企业数据仓库模型(分层 / 分主题),建设数据规范,造成可复用数据资产,并且通过数据治理,进行数据生命周期平安、老本治理等、继续保障数据品质和标准化

•数据互通:在企业内流转共享规范数据,买通数据孤岛,让关联的数据施展更大的价值

数据中台不仅是技术平台,还蕴含组织和治理流程要求,强调以公共数据产品服务业务,实现”数据业务化”,可认为数据仓库的一种最佳实际。MaxCompute+DataWorks 是开箱即用的数据仓库解决方案。

BI 剖析 / 数据分析

BI 剖析并不必然要应用数据仓库,如可间接基于交易数据库剖析

数据仓库可能帮忙提供 BI 剖析须要的企业视角的全面数据

通过数据仓库的数据资产治理,BI 剖析人员可更好地检索、了解数据

数据仓库还可能以弱小的性能,满足多用户并发、剖析不同数据规模需要

MaxCompute 提供数据的集中管理、解决剖析,可间接对接 BI 或者将数据同步到内部剖析型数据库 (OLAP) 进行 BI 剖析

预测剖析 / 智能利用

数据仓库与 AI 集成日益严密

数据仓库为机器学习进行数据加工、数据筹备

机器学习对数据进行模型训练,数据预测,后果可间接固化在数据仓库进行常识共享,如用户画像剖析对客户性别、偏好的预测

MaxCompute 无缝集成 PAI、SparkML,1 个平台无需数据挪动即可在企业数据之上建设基于机器学习的智能利用,如 CTR 预估、个性化举荐

二、MaxCompute 半结构化数据处理

什么是半结构化数据

本文的主题是 MaxCompute 半结构化解决能力,咱们先看一下什么是半结构化数据。

结构化数据,即行数据,存储在数据库里,能够用二维表构造来逻辑表白实现的数据

非结构化数据,包含所有格局的办公文档、文本、图片、XML、HTML、各类报表、图像和音频 / 视频信息等等

半结构化数据,就是介于齐全结构化数据(如关系型数据库、面向对象数据库中的数据)和齐全无构造的数据(如声音、图像文件等)之间的数据,HTML 文档就属于半结构化数据。它个别是自描述的,常常变动的,数据的构造和内容混在一起,个别由一个三元组示意,包含标记、类型和对象的值。

通过数据模型比拟:

  • 结构化数据:关系型(二维表)
  • 半结构化数据:由一个由节点汇合和弧段汇合组成的具根有向图构造。(树、图)
  • 非结构化数据:无

最初从 wiki 带的定义看,半结构化的特点是简单类型构造,易变,须要从自描述构造中提取数据进行计算。

Semi-structured data[1] is a form of structured data that does not obey the tabular structure of data models associated with relational databases or other forms of data tables, but nonetheless contains tags or other markers to separate semantic elements and enforce hierarchies of records and fields within the data. Therefore, it is also known as self-describing structure.

In semi-structured data, the entities belonging to the same class may have different attributes even though they are grouped together, and the attributes’ order is not important.

Semi-structured data are increasingly occurring since the advent of the Internet where full-text documents and databases are not the only forms of data anymore, and different applications need a medium for exchanging information.

半结构化数据利用宽泛,因为:

•简洁、简略、体积小等。

•上手容易,高效。

•跨语言,用于 Web 我的项目的前后端交互接口,配置文件,文件存储等等。挪动端利用的火爆,进一步带动 json 等半结构化数据的应用(json 示例见下图,来源于网络)。这些数据都能够作为数据源,存入数据仓库做剖析。

所以半结构化数据处理的能力是数据仓库的一个典型利用场景。

半结构化数据处理

个别的场景,依照数仓的流程来看,从数据源 -> 数据处理 -> 数据存储,这个阶段次要是半结构化解决的次要环节,因为往下层看,可能数据曾经加工实现,间接面向利用了,半结构化数据体现的就没有那么显著。

这里的半结构化数据处理,有两种做法,一种是把数据同步到一个字段外面,每次利用时用一些简单类型,或者是 json 函数间接提取,就是按需提取,但数据是放到一个字段外面。这种长处是不必思考半结构化数据结构变动。毛病是性能不佳,每次选用适宜的处理函数和办法,开发简单。不论什么样的数据,都是一个大 string 存进去,还十分大,比方 MaxCompute,个别的状况反对 8M,但为了解决这种状况,MaxCompute 也能够开到最大 256M。另一种办法是导入时或者批处理时依照 json 构造拆成一张宽表,再随着 json 构造批改而批改 / 重建表构造。这样做的长处是存储和计算都能失去优化。但毛病是表机构常常批改,批改不便。

MaxCompute 半结构化数据处理

MaxCompute 提供了以下四方面能力解决半结构化数据

提供简单类型反对存储半结构化数据

首先提供了简单数据类型 存储对应的半结构化数据

Schema evolution(表构造演进)对应半结构化数据结构定义的批改

而后提供了 schema evolution 的能力,能够批改表和嵌套列,包含:

  • 删除列
  • 增加列
  • 批改列程序
  • 批改列名
  • 批改列数据类型(兼容类型)
  • 批改简单类型嵌套构造(与批改表构造雷同)

Semi-structured data processing function 用于解决半结构化数据各节点的值

  • MaxCompute SQL 为晋升简单数据类型(ARRAY、MAP、STRUCT)数据的解决能力和效率,减少了大量内建函数,能够应用内建函数对输出的简单数据类型数据进行解决,或通过函数解决输入简单数据类型数据。
  • 同时提供了高阶函数加强简单数据类型数据的解决能力,相较于一般函数的输出参数只能是数据,高阶函数的输出参数自身能够是一个函数。因而高阶函数能够解决输出的简单数据类型数据,并应用 lambda 表达式简化解决逻辑语法表白。

间接应用半结构化数据节点 value 进行计算

CREATE TABLE evol_t2 (id int, name struct<given: string, family: string>,phones array<struct<type: string, num: string>>) ;
insert into table evol_t2 select 1, STRUCT('Bill', 'Gates'), array(STRUCT('work', '1234567890'),STRUCT('cell', '9876543210'));
insert into table evol_t2 select 2, STRUCT('Michael', 'Jordan'), array(STRUCT('work', '1111111111'),STRUCT('cell', '9999999999'));

插入后果如下:

select name.given as firstname,c.phones[1].num as phonenum
from evol_t2 c
where c.phones[1].type = 'cell';

查问后果如下:

MaxCompute 表构造批改

灵便批改表构造,既能够反对半结构化数据源 schema 的变动,也不便数仓建模模型调整,不便对存量表增补、剔除字段,而后把雷同的字段放在一起或批改类型。

语法定义和示例如下:

删除列

ALTER TABLE <table_name> DROP COLUMN <column_name>;
create table if not exists evol_t(id bigint,value1 bigint,value2 bigint);
ALTER TABLE evol_t DROP COLUMN value2;

增加列

ALTER TABLE <table_name> ADD COLUMNS (col_name1 type1[, col_name2 type2...]);
create table if not exists evol_t(id bigint,value1 bigint,value2 bigint);
ALTER TABLE evol_t ADD COLUMNS value3 STRING;

阐明:增加的新列不反对指定程序,默认在最初一列。

批改列程序

ALTER TABLE <table_name>
CHANGE COLUMN <original_column_name> <new_column_name> <column_type> AFTER <column_name>;
create table if not exists evol_t(id bigint,value1 bigint,value2 bigint);
ALTER TABLE evol_t CHANGE COLUMN value2 value3 bigint AFTER id;

阐明:目前不反对 BEFORE 关键词,能够通过 AFTER 实现,如有必要能够在后续性能中减少。

批改列名

ALTER TABLE <table_name> 
CHANGE COLUMN <original_column_name> RENAME TO <new_column_name>;

MaxCompute 简单类型数据结构批改

简单类型数据的各层嵌套列的 schema 也反对灵便批改,嵌套列和表构造一样都能够享受列存优化的性能和间接查问的便捷

CREATE TABLE evol_t (id int, point struct<x: double, y: double>) ;
ALTER TABLE evol_t ADD COLUMNS (points_map map<string, struct<x: double, y: double>>);
ALTER TABLE evol_t ADD COLUMNS (points_arr array<struct<x: double, y: double>>); 

因为所有的嵌套列都当作一张嵌套的表处理和辨认,那么嵌套列也能够取得如下能力:

  • 表的构造能够批改(增、删、改名字、改程序、改类型)
  • 更精密的列存储和压缩
  • 针对数据类型的存储和计算优化
  • 间接用节点值进行计算
  • 更丰盛的函数进行半结构化数据处理

MaxCompute 简单类型数据处理函数

丰盛的简单类型数据处理函数不便间接对半结构化数据进行解决,且更多更易用的函数在一直推出中





MaxCompute 高阶函数反对 lambda 表达式

简单类型数据处理函数中高阶函数包含:

ANY_MATCH、ALL_MATCH、ARRAY_REDUCE、ARRAY_SORT、FILTER、TRANSFORM、ZIP_WITH、MAP_FILTER、MAP_ZIP_WITH、TRANSFORM_KEYS、TRANSFORM_VALUES 函数

反对 lambda 表达式语法,简化了对简单数据类型数据处理的表白。

局部函数的阐明和示例如下:

判断 ARRAY 数组 array(1, 2, -10, 100, -30)中是否有元素满足 x -> x > 3 条件。命令示例如下:-- 返回 true。selectany_match(array(1, 2, -10, 100, -30), x-> x > 3);
将 ARRAY 数组 a 中的元素利用 func 进行过滤,返回一个新的 ARRAY 数组。-- 返回[2, 3]。selectfilter(array(1, 2, 3), x -> x > 1);
将 ARRAY 数组 a 和 b 的元素依照地位,应用 combiner 进行元素级别的合并,返回一个新的 ARRAY 数组。-- 返回[2, 4, 6, NULL]。selectzip_with(array(1,2,3), array(1,2,3,4), (x,y) -> x + y);
将 MAP 对象 input 的元素进行过滤,只保留满足 predicate 条件的元素。-- 返回{-30:100, 20:50}。selectmap_filter(map(10, -20, 20, 50, -30, 100, 21, null), (k, v) -> (k+v) > 10);
对输出 MAP 对象 input 进行变换,放弃 Key 不变,通过 func 计算新的 Value 值。-- 返回{-30:71, 10:-10, 20:NULL}。selecttransform_values(map(10, -20, 20, null, -30, 101), (k, v) -> k + v);

三、实操演示

MaxCompute 半结构化数据处理和 schema evolution

请点击视频查看演示 demo

性能阐明

列举几个罕用的批改简单类型的节点 schema 的命令示例:

-- 给 struct 新增一列
ALTER TABLE evol_t ADD COLUMNS (point.z double);
-- map 的 value 是 struct,新增一列
ALTER TABLE evol_t ADD COLUMNS (points_map.value.z double);
-- array 的元素是 struct,新增一列
ALTER TABLE evol_t ADD COLUMNS (points_arr.element.z double);

-- 示例中的一些用法
-- 增列
ALTER TABLE evol_t2 ADD COLUMNS (phones.element.type2 string);  
-- 删列
ALTER TABLE evol_t2 DROP COLUMNS (phones.element.type); 
-- 改名
ALTER TABLE evol_t2 CHANGE COLUMN phones.type2 phones.type0 string; 
-- 改程序
ALTER TABLE evol_t2 CHANGE phones.num phones.num string AFTER type0; 

Demo 中残缺性能演示脚本

DROP  table evol_t2;

CREATE TABLE evol_t2 (id int, name struct<given: string, family: string>,phones array<struct<type: string, num: string>>) ;
insert into table evol_t2  select 1, STRUCT('Bill', 'Gates'), array(STRUCT('work', '1234567890'),STRUCT('cell', '9876543210'));
insert into table evol_t2  select 2, STRUCT('Michael', 'Jordan'), array(STRUCT('work', '1111111111'),STRUCT('cell', '9999999999'));
select * from evol_t2;

ALTER TABLE evol_t2 ADD COLUMNS (position map<string, struct<x: double, y: double>>);
insert into table evol_t2  select 3, STRUCT('Michael', 'Jackson'), array(STRUCT('work', '1231231231'),STRUCT('cell', '1231231233')),map('p1',struct(1.1,1.2),'p2',struct(1.5,1.3));
select * from evol_t2;

ALTER TABLE evol_t2 ADD COLUMNS (position.value.z double);
insert into table evol_t2  select 4, STRUCT('Ming', 'Yao'), array(STRUCT('work', '5555555555'),STRUCT('cell', '6666666666')),map('p1',struct(5.5,1.0,12.0),'p2',struct(6.5,3.0,8.1));
select * from evol_t2;

ALTER TABLE evol_t2 DROP COLUMNS (phones.element.type); -- 删列
select * from evol_t2;

ALTER TABLE evol_t2 ADD COLUMNS (phones.element.type2 string);  -- 增列
select * from evol_t2;

ALTER TABLE evol_t2 CHANGE COLUMN phones.type2 phones.type0 string; -- 改名
select * from evol_t2;

insert into table evol_t2  select 5, STRUCT('Lei', 'Li'), array(STRUCT('9999999999','work'),STRUCT('8888888888','cell')),map('p1',struct(9.5,6.0,10.0),'p2',struct(5.5,2.0,3.0));
select * from evol_t2;

ALTER TABLE evol_t2 CHANGE phones.num phones.num string AFTER type0; -- 改程序
select * from evol_t2;

select name.given as firstname,c.phones[1].num as phonenum
from evol_t2 c
where c.phones[1].type0 = 'cell';


select c.name.family||c.name.Given,c.phones[1].num,SQRT(POW(position['p2'].x-position['p1'].x,2)+POW(position['p2'].y-position['p1'].y,2)+POW(position['p2'].z-position['p1'].z,2))
from evol_t2 c
where name.given in ('Ming','Lei');

目前反对的数据类型转化关系

近期将会灰度公布批改类型的 feature,具体反对的数据类型转化关系如下:

四、演进方向

  • 持续加强的性能和演进方向
  • 减少更多的简单数据类型处理函数
  • 更多的数据类型兼容转换
  • 自动识别简单类型数据类型的 schema 并存储优化
  • 更灵便的节点值提取和计算
  • 更高性能的列剖析能力
  • Timetravel 对于 schema evolution 和数据批改的版本治理
  • ……

原文链接
本文为阿里云原创内容,未经容许不得转载。

退出移动版