Amazon Kinesis Data Analytics介绍

现在各种企业每天都在面对继续一直生成的数据须要解决,这些数据可能来自挪动或 Web 应用程序生成的日志文件、网上购物数据、游戏玩家流动、社交网站信息或者是金融交易等。可能及时地解决并剖析这些流数据对企业来说至关重要,通过良好的流数据处理和利用,企业能够疾速做出业务决策,改良产品或服务的品质,晋升用户的满意度。

目前,市面上曾经有很多工具能够帮忙企业实现流数据的解决和剖析。其中,Apache Flink是一个用于解决数据流的风行框架和引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

  • Apache Flink
    https://flink.apache.org


图片来自Apache Flink官网

Amazon Kinesis Data Analytics 是疾速应用 Apache Flink 实时转换和剖析流数据的简略办法,通过无服务器架构实现流数据的解决和剖析。借助Amazon Kinesis Data Analytics,您能够应用基于Apache Flink的开源库构建Java、Scala以及Python应用程序。

Amazon Kinesis Data Analytics为您的Apache Flink应用程序提供底层基础设施,其外围性能包含提供计算资源、并行计算、主动伸缩和应用程序备份(以检查点和快照的模式实现)。您能够应用高级Flink编程个性(如操作符、函数、源和接收器等),就像您本人在托管Flink基础设施时应用它们一样。

  想要理解更多亚马逊云科技最新技术公布和实际翻新,敬请关注2021亚马逊云科技中国峰会!点击图片报名吧~

在Amazon Kinesis Data Analytics应用Python

Amazon Kinesis Data Analytics for Apache Flink 当初反对应用 Python 3.7 构建流数据分析应用程序。这使您可能以 Python 语言在 Amazon Kinesis Data Analytics 上通过 Apache Flink v1.11 运行大数据分析,对Python语言开发者来说十分不便。Apache Flink v1.11 通过PyFlink Table API 提供对 Python 的反对,这是一个对立的关系型 API。


图片来自Apache Flink官网

此外,Apache Flink还提供了一个用于细粒度管制状态和工夫的DataStream API,并且从Apache Flink 1.12版本开始就反对Python DataStream API。无关Apache Flink中API的更多信息,请参阅Flink官网介绍

  • Flink官网介绍
    https://ci.apache.org/project...

Amazon Kinesis Data Analytics Python应用程序示例

接下来,咱们将演示如何疾速上手构建Python版的Amazon Kinesis Data Analytics for Flink应用程序。示例的参考架构如下图所示,咱们将发送一些测试数据到Amazon Kinesis Data Stream,而后通过Amazon Kinesis Data Analytics Python应用程序的Tumbling Window窗口函数做根本的聚合操作,再将这些数据长久化到Amazon S3中;之后能够应用Amazon Glue和Amazon Athena对这些数据进行疾速的查问。整个示例应用程序都采纳了无服务器的架构,在能够实现疾速部署和主动弹性伸缩外,还极大地加重了运维和管理负担。

以下示例是在由光环新网经营的亚马逊云科技中国(北京)区域上进行。

创立Amazon Kinesis Data Stream

示例将在管制台上创立Amazon Kinesis Data Stream,首先抉择到Amazon Kinesis服务-数据流,而后点击“创立数据流”。

输出数据流名称,如“kda-input-stream“;数据流容量中的分区数设置为1,留神这里是为了演示,请依据理论状况配置适合的容量。

点击创立数据流,期待片刻,数据流创立实现。

稍后,咱们将像这个Amazon Kinesis数据流发送示例数据。

创立Amazon S3存储桶

示例将在管制台上创立Amazon S3存储桶,首先抉择到Amazon Kinesis服务,而后点击“创立存储桶”。

输出存储桶名称,如“kda-pyflink-”,这个名称稍后咱们在Amazon Kinesis应用程序中用到。

放弃其它配置不变,点击“创立存储桶”。

稍等片刻,能够看到已胜利创立存储桶。

发送示例数据到Amazon Kinesis Data Stream

接下来,咱们将应用一段Python程序向Amazon Kinesis数据流发送数据。创立kda-input-stream.py文件,并复制以下内容到这个文件,留神批改STREAM_NAME为您刚刚创立的Amazon Kinesis数据流名称,profile_name配置为对应的用户信息。

import datetimeimport jsonimport randomimport boto3STREAM_NAME = "kda-input-stream"def get_data():    return {        'event_time': datetime.datetime.now().isoformat(),        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),        'price': round(random.random() * 100, 2)}def generate(stream_name, kinesis_client):    while True:        data = get_data()        print(data)        kinesis_client.put_record(            StreamName=stream_name,            Data=json.dumps(data),            PartitionKey="partitionkey")if __name__ == '__main__':    session = boto3.Session(profile_name='<your profile>')    generate(STREAM_NAME, session.client('kinesis', region_name='cn-n

执行以下代码,开始向Amazon Kinesis数据流发送数据。

$ python kda-input-stream.py

编写Pyflink代码

接下来,咱们编写PyFlink代码。创立kda-pyflink-demo.py文件,并复制以下内容到这个文件。

# -*- coding: utf-8 -*-"""kda-pyflink-demo.py~~~~~~~~~~~~~~~~~~~1. 创立 Table Environment2. 创立源 Kinesis Data Stream3. 创立指标 S3 Bucket4. 执行窗口函数查问5. 将后果写入指标"""from pyflink.table import EnvironmentSettings, StreamTableEnvironmentfrom pyflink.table.window import Tumbleimport osimport json# 1. 创立 Table Environmentenv_settings = (    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())table_env = StreamTableEnvironment.create(environment_settings=env_settings)statement_set = table_env.create_statement_set()APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"def get_application_properties():    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:            contents = file.read()            properties = json.loads(contents)            return properties    else:        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))def property_map(props, property_group_id):    for prop in props:        if prop["PropertyGroupId"] == property_group_id:            return prop["PropertyMap"]def create_source_table(table_name, stream_name, region, stream_initpos):    return """ CREATE TABLE {0} (                ticker VARCHAR(6),                price DOUBLE,                event_time TIMESTAMP(3),                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND              )              PARTITIONED BY (ticker)              WITH (                'connector' = 'kinesis',                'stream' = '{1}',                'aws.region' = '{2}',                'scan.stream.initpos' = '{3}',                'format' = 'json',                'json.timestamp-format.standard' = 'ISO-8601'              ) """.format(        table_name, stream_name, region, stream_initpos    )def create_sink_table(table_name, bucket_name):    return """ CREATE TABLE {0} (                ticker VARCHAR(6),                price DOUBLE,                event_time TIMESTAMP(3),                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND              )              PARTITIONED BY (ticker)              WITH (                  'connector'='filesystem',                  'path'='s3a://{1}/',                  'format'='csv',                  'sink.partition-commit.policy.kind'='success-file',                  'sink.partition-commit.delay' = '1 min'              ) """.format(        table_name, bucket_name)def count_by_word(input_table_name):    # 应用 Table API    input_table = table_env.from_path(input_table_name)    tumbling_window_table = (        input_table.window(            Tumble.over("1.minute").on("event_time").alias("one_minute_window")        )        .group_by("ticker, one_minute_window")        .select("ticker, price.avg as price, one_minute_window.end as event_time")    )    return tumbling_window_tabledef main():    # KDA 应用程序属性键    input_property_group_key = "consumer.config.0"    sink_property_group_key = "sink.config.0"    input_stream_key = "input.stream.name"    input_region_key = "aws.region"    input_starting_position_key = "flink.stream.initpos"    output_sink_key = "output.bucket.name"    # 输入输出数据表    input_table_name = "input_table"    output_table_name = "output_table"    # 获取 KDA 应用程序属性    props = get_application_properties()    input_property_map = property_map(props, input_property_group_key)    output_property_map = property_map(props, sink_property_group_key)    input_stream = input_property_map[input_stream_key]    input_region = input_property_map[input_region_key]    stream_initpos = input_property_map[input_starting_position_key]    output_bucket_name = output_property_map[output_sink_key]    # 2. 创立源 Kinesis Data Stream    table_env.execute_sql(        create_source_table(            input_table_name, input_stream, input_region, stream_initpos        )    )    # 3. 创立指标 S3 Bucket    create_sink = create_sink_table(        output_table_name, output_bucket_name    )    table_env.execute_sql(create_sink)    # 4. 执行窗口函数查问    tumbling_window_table = count_by_word(input_table_name)    # 5. 将后果写入指标    tumbling_window_table.execute_insert(output_table_name).wait()    statement_set.execute()if __name__ == "__main__":    main()

因为应用程序要应用到Amazon Kinesis Flink SQL Connector,这里须要将对应的amazon-kinesis-sql-connector-flink-2.0.3.jar下载下来。

  • amazon-kinesis-sql-connector-flink-2.0.3.jar
    https://repo1.maven.org/maven...

将kda-pyflink-demo.py和amazon-kinesis-sql-connector-flink-2.0.3.jar打包成zip文件,例如kda-pyflink-demo.zip/;而后,将这个zip包上传到刚刚创立的Amazon S3存储桶中。进入刚刚创立的Amazon S3存储桶,点击“上传”。

抉择刚刚打包好的zip文件,而后点击“上传”。

创立Python Amazon Kinesis Data Analytics应用程序

首先抉择到Amazon Kinesis服务- Data Analytics,而后点击“创立应用程序”

输出应用程序名称,例如“kda-pyflink-demo”;运行时抉择Apache Flink,放弃默认1.11版本。

拜访权限放弃默认,例如“创立/更新 Amazon IAM 角色 kinesis-analytics-kda-pyflink-demo-cn-north-1”;应用程序设置的模板抉择“开发”,留神这里是为了演示,能够依据理论状况抉择“生产”。

点击“创立应用程序”,稍等片刻,应用程序创立实现。

依据提醒,咱们持续配置应用程序,点击“配置”;代码地位配置为刚刚创立的Amazon S3中的zip包地位。

而后开展属性配置。

创立属性组,设置组名为“consumer.config.0”,并配置以下键值对:

input.stream.name为刚刚创立的Amazon Kinesis数据流,例如kda-input-stream

aws.region为以后区域,这里是cn-north-1 flink.stream.initpos设置读取流的地位,配置为LATEST

创立属性组,设置组名为“sink.config.0”,并配置以下键值对:
output.bucket.name为刚刚创立的Amazon S3存储桶,例如kda-pyflink-shtian

创立属性组,设置组名为“kinesis.analytics.flink.run.options”,并配置以下键值对:
python为刚刚创立的PyFlink程序,kda-pyflink-demo.py

jarfile为Amazon Kinesis Connector的名称,这里是amazon-kinesis-sql-connector-flink-2.0.3.jar

而后点击“更新”,刷新应用程序配置

接下来,配置应用程序应用的Amazon IAM角色的权限。进入到Amazon IAM界面,抉择角色,而后找到刚刚新创建的角色。

而后,开展附加策略,并点击“编辑策略”。

补充最初两段Amazon IAM策略,容许该角色能够拜访Amazon Kinesis数据流和Amazon S3存储桶,留神须要替换为您的亚马逊云科技中国区账号。

{    "Version": "2012-10-17",    "Statement": [        {            "Sid": "ReadCode",            "Effect": "Allow",            "Action": [                "s3:GetObject",                "s3:GetObjectVersion"            ],            "Resource": [                "arn:aws-cn:s3:::kda-pyflink-shtian/kda-pyflink-demo.zip"            ]        },        {            "Sid": "ListCloudwatchLogGroups",            "Effect": "Allow",            "Action": [                "logs:DescribeLogGroups"            ],            "Resource": [                "arn:aws-cn:logs:cn-north-1:012345678901:log-group:*"            ]        },        {            "Sid": "ListCloudwatchLogStreams",            "Effect": "Allow",            "Action": [                "logs:DescribeLogStreams"            ],            "Resource": [                "arn:aws-cn:logs:cn-north-1:012345678901:log-group:/aws/kinesis-analytics/kda-pyflink-demo:log-stream:*"            ]        },        {            "Sid": "PutCloudwatchLogs",            "Effect": "Allow",            "Action": [                "logs:PutLogEvents"            ],            "Resource": [                "arn:aws-cn:logs:cn-north-1:012345678901:log-group:/aws/kinesis-analytics/kda-pyflink-demo:log-stream:kinesis-analytics-log-stream"            ]        },        {            "Sid": "ReadInputStream",            "Effect": "Allow",            "Action": "kinesis:*",            "Resource": "arn:aws-cn:kinesis:cn-north-1:012345678901:stream/kda-input-stream"        },        {            "Sid": "WriteObjects",            "Effect": "Allow",            "Action": [                "s3:Abort*",                "s3:DeleteObject*",                "s3:GetObject*",                "s3:GetBucket*",                "s3:List*",                "s3:ListBucket",                "s3:PutObject"            ],            "Resource": [                "arn:aws-cn:s3:::kda-pyflink-shtian",                "arn:aws-cn:s3:::kda-pyflink-shtian/*"            ]        }    ]}

回到Amazon Kinesis Data Analytics 利用程序界面,点击“运行”。

点击“关上Apache Flink控制面板”,跳转到Flink的界面。

点击查看正在运行的工作。

您能够依据需要进一步查看详细信息。上面,咱们到Amazon S3中验证数据是否曾经写入,进入到创立的存储桶,能够看到数据曾经胜利写入。

应用Amazon Glue对数据进行爬取

进入到Amazon Glue服务界面,抉择爬网程序,点击“增加爬网程序“,输出爬网程序名称。

放弃源类型不变,增加数据存储为创立的Amazon S3存储桶输入门路。

抉择已有角色或者创立一个新的角色。

抉择默认数据库,能够依据需要增加表前缀。

创立实现后,点击执行。

爬取胜利后,能够在数据表中查看到详细信息。

而后,能够切换到Amazon Athena服务来查问后果。

注:如果呈现Amazon Glue爬网程序或者Amazon Athena查问权限谬误,可能是因为开启Lake Formation导致,能够参考文档授予角色相应的权限。

  • 文档
    https://docs.aws.amazon.com/l...

小结

本文首先介绍了在亚马逊云科技平台上应用Apache Flink的疾速形式 – Amazon Kinesis Data Analytics for Flink,而后通过一个无服务器架构的示例演示了如何在Amazon Kinesis Data Analytics for Flink通过PyFlink实现Python流数据处理和剖析,并通过Amazon Glue和Amazon Athena对数据进行即席查问。Amazon Kinesis Data Analytics for Flink对Python的反对也曾经在在光环新网经营的亚马逊云科技中国(北京)区域及西云数据经营的亚马逊云科技中国(宁夏)区域上线,欢送应用。

参考资料

1.https://aws.amazon.com/soluti...
2.https://docs.aws.amazon.com/l...
3.https://docs.aws.amazon.com/k...
4.https://ci.apache.org/project...

相干浏览

本篇作者

史天
亚马逊云科技解决方案架构师

领有丰盛的云计算、大数据和机器学习教训,目前致力于数据迷信、机器学习、无服务器等畛域的钻研和实际。译有《机器学习即服务》《基于Kubernetes的DevOps实际》《Prometheus监控实战》等。