本文首发于:行者 AI
AWS EMR 是一个计算集群。能够通过 ta 创立自定义配置的虚拟机,并主动装置所需计算框架(Spark,Hadoop,Hive 等),以便用来进行大数据计算。
1. 我的项目背景
公司目前有一个我的项目,通过爬虫收集数据,离线计算失去用户画像,并将最终后果写入 rds,通过 api 向外展现数据。
2. 架构演进
2.1 技术栈
- 计算框架 Spark
- 调度框架 Airflow
- 数据存储 Hadoop,Mysql
- 数仓工具 Hive,Presto
- 辅助工具 Zepplin
- 脚本语言 Java,Scala,Python
2.2 第一版
环境
咱们在某云厂商开了 6 台虚构器(4 核 8G),spark on yarn 模式运行,其中 1 台作为主节点,运行 hadoop 主节点和 airflow 调度程序,其余作为数据节点。
计算过程
- 通过 Spark Streaming 将数据落地到 Hadoop
- Airflow 定时向主节点通过 Spark-submit 形式提交命令
- Spark 计算后将最终后果写入 Mysql
- 平时开发人员能够在 Zepplin 进行查问
成果
计算流程能够失常进行
思考
通过一段时间的察看剖析,咱们发现
- 大部分计算工作都能在 较短时间内 实现
- 机器每天闲置工夫很长
- 业务 没有很高的实时性 要求
- 高配置虚构器老本很高
论断
基于现状,咱们心愿能有个 即开即用 的零碎,就像电脑一样,要用就关上,用完就敞开。通过调研,最终抉择了 AWS 的 EMR。
2.3 第二版
环境
在将零碎迁徙到 AWS EMR 之后,在 AWS 上开了一台虚构器(1 核 2G)运行 Airflow 和 Kinesis
这台虚构器须要始终运行,但 Airflow 自身不须要高配置
计算过程
- 通过 Kinesis 将数据落到 S3
-
Airflow 定时发动工作
- 发动创立 EMR 申请
可自定义机器配置,要装置的计算框架,也可笼罩框架配置。可通过 Python 脚本检测集群是否创立胜利
- 提交计算工作
- 发动创立 EMR 申请
- 敞开集群
成果
计算流程能够失常进行,但不须要长开机器了,只须要一台低配来触发定时工作即可
思考
通过一段时间的察看
- EMR 费用比起虚构器,的确便宜很多
- 能够通过 console 台查看集群状态,管制集群开关
- 不不便的中央,平时要查看 Hadoop 的数据,须要本人写脚本拉取,不能应用辅助工具了
::: hljs-center
Talk is cheap, show me the code
:::
筹备工作
- 注册 AWS 账号,登录
- 开明 EMR,S3
开明 S3 的目标是为了长久化数据,因为 EMR 集群自身不带额定硬盘,须要内部介质贮存
- 开明 AWS 内网可拜访的 Mysql
如果不必 Hive,可跳过这一步,同理,须要内部介质贮存 Hive 的数据结构
- 筹备创立 EMR 集群的脚本
这里有个坑,开始咱们应用的 AWS SDK 来做这件事,但无奈自定义计算框架配置(应该是 BUG),最后咱们通过批改 SDK 源码解决了这个问题,但起初发现根本没用到 SDK 其余性能时,咱们将这部分代码提成了独自的文件,因为应用了 Airflow 进行调度,所以决定用了 Python
- 编写 Spark 工作,打包上传至 S3
EMR LIB
# coding: UTF-8
import boto3, json, requests, requests
from datetime import datetime
def get_region():
# 这个地址不必改
r = requests.get("http://169.254.169.254/latest/dynamic/instance-identity/document")
response_json = r.json()
return response_json.get('region')
def client(region_name):
global emr
emr = boto3.client('emr', region_name=region_name)
# 创立 EMR
def create_cluster(name):
param = {
# 批改须要的框架
"Applications":[{"Name":"Hadoop"},{"Name":"Hive"},{"Name":"Spark"}],
# 这里的名字会显示到控制台
"Name":name,
"ServiceRole":"EMR_DefaultRole",
"Tags":[],
"ReleaseLabel":"emr-5.26.0",
"Instances":{
"TerminationProtected":False,
"EmrManagedMasterSecurityGroup":"sg-0085fba9c3a6818f5",
"InstanceGroups":[{
"InstanceCount":1,
"Name":"主实例组 - 1",
"InstanceRole":"MASTER",
"EbsConfiguration":{
"EbsBlockDeviceConfigs":[{
"VolumeSpecification":{
"SizeInGB":32,
"VolumeType":"gp2"
},
"VolumesPerInstance":1
}]
},
# 批改须要的硬件配置
"InstanceType":"m4.large",
"Market":"ON_DEMAND",
"Configurations":[{
# 批改 Hive 的 meta 源
"Classification":"hive-site",
"Properties":{
"javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",
"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",
"javax.jdo.option.ConnectionUserName":"user",
"javax.jdo.option.ConnectionPassword":"pwd"
}
},{
"Classification":"yarn-env",
"Properties":{},
"Configurations":[{
"Classification":"export",
"Properties":{
"AWS_REGION":"cn-northwest-1",
"S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",
"S3_USE_HTTPS":"0",
"S3_VERIFY_SSL":"0"
}
}]
}]
},{
"InstanceRole":"CORE",
"InstanceCount":1,
"Name":"外围实例组 - 2",
"Market":"ON_DEMAND",
# 批改须要的硬件配置
"InstanceType":"r5d.2xlarge",
"Configurations":[{
"Classification":"hive-site",
"Properties":{
"javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",
"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",
"javax.jdo.option.ConnectionUserName":"user",
"javax.jdo.option.ConnectionPassword":"pwd"
}
},{
"Classification":"yarn-env",
"Properties":{},
"Configurations":[{
"Classification":"export",
"Properties":{
"AWS_REGION":"cn-northwest-1",
"S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",
"S3_USE_HTTPS":"0",
"S3_VERIFY_SSL":"0"
}
}]
}]
},{
# 批改须要的工作节点数
"InstanceCount":4,
"Name":"工作实例组 - 4",
"InstanceRole":"TASK",
"EbsConfiguration":{
"EbsBlockDeviceConfigs":[{
"VolumeSpecification":{
"SizeInGB":32,
"VolumeType":"gp2"
},
"VolumesPerInstance":4
}]
},
# 批改须要的硬件配置
"InstanceType":"r5d.2xlarge",
"Market":"ON_DEMAND",
"Configurations":[{
"Classification":"hive-site",
"Properties":{
"javax.jdo.option.ConnectionURL":"jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=UTF-8",
"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",
"javax.jdo.option.ConnectionUserName":"user",
"javax.jdo.option.ConnectionPassword":"pwd"
}
},{
"Classification":"yarn-env",
"Properties":{},
"Configurations":[{
"Classification":"export",
"Properties":{
"AWS_REGION":"cn-northwest-1",
"S3_ENDPOINT":"s3.cn-northwest-1.amazonaws.com.cn",
"S3_USE_HTTPS":"0",
"S3_VERIFY_SSL":"0"
}
}]
}]
}],
"KeepJobFlowAliveWhenNoSteps":True,
"Ec2SubnetId":"subnet-027bff297ea95039b",
"Ec2KeyName":"hifive.airflow",
"EmrManagedSlaveSecurityGroup":"sg-05a0e076ee7babb9e"
},
"JobFlowRole":"EMR_EC2_DefaultRole",
"Steps":[{
"HadoopJarStep":{"Args":["state-pusher-script"],
"Jar":"command-runner.jar"
},
"Name":"Setup Hadoop Debugging"
}],
"ScaleDownBehavior":"TERMINATE_AT_TASK_COMPLETION",
"VisibleToAllUsers":True,
"EbsRootVolumeSize":10,
"LogUri":"s3n://aws-logs-550775287661-cn-northwest-1/elasticmapreduce/",
"AutoScalingRole":"EMR_AutoScaling_DefaultRole"
}
cluster_response = emr.run_job_flow(**param)
return cluster_response['JobFlowId']
# 获取 EMR 拜访入口
def get_cluster_dns(cluster_id):
response = emr.describe_cluster(ClusterId=cluster_id)
return response['Cluster']['MasterPublicDnsName']
# 期待集群创立实现
def wait_for_cluster_creation(cluster_id):
emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)
# 敞开 EMR
def terminate_cluster(cluster_id):
emr.terminate_job_flows(JobFlowIds=[cluster_id])
调用测试
# 创立 6 台机器的集群(1 master,1 core,4 worker)cluster_id = create_cluster("biz_daily_2020_10_09")
# 阻塞直到创立胜利
wait_for_cluster_creation(cluster_id)
# dns 相当于虚拟机的 ssh 地址,每次都不同
# ssh 登录这个地址能够提交 spark 命令了,这里应用 Airflow 的 SSHOperator 模仿登录并提交命令
cluster_dns = get_cluster_dns(cluster_id)
# 敞开集群
terminate_cluster(cluster_id)
3. 其余坑
- Airflow 1.9.0的工夫模板 {{ds}} 生成的是格林尼治工夫,要改为我国工夫,需手动加 8 小时,不晓得新版本是否反对本地工夫。
- ssh 登录 dns 用户名 hadoop,这个用户是 AWS 生成的,仿佛无奈批改。