Amazon Elastic MapReduce(Amazon EMR)是Amazon Web Services提供的托管集群平台,用户能够十分不便的应用Amazon EMR搭建起一套集群,用来撑持大数据框架的利用,如Apache Spark,Hive,Flink,Presto等等。因为Amazon EMR具备很好的可配置性和伸缩性,使用者能够灵便的依据本人的需要进行定制,在满足生产需要的同时,减低对基础设施的运维老本。
FreeWheel大数据团队在搭建数据仓库的过程中,在Amazon EMR的应用上积攒了大量的实际和运维教训,本文将从Amazon EMR实际的角度登程,讲述FreeWheel Transformer团队在搭建ETL pipeline的过程中是如何玩转Amazon EMR的,以期抛砖引玉。
后盾回复"FreeWheel",更多精彩内容等着你
想要理解更多亚马逊云科技最新技术公布和实际翻新,敬请关注2021亚马逊云科技中国峰会!点击图片报名吧~
一个典型的Spark on Amazon EMR上集群架构概览
咱们先来理解一下一个典型的Amazon EMR集群是什么样子的。Amazon EMR默认应用了Yarn来治理集群资源。Amazon EMR将node分为了Master,Core和Task三个组(通过Yarn label实现)。
主节点(Master node)
Amazon EMR的Master node用来治理整个集群,集群至多要有一个Master node(从Amazon EMR-5.23开始,如果须要Master node HA,能够抉择实例计数为3,此时依据不同的利用,会有不同的HA计划)。
以运行Hadoop HDFS和Yarn为例,Master上会运行Yarn ResourceManger和HDFS NameNode。在高可用计划下,Yarn ResourceManager会在所有三个主节点上运行,并采纳Active/Standby模式进行工作。如果ResourceManager的主节点产生故障,Amazon EMR将启动主动故障转移过程,具备Standby ResourceManager的Master node将接管所有操作,能够通过yarn rmadmin-getAllServiceState 来获取以后服务状态。对于Yarn ResourceManger HA的具体细节,能够参考ResourceManager HA。
与ResourceManager相似,NodeManager会在三个主节点中的两个节点上运行,别离处于Active和Standby状态。如果Active NameNode的主节点产生故障,Amazon EMR会将启动HDFS故障转移过程。此时 Standby状态的NameNode将变为Active并接管集群中的所有对HDFS的操作。咱们能够通过hdfs haadmin-getAllServiceState命令来获取以后NameNode状态。对于HDFS HA的具体细节,能够参考HDFS HA。
- ResourceManager HA
https://hadoop.apache.org/doc... - HDFS HA
https://hadoop.apache.org/doc...
外围节点(Core node)
Core node为可选组,其上运行了HDFS DataNode。当Core node个数少于4时,Amazon EMR默认会将HDFS的replica设置为1,Core node少于10个时replica为2,其余状况为3。如果须要进行自定义设置,能够批改启动Amazon EMR对hdfs-site.xml中dfs.replication的配置;这里要留神一点,如果咱们在一个曾经启动的Amazon EMR中,在对Core node进行伸缩的时候,会影响到HDFS数据的re-balance,这是一个耗时的操作,不倡议频繁做Core node的scaling。而且对于replica是3的集群,如果将core node的个数缩减为少于3个的话,也将无奈胜利。
工作节点(Task node)
Task node为一般的工作节点。非常适合作为单纯的工作节点应答工作负载须要频繁伸缩的场景,其上运行了NodeManager,在其启动之后,会退出到Yarn的集群治理之中。对于须要频繁scale的场景,仅仅scale Task node是一个比拟理论的计划,效率比拟高。
工作队列(InstanceFleet)
Core node和Task node都能够配置实例队列,实例队列是一个十分实用的性能,尤其是在Spot实例的场景,在一些热门事件产生时,一些热门机型的中断率会变得很高,如果抉择繁多类型实例的话,很容易呈现无奈满足需要,咱们能够在spot advisor上查看以后的实例中断状况。前面章节我会详细描述咱们是如何应用实例队列的。
- spot advisor
https://aws.amazon.com/cn/ec2...
Amazon EMR版本
在创立Amazon EMR时,须要额定留神Amazon EMR的release版本,不同版本反对的利用版本也不同。从Amazon EMR-6.1.0版本起,曾经开始反对Spark3.0.0 + Hadoop 3.2.1的组合了,目前最新的Amazon EMR-6.2.0版本,曾经能够反对稳固版本的Spark3.0.1了。如果须要运行Spark2.x版本,能够抉择Amazon EMR-5.x版本或者Amazon EMR-6.0.0。
除利用版本区别,从Amazon EMR-6.0.0起,零碎基于Amazon Linux 2和Corretto JDK 8构建,绝对于以往的Amazon Linux 1 最大的区别是提供了新的零碎工具,如Systemctl,以及优化的Amazon linux LTS内核。另外Amazon Corretto JDK 8提供通过 Java SE 认证的兼容 JDK,包含长期反对、性能加强和平安修复程序。对于Amazon emr-6.x的release note,能够参考Amazon EMR release note。
另外,Amazon Web Services最新曾经反对Amazon EMR on Amazon EKS, 咱们能够更灵便的将Amazon EMR创立在Amazon EKS中,以期更灵便应用和更低的费用,目前这一块咱们团队正在调研和adoption,我会在和后续的文章中专门来聊这一块。
- Amazon EMR release note
https://docs.aws.amazon.com/e...
Amazon EMR在FreeWheel批处理ETL pipeline中的实际
在Freewheel批处理ETL pipeline中有两个重要的组件别离叫Optimus和JetFire,就是大家耳熟能详的擎天柱和天火,是由我司FreeWheel建设的一套基于Spark的数据建模和解决框架(所以我司FreeWheel Transformer team的产品都以Transformer中的角色来命名)。Optimus次要针对的是数据仓库层的建设,次要性能是将广告零碎产生的log经前端模块收集起来后,依据商业逻辑的需要进行抽取转换,对立建模,并做了大量业务的enrichment,将原始数据转换成不便上游利用端进行剖析应用的Context Data,最终由Spark SQL生成宽表落盘。JetFire更偏差于一个灵便通用的基于Spark的ETL Framework,能让用户更简略不便的将基于宽表之上的数据加工需要进行疾速实现。这些pipelines都是由Airflow进行工作的编排和调度。
工作特点
基于Optimus的批处理ETL pipeline
- ETL pipeline每小时一个batch,因为客户散布在寰球各个时区,所以数据的公布需要是依照客户所在时区的零点之后公布以后时区客户前24小时的数据;另外有些产品也须要反对每个小时的数据都不能提早,每个小时的数据处理须要管制在30min以内;
- 数据量不稳固;客户在不同时区散布的密度,各个小时的流量也不同,以及区域性事件的产生,以及外部上游模块可能会产生delay等都会造成每小时数据量不平均。尽管每天24个小时的数据量散布尽管大抵趋势雷同,然而不能精确预测,只有将要开始解决这个batch的时候,能力从上游拿到这个batch的数据量信息;
- 数据在ETL的两头过程中在HDFS上没有长久化需要;对于HDFS的需要是撑持Spark工作以及Amazon EMR集群的拜访,保障一次批工作外部的事务性即可。须要长久化的数据会由后续的模块load到clickhouse,以及同步公布到Amazon S3上交由hive来治理;
- Spark工作对于集群资源的需要:Optimus中因为存在大量的计算(如数据序列化反序列化,metric的计算,数据的排序聚合,Hyperloglog计算等)和缓存(分层建模中,在DAG中被重复用到的数据),Spark工作在不同阶段对集群资源的需要点是不同的:从数据load进内存到对数据进行transform进行建模的过程,是计算密集型的,须要耗费大量的CPU,同时因为有些dataframe须要被更下层的模型复用,须要cache起来,这里须要大量的memory;而最终在撑持大量并发SparkSQL的数据抽取和聚合的运算中,网络和CPU都是很大性能瓶颈。咱们针对Spark利用做了十分精密的调优工作,具体能够参考这些文章Spark实际一,Spark实际二。在解决一个batch的过程中,集群资源应用状况能够对应比拟上面三个图。
- Spark实际一
https://mp.weixin.qq.com/s/Mc... - Spark实际二
https://mp.weixin.qq.com/s/E1...
基于JetFire的DataFeed pipeline
- Datafeed中的工作具备不同的schedule,不同的input数据量,提交到Amazon EMR集群的工作负载无奈提前预知。这种场景下, Amazon EMR更像是一个共享的计算平台,咱们很难从独自一个利用的角度对整体资源进行布局。
- 思考到基于Optimus的ETL pipeline是泛滥上游利用的独特依赖,须要保障很高的稳定性,咱们冀望可能在独占一个Amazon EMR集群的前提下尽量升高开销。而Datafeed pipeline工作系统且多,绝大部分工作轻量须要较快实现。
咱们来各个击破上述需要
Amazon EMR集群配置
基于前文对Amazon EMR集群的介绍以及理论利用需要,总的来说,咱们应用了在Long term的Amazon EMR集群,运行单点Ondemand机型的Master node + 单点Ondemand机型的Core node + 动静伸缩的由Spot&Ondemand机型的InstanceFleet组成的Task node。
Long term Amazon EMR VS 长期Amazon EMR集群
咱们不抉择在每次须要Amazon EMR集群的时候去从新创立一个集群的起因是,除了机器实例provision的工夫外,Amazon EMR还须要额定执行较长时间的bootstraping脚本去启动很多服务能力让整个集群ready。而Master node和Core node咱们在上文介绍过,相较于Task node,他们不仅须要撑持Hadoop服务,还须要下载并配置Spark, Ganglia等环境,以及更多服务(依据用户勾选的application决定),这样频繁创立和销毁带来的工夫开销绝对于hourly schedule的工作而言,是不能疏忽的。所以咱们抉择创立保护一个long term Amazon EMR集群,通过scale worker node来管制负载的计划。而对于应用距离较长,如daily甚至weekly job 来讲,在每次须要应用Amazon EMR的时候进行长期创立是一个更正当的计划。
抉择Long term之后咱们就须要关注集群的稳定性,然而咱们依然抉择了没有HA的计划,因为Master和Core均为Ondemand机型,这样就能够保障集群在非极其非凡状况下不会呈现crash的状况。而对于极其非凡状况,思考到两个pipeline对数据长久化并无需要,如果能在分钟级别的工夫内重建Amazon EMR集群并复原工作,绝对于长期存在的HA计划的master + core计划来说,在都能满足生产需要的状况下ROI更高。所以综合思考,咱们抉择了单点master和单点core node的计划,配合上Terraform脚本和Airflow的调度,咱们能够在产生小概率集群事变失去状况下疾速重建Amazon EMR集群并重跑工作,从而复原pipeline。
额定值得一提的是,在Amazon EMR应用初期,Terraform上并不反对创立具备InstanceFleet的Amazon EMR,过后仅可能应用命令行或者应用boto3的库创立一个Amazon EMR集群,因为没有state文件进行track,所以也无奈通过Terraform进行对立治理(如销毁和批改等),针对Amazon EMR这部分只能本人实现一套stateful的治理脚本。目前最新版本的Terraform曾经退出了对带有InstanceFleet的Amazon EMR的反对。然而理论应用当中,对于应用InstanceFleet的Amazon EMR集群,很多配置只能在创立时指定,如Master&Core&Task的机型配置,InstanceFleet的机型抉择,Hadoop和Spark的一些customize的配置(这个仍然能够通过启动之后批改对应的xml或者conf文件,而后重启对应服务进行批改,然而这种做法对于产品环境并不敌对),Security group等,如果须要批改这些,依然须要销毁集群,用新的配置从新创立。须要留神的是,对于应用InstanceGroup的集群是能够通过reconfiguration的形式批改。
- reconfiguration的形式
https://docs.aws.amazon.com/e...
对于Spot机型和AZ的抉择
思考到老本,咱们会优先应用Spot机型作为Task node,应用Ondemand机型做补充。Spot机型在资源紧俏的状况下有可能申请不到,即便申请到也有可能会呈现中断被发出的状况,咱们会在创立Amazon EMR时将AZ设置为多个,以减少胜利申请到机器的概率。不过多AZ的配置仅在创立Amazon EMR时失效,Amazon EMR会抉择创立时资源较为拮据的AZ申请instances,而在Amazon EMR创立好之后,就只能在一个固定的AZ中应用了,后续即便这个AZ机器资源紧俏而其余可选的AZ资源富余,因为集群Master和Core曾经在以后AZ存在,不能再到其余AZ申请机器。思考到Spark集群通信开销,跨AZ带来对性能的影响不能疏忽,这种设计也是很正当的。因而,多AZ的设置对于每次须要从新创立Amazon EMR的场景是一个十分有用的性能,而对于long term集群的来说就无限了。在创立long term的集群之后,为了升高Spot机型带来的影响,就须要应用InstanceFleet配合 Spot和ondemand混合应用来升高机器了,这一部分我会在Task node伸缩策略外面详细描述。
Master和Core node机型抉择
在Master和Core node的机型抉择上,因为Core node须要运行HDFS Datanode,咱们冀望在运行Spark工作时在存储这里有更高的吞吐和IOPS,所以咱们抉择了存储优化型的i系列。另外因为Spark的driver executor默认会运行在core node上,对于一些须要Spark中须要应用到并发的代码块,driver的CPU core的数量决定了并发的下限,所以这里须要按利用需要抉择适合的CPU机型。这里须要提及的一点,从Amazon EMR 6.x版本开始,默认状况下禁用的Yarn label的性能,然而思考到core node为ondemand机型,而worker node为spot机型并且会频繁伸缩,运行在ondemand的core node上会更加稳固, 所以咱们依然抉择开启Yarn label,并让Driver executor运行在Core node之上。
能够通过:
yarn.node-labels.enabled: true
yarn.node-labels.am.default-node-label-expression: ‘CORE’
来开启这项配置。
对于Master node,除了NameNode的内存开销外,就是Spark historyServer和ResourceManager的内存开销,绝对worker node来讲,资源并不是很缓和,所以适当抉择就好。
Task node伸缩策略
为了满足对不同hourly数据量的解决可能在限定工夫内实现,须要依据以后小时input的数据量进行集群capacity的调整,在对Task node的伸缩过程中,咱们思考了以下方面。
InstanceFleet的应用
出于减低老本的思考,咱们会优先选择Spot机型来代替onDemand机型,然而在遇到机型紧俏,有大型流动的时候,热门机型就会供不应求,甚至会呈现频繁的被发出的状况,对于集群中有节点被回收的状况,Spark工作尽管能够handle,并在resource可行的状况下将那一部分数据shuffle到其余可用的worker上进行调度解决,然而数据的搬移以及DAG后续task的数据歪斜带来的性能的降落,甚至是资源有余导致的工作最终失败,可能会导致spark工作运行更长的工夫,从而可能引发更大的spot发出的机会。因而,咱们采纳了InstanceFleet来升高对繁多机型的依赖,InstanceFleet能够针对Amazon EMR的Master,Core和Task node别离设置混合机型和lifeCycle类型,以期可能在实例资源缓和的状况下满足指标容量。
一个InstanceFleet目前能够最大反对配置15种机型,目前罕用的几类能够用来运行Spark的机型有C型,R型和M型,C型更偏向于计算密集型的工作。除了C型的CPU主频更高以外,几类机型的次要区别在于vCPU/memory的比例,R型更适宜内存应用比拟高的工作,C型比拟适宜计算类型的工作,而M型比拟折中。
不同类型的instance capacity不同,在InstanceFleet中能够依据容量设置不同的unit值,unit值默认等于vCore的数量。理论应用中,咱们有两种做法:
- 一种是时候用默认,如12xlarge是48个unit,那么c5.24xlarge就是96个,这样在预估好了集群所需资源之后,能够间接依据所需CPU资源转换成unit个数就能够申请了,这种状况适宜于配置到InstanceFleet的机型都具备一样的CPU/memory比例,或者是局部机型的MEMORY比例大于目标值,且违心承当一部分的memory资源节约。如果选入了memory比例小的机型就可能会因为该种机型memory有余,无奈无奈依照预期申请到足够的executor,导致spark application因为资源不够停留在accept阶段;
- 另一种计划是依据Spark利用的executor配置资源,依据预期所选的机型上能够启动的executor最大个数的最大公约数作为一个unit,这种状况的限度在于集群的申请和spark利用的配置绑定了起来,尽管能够更高效的应用集群资源,然而一旦利用配置变动,就须要重新部署集群,定制化程度较高,有利有弊。
在long term Amazon EMR的运维期间,咱们有时会遇到须要批改Amazon EBS存储的需要,对于Master和Core node(须要是不依赖NVMe SSD的机型,如非i3系列),咱们能够借助Elastic volumes的性能来实现Amazon EBS的动静伸缩,具体介绍能够参考这篇文章Dynamically scale up storage on Amazon EMR clusters。然而对于Task node须要频繁scaling的场景,咱们应用动静伸缩Amazon EBS的计划就得失相当了,这种场景下如果Amazon EMR可能反对批改InstanceFleet的配置,并在下次scaling的时候失效,是最不便的方法。目前为了满足应用InstanceFleet的Task node的配置批改,咱们只能采纳重建集群。
- Dynamically scale up storage on Amazon EMR clusters
https://aws.amazon.com/cn/blo...
InstanceFleet目前还存在一些限度,如:
咱们无奈被动设置不同instance type的provision优先级。在一些场景下,咱们冀望退出更多的机型备选,然而其中的某些实例类型咱们仅心愿在主力机型有余的状况下作为补充机型退出,这种诉求目前是无奈实现的;官网给出的解释是,其外部会有算法来计算咱们须要的units,会联合总体开销和实例资源状况综合思考进行机器的provision。
另外就是在Amazon console上查看InstanceFleet的状态时,无奈高效的找到以后处于某种状态的机器,因为fleet会保留历史上1000个机器的记录,当超过1000个历史机器时,就会抛弃掉最早的记录,始终保持然而页面上须要一直刷新能力获取残缺的list,此时对于在WEB上进行直观调试的用户而言不太敌对,而对于应用amazon cli的用户来说,咱们能够通过list fleet并filter某种状态的instance来获取本人想要的后果。
上面是应用的InstanceFleet的配置状况:
集群伸缩策略
对于不同的利用场景,咱们目前应用了两种伸缩策略,一种是由任务调度端依据工作状况进行被动scale,一种是通过监控集群状态由Amazon EMR进行被动的scale。
被动伸缩的形式
对于基于Optimus的ETL pipeline来说,对于每个batch,Spark须要多少resource进行计算,咱们能够通过历史数据进行拟合,找出数据量和所需资源的关系,并通过一直反馈,能够越来越精确的预计出对于给定量的数据集所需的集群资源,因为Optimus运行的Amazon EMR集群是dedicated的,所以咱们就能够在提交Spark工作之前就去精准的scale集群达到目标capacity。下图是Task node伸缩在Airflow 调度下的workflow。
被动伸缩的形式
Datafeed pipeline,实际上是多条不同schedule距离,不同资源需要,以及不同工作SLA需要的工作汇合,因为每个工作之间互相独立,此时Amazon EMR相当于一个共享的计算资源池,很难从每个工作schedule的维度去治理Amazon EMR集群的capacity。所以咱们采纳了Amazon EMR managed scaling。启动了此项性能之后,Amazon EMR会继续评估集群指标,做出扩大决策,动静的进行扩容。此性能实用于由实例组或实例队列组成的集群。另外咱们在Yarn上设置了不同的queue,以期可能将不同资源需要和优先级的工作做粗粒度的隔离,联合上Yarn的capacity scheduler,让整体集群资源尽量正当应用。在理论应用中,咱们屡次遇到了无奈主动伸缩的状况,此时须要手动触发一次伸缩,之后就能够复原主动了,目前起因不详。
- Amazon EMR managed scaling
https://docs.aws.amazon.com/z...
以上采纳的两种计划各有利弊,对于第一种计划,更实用于一个dedicated集群用于提交齐全可预知capacity的场景,这种状况下咱们在提交工作之前就能够被动的将集群size设置成咱们想要的capacity,疾速而精确;而对于第二种场景,非常适合用于Amazon EMR作为一个共享的计算平台,利用端作为繁多的工作提交者无奈获取以后及将来提交的工作全貌,也就无奈计算Amazon EMR所需裁减的capacity,这种状况下Amazon EMR集群须要在工作提交之后依据一些集群metrics能力进行动静调整伸缩,在时效性上会有提早,对于一些DAG较为简单,两头步骤较多且对shuffle和数据歪斜敏感的Spark利用来讲也不敌对。
针对以上case,Transformer团队正在开发一套运行与Amazon EMR和Yarn之上的基于利用和策略角度运维的Framework – Cybertron。咱们冀望能通过这个服务能够站在更全局的角度去正当的治理多个Amazon EMR集群资源,可能让利用端不去关注Amazon EMR集群的资源状况,综合Yarn的scheduler和Queue等的配置,对集群资源和任务调度能有一个更高视角的管制。
Spot和Ondemand机型的混用
理论利用中,即便咱们抉择了InstanceFleet,也会因为极其的状况导致无奈配齐资源,此时咱们不得不用Ondemand机型来补足缺口,如下面的流程图所示,当期待集群scale肯定工夫之后,如果仍然无奈配齐需要指标,咱们就会申请ondemand机型用来补足,在InstanceFleet的场景下,ondemand类型的机型是和Spot机型范畴是一样的。值得注意的一点是,尽管Amazon EMR中能够配置Provisioning timeout的action是After xx minutes, Switch to On-Demand instances,但理论状况是这个策略仅在首次创立时provisioning失效,不实用在运行集群中启动spot的状况,因而咱们依然须要依据理论InstanceFleet的状况去被动申请Ondemand机型。
对于更极其的情景,如黑五到来或者美国大选期间,咱们会被动间接将所以申请机型间接替换成ondemand,以避免频繁的无奈配齐Spot带来的额定工夫开销。另外,在Spark job运行期间,如果遇到了因为Spot机型回收导致的工作中断的状况,咱们会在Airflow的调度下,依据以后集群状态加大ondemand机型进行配比,以期可能疾速复原。
成果
下图是应用了被动scale计划的集群,在一天内24个小时集群依据数据量的状况进行伸缩的状况,红色柱子是集群的Memory capacity,蓝色+绿色的累加局部为理论应用的内存量。从理论应用来看,咱们能够做到在工作提交前被动scale out到冀望的capacity,每个batch的工作能够充分利用集群资源,在SLA工夫内实现数据处理,并在不须要集群之后马上scale in进来。在升高开销的同时满足工作需要。
下图是应用了Amazon EMR autoScale计划的集群。咱们能够看到随着更多的Spark工作提交,集群负载变高,集群依据负载状况scale out了两次,最终在工作全副做完,并闲暇一段时间后将Task node缩回了0。
HDFS的依赖
在咱们的应用场景中,Amazon EMR仅作为计算资源,其上的HDFS仅须要撑持Spark利用即可。在一次批处理的过程中,生成的数据会先存储在HDFS上,而后由publisher将数据搬移并长久化到Amazon S3上。对于为什么不间接写入Amazon S3,次要是思考到业务数据的特点以及Amazon S3的个性(笔者在写这篇文章的时候,看到Amazon Web Services曾经解决了Amazon S3的强统一模型,极大的简化了很多零碎设计,能够参考Strong Read-After-Write Consistency的形容)。从零碎设计的角度,咱们也不冀望数据流上下游因为数据模式耦合的太紧,这样不仅不利于保护,还会加大Spark利用的运行工夫,变相减少了老本同时,对于应用Spot竞价机器的场景,更长的运行工夫也就意味着更大的被中断机会。
- Strong Read-After-Write Consistency
https://aws.amazon.com/cn/blo...
总结
随着咱们对Amazon EMR应用的越来越深刻,在满足产品需要的同时,咱们也在一直“榨干”Amazon EMR开销,本文冀望能通过咱们的Amazon EMR应用教训给读者启发。也感激在这个过程中Amazon EMR团队的反对。
另外Amazon EMR团队也在依据客户的理论需要不断完善和推出新的性能。目前咱们正在调研试用Amazon Web Services最新推出的Amazon EMR on Amazon EKS,冀望可能有更灵便的应用形式,更快的scale速度和更小的开销。咱们会在后续的文章中持续更新停顿。
本篇作者
彭康
FreeWheel高级软件工程师
毕业于中科院计算所,目前就任于 Comcast FreeWheel 数据产品团队,次要负责广告数据平台数据仓库的建设