乐趣区

关于分析:面对持续不断生成的流数据-Amazon-Kinesis-Data-Analytics-实现及时分析与处理

Amazon Kinesis Data Analytics 介绍

现在各种企业每天都在面对继续一直生成的数据须要解决,这些数据可能来自挪动或 Web 应用程序生成的日志文件、网上购物数据、游戏玩家流动、社交网站信息或者是金融交易等。可能及时地解决并剖析这些流数据对企业来说至关重要,通过良好的流数据处理和利用,企业能够疾速做出业务决策,改良产品或服务的品质,晋升用户的满意度。

目前,市面上曾经有很多工具能够帮忙企业实现流数据的解决和剖析。其中,Apache Flink是一个用于解决数据流的风行框架和引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

  • Apache Flink
    https://flink.apache.org


图片来自 Apache Flink 官网

Amazon Kinesis Data Analytics 是疾速应用 Apache Flink 实时转换和剖析流数据的简略办法,通过无服务器架构实现流数据的解决和剖析。借助 Amazon Kinesis Data Analytics,您能够应用基于 Apache Flink 的开源库构建 Java、Scala 以及 Python 应用程序。

Amazon Kinesis Data Analytics 为您的 Apache Flink 应用程序提供底层基础设施,其外围性能包含提供计算资源、并行计算、主动伸缩和应用程序备份(以检查点和快照的模式实现)。您能够应用高级 Flink 编程个性(如操作符、函数、源和接收器等),就像您本人在托管 Flink 基础设施时应用它们一样。

📢  想要理解更多亚马逊云科技最新技术公布和实际翻新,敬请关注 2021 亚马逊云科技中国峰会!点击图片报名吧~

在 Amazon Kinesis Data Analytics 应用 Python

Amazon Kinesis Data Analytics for Apache Flink 当初反对应用 Python 3.7 构建流数据分析应用程序。这使您可能以 Python 语言在 Amazon Kinesis Data Analytics 上通过 Apache Flink v1.11 运行大数据分析,对 Python 语言开发者来说十分不便。Apache Flink v1.11 通过 PyFlink Table API 提供对 Python 的反对,这是一个对立的关系型 API。


图片来自 Apache Flink 官网

此外,Apache Flink 还提供了一个用于细粒度管制状态和工夫的 DataStream API,并且从 Apache Flink 1.12 版本开始就反对 Python DataStream API。无关 Apache Flink 中 API 的更多信息,请参阅Flink 官网介绍

  • Flink 官网介绍
    https://ci.apache.org/project…

Amazon Kinesis Data Analytics Python 应用程序示例

接下来,咱们将演示如何疾速上手构建 Python 版的 Amazon Kinesis Data Analytics for Flink 应用程序。示例的参考架构如下图所示,咱们将发送一些测试数据到 Amazon Kinesis Data Stream,而后通过 Amazon Kinesis Data Analytics Python 应用程序的 Tumbling Window 窗口函数做根本的聚合操作,再将这些数据长久化到 Amazon S3 中;之后能够应用 Amazon Glue 和 Amazon Athena 对这些数据进行疾速的查问。整个示例应用程序都采纳了无服务器的架构,在能够实现疾速部署和主动弹性伸缩外,还极大地加重了运维和管理负担。

以下示例是在由光环新网经营的亚马逊云科技中国(北京)区域上进行。

创立 Amazon Kinesis Data Stream

示例将在管制台上创立 Amazon Kinesis Data Stream,首先抉择到 Amazon Kinesis 服务 - 数据流,而后点击“创立数据流”。

输出数据流名称,如“kda-input-stream“;数据流容量中的分区数设置为 1,留神这里是为了演示,请依据理论状况配置适合的容量。

点击创立数据流,期待片刻,数据流创立实现。

稍后,咱们将像这个 Amazon Kinesis 数据流发送示例数据。

创立 Amazon S3 存储桶

示例将在管制台上创立 Amazon S3 存储桶,首先抉择到 Amazon Kinesis 服务,而后点击“创立存储桶”。

输出存储桶名称,如“kda-pyflink-”,这个名称稍后咱们在 Amazon Kinesis 应用程序中用到。

放弃其它配置不变,点击“创立存储桶”。

稍等片刻,能够看到已胜利创立存储桶。

发送示例数据到 Amazon Kinesis Data Stream

接下来,咱们将应用一段 Python 程序向 Amazon Kinesis 数据流发送数据。创立 kda-input-stream.py 文件,并复制以下内容到这个文件,留神批改 STREAM_NAME 为您刚刚创立的 Amazon Kinesis 数据流名称,profile_name 配置为对应的用户信息。

import datetime
import json
import random
import boto3

STREAM_NAME = "kda-input-stream"

def get_data():
    return {'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

if __name__ == '__main__':
    session = boto3.Session(profile_name='<your profile>')
    generate(STREAM_NAME, session.client('kinesis', region_name='cn-n

执行以下代码,开始向 Amazon Kinesis 数据流发送数据。

$ python kda-input-stream.py

编写 Pyflink 代码

接下来,咱们编写 PyFlink 代码。创立 kda-pyflink-demo.py 文件,并复制以下内容到这个文件。

# -*- coding: utf-8 -*-

"""
kda-pyflink-demo.py
~~~~~~~~~~~~~~~~~~~
1. 创立 Table Environment
2. 创立源 Kinesis Data Stream
3. 创立指标 S3 Bucket
4. 执行窗口函数查问
5. 将后果写入指标
"""

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.window import Tumble
import os
import json

# 1. 创立 Table Environment
env_settings = (EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
statement_set = table_env.create_statement_set()


APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at"{}"was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)


def create_sink_table(table_name, bucket_name):
    return """ CREATE TABLE {0} (ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                  'connector'='filesystem',
                  'path'='s3a://{1}/',
                  'format'='csv',
                  'sink.partition-commit.policy.kind'='success-file',
                  'sink.partition-commit.delay' = '1 min'
              ) """.format(table_name, bucket_name)


def count_by_word(input_table_name):
    # 应用 Table API
    input_table = table_env.from_path(input_table_name)

    tumbling_window_table = (
        input_table.window(Tumble.over("1.minute").on("event_time").alias("one_minute_window")
        )
        .group_by("ticker, one_minute_window")
        .select("ticker, price.avg as price, one_minute_window.end as event_time")
    )

    return tumbling_window_table


def main():
    # KDA 应用程序属性键
    input_property_group_key = "consumer.config.0"
    sink_property_group_key = "sink.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_sink_key = "output.bucket.name"

    # 输入输出数据表
    input_table_name = "input_table"
    output_table_name = "output_table"

    # 获取 KDA 应用程序属性
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, sink_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_bucket_name = output_property_map[output_sink_key]

    # 2. 创立源 Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. 创立指标 S3 Bucket
    create_sink = create_sink_table(output_table_name, output_bucket_name)
    table_env.execute_sql(create_sink)

    # 4. 执行窗口函数查问
    tumbling_window_table = count_by_word(input_table_name)

    # 5. 将后果写入指标
    tumbling_window_table.execute_insert(output_table_name).wait()

    statement_set.execute()


if __name__ == "__main__":
    main()

因为应用程序要应用到 Amazon Kinesis Flink SQL Connector,这里须要将对应的 amazon-kinesis-sql-connector-flink-2.0.3.jar 下载下来。

  • amazon-kinesis-sql-connector-flink-2.0.3.jar
    https://repo1.maven.org/maven…

将 kda-pyflink-demo.py 和 amazon-kinesis-sql-connector-flink-2.0.3.jar 打包成 zip 文件,例如 kda-pyflink-demo.zip/;而后,将这个 zip 包上传到刚刚创立的 Amazon S3 存储桶中。进入刚刚创立的 Amazon S3 存储桶,点击“上传”。

抉择刚刚打包好的 zip 文件,而后点击“上传”。

创立 Python Amazon Kinesis Data Analytics 应用程序

首先抉择到 Amazon Kinesis 服务 - Data Analytics,而后点击“创立应用程序”

输出应用程序名称,例如“kda-pyflink-demo”;运行时抉择 Apache Flink,放弃默认 1.11 版本。

拜访权限放弃默认,例如“创立 / 更新 Amazon IAM 角色 kinesis-analytics-kda-pyflink-demo-cn-north-1”;应用程序设置的模板抉择“开发”,留神这里是为了演示,能够依据理论状况抉择“生产”。

点击“创立应用程序”,稍等片刻,应用程序创立实现。

依据提醒,咱们持续配置应用程序,点击“配置”;代码地位配置为刚刚创立的 Amazon S3 中的 zip 包地位。

而后开展属性配置。

创立属性组,设置组名为“consumer.config.0”,并配置以下键值对:

input.stream.name 为刚刚创立的 Amazon Kinesis 数据流,例如 kda-input-stream

aws.region 为以后区域,这里是 cn-north-1 flink.stream.initpos 设置读取流的地位,配置为 LATEST

创立属性组,设置组名为“sink.config.0”,并配置以下键值对:
output.bucket.name 为刚刚创立的 Amazon S3 存储桶,例如 kda-pyflink-shtian

创立属性组,设置组名为“kinesis.analytics.flink.run.options”,并配置以下键值对:
python 为刚刚创立的 PyFlink 程序,kda-pyflink-demo.py

jarfile 为 Amazon Kinesis Connector 的名称,这里是 amazon-kinesis-sql-connector-flink-2.0.3.jar

而后点击“更新”,刷新应用程序配置

接下来,配置应用程序应用的 Amazon IAM 角色的权限。进入到 Amazon IAM 界面,抉择角色,而后找到刚刚新创建的角色。

而后,开展附加策略,并点击“编辑策略”。

补充最初两段 Amazon IAM 策略,容许该角色能够拜访 Amazon Kinesis 数据流和 Amazon S3 存储桶,留神须要替换为您的亚马逊云科技中国区账号。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadCode",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws-cn:s3:::kda-pyflink-shtian/kda-pyflink-demo.zip"]
        },
        {
            "Sid": "ListCloudwatchLogGroups",
            "Effect": "Allow",
            "Action": ["logs:DescribeLogGroups"],
            "Resource": ["arn:aws-cn:logs:cn-north-1:012345678901:log-group:*"]
        },
        {
            "Sid": "ListCloudwatchLogStreams",
            "Effect": "Allow",
            "Action": ["logs:DescribeLogStreams"],
            "Resource": ["arn:aws-cn:logs:cn-north-1:012345678901:log-group:/aws/kinesis-analytics/kda-pyflink-demo:log-stream:*"]
        },
        {
            "Sid": "PutCloudwatchLogs",
            "Effect": "Allow",
            "Action": ["logs:PutLogEvents"],
            "Resource": ["arn:aws-cn:logs:cn-north-1:012345678901:log-group:/aws/kinesis-analytics/kda-pyflink-demo:log-stream:kinesis-analytics-log-stream"]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws-cn:kinesis:cn-north-1:012345678901:stream/kda-input-stream"
        },
        {
            "Sid": "WriteObjects",
            "Effect": "Allow",
            "Action": [
                "s3:Abort*",
                "s3:DeleteObject*",
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws-cn:s3:::kda-pyflink-shtian",
                "arn:aws-cn:s3:::kda-pyflink-shtian/*"
            ]
        }
    ]
}

回到 Amazon Kinesis Data Analytics 利用程序界面,点击“运行”。

点击“关上 Apache Flink 控制面板”,跳转到 Flink 的界面。

点击查看正在运行的工作。

您能够依据需要进一步查看详细信息。上面,咱们到 Amazon S3 中验证数据是否曾经写入,进入到创立的存储桶,能够看到数据曾经胜利写入。

应用 Amazon Glue 对数据进行爬取

进入到 Amazon Glue 服务界面,抉择爬网程序,点击“增加爬网程序“,输出爬网程序名称。

放弃源类型不变,增加数据存储为创立的 Amazon S3 存储桶输入门路。

抉择已有角色或者创立一个新的角色。

抉择默认数据库,能够依据需要增加表前缀。

创立实现后,点击执行。

爬取胜利后,能够在数据表中查看到详细信息。

而后,能够切换到 Amazon Athena 服务来查问后果。

注:如果呈现 Amazon Glue 爬网程序或者 Amazon Athena 查问权限谬误,可能是因为开启 Lake Formation 导致,能够参考文档授予角色相应的权限。

  • 文档
    https://docs.aws.amazon.com/l…

小结

本文首先介绍了在亚马逊云科技平台上应用 Apache Flink 的疾速形式 – Amazon Kinesis Data Analytics for Flink,而后通过一个无服务器架构的示例演示了如何在 Amazon Kinesis Data Analytics for Flink 通过 PyFlink 实现 Python 流数据处理和剖析,并通过 Amazon Glue 和 Amazon Athena 对数据进行即席查问。Amazon Kinesis Data Analytics for Flink 对 Python 的反对也曾经在在光环新网经营的亚马逊云科技中国(北京)区域及西云数据经营的亚马逊云科技中国(宁夏)区域上线,欢送应用。

参考资料

1.https://aws.amazon.com/soluti…
2.https://docs.aws.amazon.com/l…
3.https://docs.aws.amazon.com/k…
4.https://ci.apache.org/project…

相干浏览

本篇作者

史天
亚马逊云科技解决方案架构师

领有丰盛的云计算、大数据和机器学习教训,目前致力于数据迷信、机器学习、无服务器等畛域的钻研和实际。译有《机器学习即服务》《基于 Kubernetes 的 DevOps 实际》《Prometheus 监控实战》等。

退出移动版