共计 3099 个字符,预计需要花费 8 分钟才能阅读完成。
本场视频链接:https://developer.aliyun.com/live/1548?spm=a2c6h.12873581.0.0.71671566Xloy3Z&groupCode=apachespark
本场 PPT 资料:https://www.slidestalk.com/AliSpark/SparkRelationalCache2019_57927
本次分享主要分为以下四个方面:
- 项目介绍
- 技术分析
- 如何使用
- 性能分析
一、项目介绍
项目背景
阿里云 EMR 是一个开源大数据解决方案,目前 EMR 上面已经集成了很多开源组件,并且组件数量也在不断的增加中。EMR 下层可以访问各种各样的存储,比如对象存储 OSS、集群内部自建的 HDFS 以及流式数据等。用户可以利用 EMR 处理海量数据和进行快速分析,也能够支持用户在上面做机器学习以及数据清洗等工作。EMR 希望能够支撑非常大的业务数据量,同时也希望能够在数据量不断增长的时候,能够通过集群扩容实现快速数据分析。
云上 Adhoc 数据分析痛点
在云上做 Adhoc 数据分析的时候,很难实现随着数据量的增长使得查询的延迟不会大幅度增加。虽然目前各种引擎不断出现,并且某些引擎在一些场景下运行很快,但是数据量变大之后,查询响应速度难免有所下降,因此希望在比较统一的平台之上获得较好的性能。与此同时,阿里云也希望能够提供云原生的解决方案。Spark 是目前工业界使用较多的计算引擎,应用非常广泛,但是在处理 Adhoc 上还是存在很多不足之处,因此阿里云在 Spark 上做了大量优化,帮助用户满足 Adhoc 查询的需求。因此就会涉及到缓存方案,虽然 Spark 中很早就有了缓存机制,但想要满足云上 Adhoc 场景却存在很多不足之处,因此阿里云会在 Spark 上做大量优化,帮助用户优化 Adhoc 查询速度。但是如果把数据放到内存中,将所有数据全部用作缓存可能也不足够,因此就催生出了 Spark Relational Cache。
Spark Relational Cache
用户的 SQL 请求过来之后,到了 Spark 上面,会需要比较长的时间在数据来源上进行处理,这里下层的存储包括集群的 HDFS 以及远端的 JindoFS 和阿里云 OSS 等。当有了 Spark Relational Cache 之后,查询过来之后会查询是否能够用到存储在 Relational Cache 中缓存的数据,如果不能用到则会转发到原生路径上,如果能用到则会用非常快的速度从缓存里面将数据读取出来并将结果返回给用户。因为 Relational Cache 构建在高效存储之上,通过用户的 DDL 将数据变成 Relational Cache。
Spark Relational Cache 特点
Spark Relational Cache 希望能够达到秒级响应或者亚秒级响应,能够在提交 SQL 之后很快地看到结果。并且也支持很大的数据量,将其存储在持久化的存储上面,同时通过一些匹配手段,增加了匹配的场景。此外,下层存储也使用了高效的存储格式,比如离线分析都会使用的列式存储,并且对于列式存储进行了大量优化。此外,Relational Cache 也是用户透明的特性,用户上来进行查询不需要知道几个表之间的关系,这些都是已经有过缓存的,不需要根据已有的缓存重写 Query,可以直接判断是否有可以使用的 Relational Cache,对于一个厂商而言只需要几个管理员进行维护即可。Spark Relational Cache 支持自动更新,用户不需要担心因为插入了新的数据就使得 Cache 过时导致查询到错误的数据,这里面为用户提供了一些设置的规则,帮助用户去进行更新。此外,Spark Relational Cache 还在研发方面,比如智能推荐方面进行了大量探索,比如根据用户 SQL 的历史可以推荐用户基于怎样的关系去建立 Relational Cache。
二、技术分析
阿里云 EMR 具有很多核心技术,如数据预计算、查询自动匹配以及数据预组织。
数据预计算
数据在很多情况下都有一个模型,雪花模型是传统数据库中非常常见的模型,阿里云 EMR 添加了 Primary Key/Foreign Key 的支持,允许用户通过 Primary Key/Foreign Key 明确表之间的关系,提高匹配成功率。在数据预计算方面,充分利用 EMR Spark 加强的计算能力。此外,还通过 Data Cube 数据立方来支持多维数据分析。
执行计划重写
这部分首先通过数据预计算生成预计算的结果,并将结果存储在外部存储上,比如 OSS、HDFS 以及其他第三方存储中,对于 Spark DataSource 等数据格式都支持,对于 DataLake 等热门的存储格式后续也会添加支持。在传统数据库中有类似的优化方案,比如物化视图方式,而在 Spark 中使用这样的方式就不合适了,将逻辑匹配放在了 Catalyst 逻辑优化器内部来重写逻辑执行计划,判断 Query 能否通过 Relational Cache 实现查询,并基于 Relational Cache 实现进一步的 Join 或者组合。将简化后的逻辑计划转化成为物理计划在物理引擎上执行。依托 EMR Spark 其他的优化方向可以实现非常快速的执行结果,并且通过开关控制执行计划的重写。
自动查询匹配
这里有一个简单的例子,将三个表简单地 Join 在一起,经过过滤条件获得最终的结果。当 Query 过来之后先判断 Spark Relational Cache 是否能够符合需求,进而实现对于预先计算好的结果进行过滤,进而得到最终想要的结果。
数据预组织
如果将数十 T 的数据存在存储里面,那么从这个关系中获取最终的结果还需要不少的时间,因为需要启动不少的 Task 节点,而这些 Task 的调度也需要不少的开销,通过文件索引的方式将时间开销压缩到秒级水平,可以在执行时过滤所需要读取的文件总量,这样大大减少了任务的数量,这样执行的速度就会快很多。因为需要让全局索引变得更加有效,因此最好让数据是排过序的,如果对于结构化数据进行排序就会知道只是对于排列在第一位的 Key 有一个非常好的优化效果,对于排列在后面的 Key 比较困难,因此引入了 ZOrder 排序,使得列举出来的每个列都具有同等的效果。同时将数据存储在分区表里,使用 GroupID 作为分区列。
三、如何使用
DDL
对于简单的 Query,可以指定自动更新的开关,并起一个名字方便后续管理。还可以规定数据 Layout 的形式,并最终通过 SQL 语句来描述关系,后续提供给用户 WebUI 一样的东西,方便用户管理 Relational Cache。
数据更新
Relational Cache 的数据更新主要有两种策略,一种是 On Commit,比如当依赖的数据发生更新的时候,可以将所有需要添加的数据都追加写进去。还有一种默认的 On Demand 形式,用户通过 Refresh 命令手动触发更新,可以在创建的时候指定,也可以在创建之后手工调整。Relational Cache 增量的更新是基于分区实现的,后续会考虑集成一些更加智能的存储格式,来支持行级别的更新。
四、性能分析
Cube 构建
阿里巴巴的 EMR Spark 对于 1T 数据的构建时间只需要 1 小时。
查询性能
在查询性能方面,SSB 平均查询耗时,无 Cache 时查询 时间按 Scale 成比例增加,Cache Cube 后始终保持在亚秒级响应。
相关文章:EMR Spark Relational Cache 利用数据预组织加速查询
阿里云双 11 领亿元补贴,拼手气抽 iPhone 11 Pro、卫衣等好礼,点此参与:http://t.cn/Ai1hLLJT
本文作者:开源大数据 EMR
阅读原文
本文为云栖社区原创内容,未经允许不得转载。