乐趣区

关于amazon:使用-Amazon-Step-Functions-和-Amazon-Athena-实现简易大数据编排

很多公司都在亚马逊云上围绕 Amazon S3 实现了本人的数据湖。数据湖的建设波及到数据摄入、荡涤、转换,以及出现等多个步骤,还须要对这些步骤进行编排,这对很多人手不足或者初识数据湖的团队造成了挑战。

在本篇文章中,我将介绍一个应用 Amazon Step Functions 和 Amazon Athena 的繁难大数据编排计划。如果你的团队当初曾经有相当局部沉睡数据,想要利用,然而又没有专人或者专门的力量的公司,那么能够参考这个计划,在数天工夫内搭建起一套可用的根底版大数据流水线,开始对数据进行一些摸索和开掘。

计划整体都采纳无服务器服务,用户无需放心基建费用,齐全只为用量付费,实现低成本疾速启动。

亚马逊云科技开发者社区为开发者们提供寰球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、流动与比赛等。帮忙中国开发者对接世界最前沿技术,观点,和我的项目,并将中国优良开发者或技术举荐给寰球云社区。如果你还没有关注 / 珍藏,看到这里请肯定不要匆匆划过,点这里让它成为你的技术宝库!

服务介绍

开始之前,咱们简略介绍下计划的两个外围服务。

Amazon Athena 是一个无服务器版的 SQL 大数据查问服务,底层基于 PrestoDB 引擎。用户能够提交 SQL 语句,而这个引擎则依据语句来分布式扫描数据湖中的文件,最初汇总成后果。除了查问之外,Athena 也能够用作简略的 ETL 工具。它依照扫描的文件的大小来免费。

Amazon Step Functions 是一个无服务器编排服务。它能够帮忙咱们设计一个蕴含多个步骤的流程(有向无环图,Directed Acyclic Graph,简称 DAG),让每个步骤的输入变成下一个步骤的输出,并且反对步骤并发、条件判断以及不同的重试机制等。它和亚马逊云科技的其余服务有着很好的集成,并且也是齐全依照步骤执行的次数来免费。

业务介绍

简略介绍一下业务。

假如咱们是一家传统的白电公司。尽管咱们追寻潮流,在咱们的很多新电器上搭载了 IoT 性能,并且也收到了很多的 IoT 数据,但这些数据其实并没太好地利用起来。当初,咱们心愿能做一个数据湖,用最低的老本,疾速从这些数据外面开掘一些价值。

目前最困扰咱们的问题是电器品质和培修问题,以冰箱为例,如果商用冰箱出故障,可能会导致食品变质导致食品卫生问题,而如果保留的是药品,则更可能导致重大的问题;而家用冰箱如果出故障,也会重大影响客户体验和对品牌的信赖。所以,咱们心愿能对设施回报的数据进行开掘,看看冰箱在故障之前,通常呈现什么指标异样,不同地区的同款冰箱在指标上是否有区别,以及不同的应用形式是否对冰箱的寿命和培修产生影响。

在这些问题之上,咱们可能会造成一套预测性保护的机制,在冰箱出故障之前就做好预判,提前保护颐养,防止问题的产生。

整体架构

架构的整体数据流向图上曾经展现得很分明,咱们本次重点关注这些服务应用的细节,以及串接这些服务时的一些要点。

数据摄入

本次的数据源格局是 GZip 压缩好的 JSON Lines 文件,每天可能是单个或者数个文件。文件曾经寄存在某个内网 HTTP 节点,咱们须要定期去拉取,并且上传到 S3 桶。

数据格式示范如下。

{"model": "model-1234", "city": "test-city-1", "reading_1": "15.6"}
{"model": "model-4323", "city": "test-city-2", "reading_1": "4.5"}
{"model": "model-3135", "city": "test-city-1", "reading_1": "7.4"}
{"model": "model-4237", "city": "test-city-3", "reading_1": "8.1"}
{"model": "model-9928", "city": "test-city-1", "reading_1": "6.3"}

把文件上传到 S3 桶之后,咱们能够间接在 Athena 的查问编辑器中应用如下 SQL 语句创立内部表。

CREATE EXTERNAL TABLE example (
    model STRING,
    city STRING,
    reading_1 STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://{bucket-name}/'

创立胜利后,咱们就能够立刻进行查问。

SELECT * 
FROM example;

这里须要留神的是 Athena 反对的是单个文件压缩,而不是咱们常见的 TAR 包压缩。也就是说,每个文件都是通过 gzip filename.json 命令压缩成 filename.json.gz 而不是通过 tar cfz 命令打包并压缩成 .tar.gz,否则 Athena 将无奈辨认。

当然,通常咱们的 IoT 数据都蕴含大量的字段,这里很可能咱们不会用写 SQL 的形式来建表,而是用 Amazon Glue 的爬虫服务进行爬取,主动建表和识别字段类型。爬虫的应用不是本文的重点,如有须要,读者可参考其余对于 Glue 爬虫的文章。

无格局文本文件解决

在 IoT 场景中,有时候咱们会遇到特定的原始数据格式。它并不是 JSON 格局,也不是其余认可的模式,而是取决于应用的设施,相似上面这样的格局。

DEV {model=23482, sn='238148234571', reading_1=23.5}
DEV {model=36740, sn='9942716322', reading_1=}

此时,咱们能够借用 Athena 的正则表达式匹配编解码器(RegEx SerDe),来把数据读取成字符串,再进行解决。留神:数据依然须要按行宰割。

CREATE EXTERNAL TABLE example_regex (
    model STRING,
    sn STRING,
    reading_1 STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "^DEV\\s*\\{model=(.*?),\\s*sn='(.*?)',\\s*reading_1=(.*?)\\}"
) LOCATION 's3://{bucket-name}/{prefix}';

应用这种形式,所有的字段必须有固定程序,正则表达式捕捉到的字符,会被按程序录入到字段中,不便后续解决。

数据预处理

原始数据传输到 S3 桶后,咱们须要对它做一些预处理,不便后续正式应用。

字段格局转换

首先,咱们须要对字段格局转换。因为原始的 IoT 数据所有字段简直都是字符串格局,不便于操作,所以咱们须要把这些字段格局转换成正确的格局。咱们先创立新的指标表。

CREATE EXTERNAL TABLE example_preprocessed (
    model STRING,
    city STRING,
    reading_1 DOUBLE
)
STORED AS PARQUET
LOCATION 's3://{bucket-name}/{prefix}'
TBLPROPERTIES ("parquet.compression"="SNAPPY");

留神,此时咱们不仅转换字段格局,存储格局也换成了更便于统计操作的 Parquet,并应用 Snappy 进行了压缩。

对于 Athena 来说,字段转换非常简单,只需应用 SQL 的 CAST、DATE_PARSE 等类型转换函数。比方,咱们能够应用如下形式语句把原始数据转换成正确的格局并插入到新的表。

INSERT INTO example_preprocessed 
SELECT model, city, CAST(reading_1 AS DOUBLE) as reading_1 FROM example;

动静字段映射

在 IoT 场景中,咱们还可能会遇到动静字段映射问题。

比方,每个设施都会回传 data_01、data_02、date_03 这样的字段,然而不同设施、不同型号甚至不同版本的设施,所传回来的字段代表的意思可能不同。在 A 设施上 data_01 可能是温度,而在 B 设施上,data_01 则可能是门开关的角度。

这就须要咱们有一个表来保留字段的映射关系,并且能动静地对这些数据进行映射。外围思路如下。

  • 保留一份表、全字段、全映射指标字段的映射关系
  • 遍历这个映射关系,并且应用 INSERT INTO 语句,按程序列出所有源字段和指标字段
  • 增加反向条件,对未在映射关系列表中的设施进行默认映射

这个思路次要是借助了 INSERT INTO 能够同时列出字段和值并按程序来插入的性能。上面是一段示意代码。

import time
import boto3

# 只打印 SQL

dry_run = False

# 源表和指标表

source_table = 'source_table'
target_table = 'target_table'
db = 'dbname'

# 映射关系表,从数据库中取出后改成如下格局

mapping = {
  # 型号名字为 Key
  'BCD': {
    'field': 'filed',  # 所有字段都必须列出来,即使是齐全对应
    'model': 'model',
    'data01': 'temperature',  # 举例映射 data01 > temperature,data01 > door_status
    'data02': 'door_status'
  },
  'ABC': {
    'field': 'field',  # 所有字段都必须列出来,即使是齐全对应
    'model': 'model',
    'data01': 'door_status',  # 举例映射 data01 > door_status,data01 > temperature
    'data02': 'temperature'
  },
  # 未被匹配的型号应用默认映射
  'Other': {
    'field': 'field',
    'model': 'model',
    'data01': 'other', # 映射的指标字段必须存在于指标表,如果有指标表字段没有笼罩,就会变成 NULL
    'data02': 'other2' # 映射的指标字段不能反复
  }
}

# 封装 Athena 申请和 SQL 到函数

client = boto3.client('athena')

def insert_with_mapping(model, mapping):
  source_columns = [f'"{k}"' for k in mapping.keys()]
  target_columns = [f'"{v}"' for v in mapping.values()]

  query = f'INSERT INTO {target_table} ({",".join(target_columns)}) SELECT {",".join(source_columns)} FROM {source_table} WHERE'

  if type(model) == list:
    models = [f"'{m}'" for m in model]
    query += f'model NOT IN ({",".join(models)})'
  else:
    query += f"model ='{model}'"

  print(query)

  if (dry_run):
    return

  query_start = client.start_query_execution(
      QueryString = query,
      QueryExecutionContext = {'Database': db}, 
      ResultConfiguration = {'OutputLocation': 's3://my-athena-result-bucket'}
  )

  max_execution = 100 # 设置最长执行工夫
  state = 'RUNNING'

  while (max_execution > 0 and state in ['RUNNING', 'QUEUED', 'SUCCEEDED','FAILED']):
    max_execution = max_execution - 1
    response = client.get_query_execution(QueryExecutionId = query_start['QueryExecutionId'])

    if 'QueryExecution' in response and \
            'Status' in response['QueryExecution'] and \
            'State' in response['QueryExecution']['Status']:
      state = response['QueryExecution']['Status']['State']
      if state == 'FAILED':
          print(response)
          raise Exception(f'> {model} INSERTION FAILED.')
          break
      elif state == 'SUCCEEDED':
          results = client.get_query_results(QueryExecutionId=query_start['QueryExecutionId'])
          print(f'> {model} INSERTION SUCCEEDED.')
          break

    print('WAITING...')
    time.sleep(1)

# 遍历每个模型,别离插入

mapping_without_other = {k: v for k, v in mapping.items() if k != 'Other' }
mapping_other = mapping['Other']

for model, column_mapping in mapping_without_other.items():
  insert_with_mapping(model, column_mapping)

insert_with_mapping(list(mapping_without_other.keys()), mapping_other)

分段导入

因为 INSERT INTO … SELECT 语句会有 100 个分区的限度,如果咱们按小时分区,一次导入了超过 100 个小时的数据,或者依照模型分区,一次导入超过 100 个模型,就会导入失败。

这时候,咱们须要做分段导入。分段导入的形式很直白,就是用 WHERE 语句把数据分拆。比方每次插入 99 小时数据,或者每次插入 99 个模型。

革除已解决数据

最初,咱们还须要删除曾经预处理的数据,不便下一天导入新的数据持续解决。因为 S3 自身没有提供通配符删除的性能,所以咱们只能应用一个脚本列出所有的数据文件,而后对立删除。

数据统计

业务外围的数据统计反而是整个流程中比较简单的局部,因为所有业务逻辑都应用 SQL 语句来示意。本次文章的重点不是业务梳理,所以对具体的 SQL 查问语句不再做展现,读者可依据本人须要来撰写和调用。

流水线编排

在所有流程都明确下来,并且手动执行结束后,咱们就能够开始设计自动化流水线了。

不论是 Step Functions,还是 Apache Airflow,流水线工具根本都基于「有向无环图」(Directed Acyclic Graph,简称 DAG)的理念。有向,指的是流水线中的步骤都明确指向下一个步骤,直至完结;无环,指的是步骤只往一个方向走,不能折返,造成循环。

之所以要防止循环,是因为调度器须要晓得步骤的先后顺序(依赖关系)。如果呈现了 A → B → A 这样的循环,那么调度器就会发现 A 须要等 B 执行完,但 B 又须要等 A 执行完,就没方法决定先执行哪一个了。反之,如果所有步骤都朝一个方向推动,又没有循环,就能明确先后顺序,并且也能够晓得哪些步骤可能是能够并行执行,晋升效率。

在 Step Functions 中,流水线被称作「状态机」(State Machine)。每个状态机分为多个步骤,而每个步骤则是一个亚马逊云 API 的调用。上一个步骤的输入,会作为下一个步骤的输出,直到出错或者运行完结。当然,步骤也能够调用其余状态机,从而把多个状态机串联成一个大的工作流。

数据摄入

咱们原来是在 Amazon EC2 实例上间接执行命令来下载数据。当初,咱们要把这个命令放到状态机里,有两个抉择。

  • 应用 Amazon Lambda 的无服务器函数间接执行这个命令
  • 应用一台 EC2 机器来执行这个命令

这里次要须要思考的是下载的文件大小。宁夏和北京区域的 Lambda 本地长期存储只有 512MB,海内最高可配置至 10GB,所以,如果下载的文件超过这个下限,就可能须要思考 EFS 等内部存储计划,或者改用 EC2 来执行。

如果用 EC2 实例来执行命令,就没有执行时长和存储空间的问题。不过,咱们还须要一个不便的形式能够调用实例上的命令,并且把执行结返回到步骤中。

要近程执行命令,咱们能够应用 Amazon System Manager(下简称 SSM)。如果你应用的是 Amazon Linux,则其客户端曾经随零碎装置,咱们只须要为这个实例增加如下策略即可应用。

  • arn:aws-cn:iam::aws:policy/AmazonSSMManagedInstanceCore,这个托管策略容许 SSM 操控该实例,包含执行命令、从浏览器中登录实例等。

因为下载和上传的工夫不确定,所以咱们这里须要有一个「期待」的过程。这里,咱们须要调用 Step Functions 的 API,通知它工作执行的后果。这个须要咱们的 EC2 实例具备如下权限。

  • states:SendTaskSuccess,发送工作胜利信号
  • states:SendTaskFailure,发送工作失败信号
  • states:SendTaskHeartbeat,发送工作心跳信号,确认工作还在执行

这里有一个问题,就是 EC2 上的执行者须要晓得当初执行的是哪个工作,这样能力在发送信号的时候附带上工作 ID。Step Functions 提供了一个形式传入元数据,就是在参数键值前面增加 .$,而后在参数中应用 $$ 来援用。

从上图能够看出,咱们把原来的 TaskToken 改成了 TaskToken.$,而后就能够间接应用 $$.Task.Token 来取出元数据中蕴含的「工作令牌」(Task Token)。工作执行实现时,咱们只须要应用 SendTaskSuccess 并带上这个令牌,Step Functions 就会认为这个工作曾经执行实现。

任意一个字符串参数,都能够用这个形式来替换成元数据中的值。借此,咱们能够在任意步骤中取得工作名字、状态机原始参数等元数据。

但这里还有一个问题,那就是 SSM 的 sendCommand API 参数只反对数组,不反对字符串。这就意味着咱们没方法用 .$ 后缀的形式把元数据间接传入,只能通过一个 Lambda 函数做一下转发。此时,Lambda 函数须要有调用 ssm:sendCommand 的权限。

这里我写了一个示范的 Lambda 函数。

import json
import boto3

def lambda_handler(event, context):
    print(event)
    
    client = boto3.client('ssm')

    instance_id = 'i-xxxxxxxx' # 示意代码,应用硬编码
    response = client.send_command(InstanceIds=[instance_id],
        DocumentName='AWS-RunShellScript',
        Parameters={
            'commands': [f'aws stepfunctions send-task-success --region cn-northwest-1 --task-token {event["TaskToken"]} --task-output {{}}'
            ] 
        }
    )
    return {
        'statusCode': 200,
        'body': json.dumps(response, default=str)
    }

这个函数会调用 ssm:sendCommand,在指定实例上运行命令。这里作为演示,只会发送胜利信号。如需减少命令,间接在 commands 参数下,发送信号之前,减少所需的命令即可。如果要在在生产环境下应用,可能咱们还会退出错误处理之类的,或者把所须要的命令间接写成一个欠缺的脚本。

数据处理

数据处理可能会用到「并发」(Parallel)和「判断」(Choice)两种流步骤。流步骤指的是不间接调用 API,而是做一些流程上的操作。比方「并发」让咱们能够并行多个步骤,而「判断」则能够让咱们依据上个步骤的不同输入来抉择执行不同的步骤。

在数据处理阶段,咱们可能会同时执行多个转换,比方可能按日期、城市来把不同的数据提取到不同的表内。在数据计算阶段,咱们也可能会同时执行相互之间没有依赖关系的统计运算。这也是利用了 S3 存储高并发、高吞吐的劣势。

此外,咱们还能够应用条件判断。比方,在收到超过 10 万条记录时,才启动统计操作。再比方,当发现某个城市的故障率飙升时,收回告警等等。

定时触发

还有一个常见的需要是定时触发。如后面业务简介所言,咱们可能会须要每天定时触发某个状态机,或者按周期触发,比方每 6 小时执行一次。此时,咱们能够借助 Amazon EventBridge 的定时性能。

关上 Amazon EventBridge 服务,并找到「规定 > 创立规定」,「规定类型」抉择「打算」。

接下来,咱们就能够输出 cron 表达式,或者输出周期了。

cron 表达式须要填写所有上面的字段,比方在「分钟」框输出 1 就代表每个小时的第 1 分钟,而在「一周中的某天」框输出 2 则代表每周二。留神其中「一个月中的某天」和「一周中的某天」是有抵触的,所以二者只能输出一个,而后把另一个用 ? 代替。如果心愿每分钟、每小时等都执行,那么就应用 * 代替。

输出胜利时,会在下方列出下次执行的期间。留神:目前此处的 cron 表达式仅应用 UTC 工夫,所以在应用时须要把时区也算进去。

接下来,咱们能够把咱们的状态机设置成「指标」。

保留之后,咱们就能够在规定详情页面看到接下来 10 次触发工夫。

总结

这篇文章中,咱们以一个 IoT 场景为例,展现了如何联合 Step Functions 和 Athena 来实现繁难的大数据调用。正确应用这些服务,能够让咱们在数天之内就造成一个数据湖,让咱们能够开始对数据湖中的数据进行摸索。

很多传统公司在开辟新业务时往往会产生大量数据,但这些数据的应用须要大量业余开发和运维,这对很多刚成立的大数据团队造成了很大的压力。应用这些托管服务,用户无需再关怀底层服务器,而能够把大量工夫用在业务梳理和数据的价值开掘上,大大降低了大数据的入门门槛。

当然,这篇文章次要还是抛砖引玉,有很多点因为篇幅问题未能波及。比方:

  • 工作出错时的复原、告警和重试机制
  • 工作的监控和统计
  • 更实时的数据摄入
  • 数据的增量更新
  • 更高效的分区和数据查问形式
  • 数据的安全性和权限管制

这些都是在应用更加深刻后必然会遇到的问题。后续咱们会有更多文章为大数据初学者介绍如何应用托管和无服务器服务来实现这些机制。

心愿这篇文章对读者有所帮忙,疾速搭建其本人的数据湖。

本篇作者

张玳
Amazon 解决方案架构师。十余年企业软件研发、设计和征询教训,专一企业业务与 Amazon 服务的有机联合。译有《软件之道》《精益守业实战》《精益设计》《互联网思维的企业》,著有《体验设计白书》等书籍。

文章起源:https://dev.amazoncloud.cn/column/article/630a141b76658473a32…

退出移动版