共计 8985 个字符,预计需要花费 23 分钟才能阅读完成。
及时取得见解并对从您的企业和应用程序接管的新信息做出快速反应的最佳形式是剖析 流数据。这些数据通常必须按程序和以增量形式按记录或者通过滑动工夫窗口进行解决,并且可用于各种剖析,包含关联、聚合、筛选和采样。
- 流数据
https://aws.amazon.com/stream…
为了更轻松地剖析流数据,最近,咱们非常高兴推出 Amazon Kinesis Data Analytics Studio。
当初,通过 Amazon Kinesis 控制台 ,您能够抉择 Kinesis 数据流,并且只需单击一下即可启动由 Apache Zeppelin 和 Apache Flink 提供反对的 Kinesis Data Analytics Studio 笔记本,以便交互式剖析流中的数据。同样,您能够在 Amazon Managed Streaming for Apache Kafka 控制台 中抉择集群,以启动笔记本来剖析 Apache Kafka 流中的数据。您还能够从 Kinesis Data Analytics Studio 控制台 中启动笔记本并连贯到自定义源。
- Amazon Kinesis Data Analytics Studio
https://aws.amazon.com/kinesi… - Amazon Kinesis 控制台
https://console.aws.amazon.co… - Kinesis 数据流
https://aws.amazon.com/kinesi… - Apache Zeppelin
https://zeppelin.apache.org/ - Apache Flink
https://flink.apache.org/ - Amazon Managed Streaming for Apache Kafka
https://aws.amazon.com/msk/ - 控制台
https://console.aws.amazon.co… - Apache Kafka
https://kafka.apache.org/ - Kinesis Data Analytics Studio 控制台
https://console.aws.amazon.co…
在笔记本中,您能够应用 SQL 查问 和 Python 或 Scala 程序 与流数据交互并立刻取得后果。如果您对后果称心,则只需单击几下,您即可将代码晋升至大规模、牢靠运行地生产流解决应用程序,无需额定的开发工作。
📢想要理解更多亚马逊云科技 最新技术公布 和实际翻新 ,敬请关注在 上海、北京、深圳三地 举办的2021 亚马逊云科技中国峰会!点击图片报名吧~
对于新我的项目,咱们建议您应用新的 Kinesis Data Analytics Studio,而不是 Kinesis Data Analytics for SQL 应用程序。Kinesis Data Analytics Studio 将 易用性 和高级剖析性能 相结合,这使得能够在 几分钟内疾速构建简单的流解决应用程序。咱们来看看这些步骤的实际操作。
- Kinesis Data Analytics for SQL 应用程序
https://docs.aws.amazon.com/k…
应用 Kinesis Data Analytics Studio 剖析流数据
我想要更好地理解某些传感器发送给 Kinesis 数据流的数据。
为了模仿此工作负载,我应用此 random_data_generator.py
脚本。您无需晓得 Python 即可应用 Kinesis Data Analytics Studio。实际上,我将在以下步骤中应用 SQL。此外,您还能够防止任何编码工作,并应用 Amazon Kinesis 数据生成器 用户界面 (UI) 将测试数据发送 Kinesis Data Streams 或 Kinesis Data Firehose。我将应用 Python 脚本来更精密地管制正在发送的数据。
1import datetime
2import json
3import random
4import boto3
5
6STREAM_NAME = "my-input-stream"
7
8
9def get_random_data():
10 current_temperature = round(10 + random.random() * 170, 2)
11 if current_temperature > 160:
12 status = "ERROR"
13 elif current_temperature > 140 or random.randrange(1, 100) > 80:
14 status = random.choice(["WARNING","ERROR"])
15 else:
16 status = "OK"
17 return {18 'sensor_id': random.randrange(1, 100),
19 'current_temperature': current_temperature,
20 'status': status,
21 'event_time': datetime.datetime.now().isoformat()
22 }
23
24
25def send_data(stream_name, kinesis_client):
26 while True:
27 data = get_random_data()
28 partition_key = str(data["sensor_id"])
29 print(data)
30 kinesis_client.put_record(
31 StreamName=stream_name,
32 Data=json.dumps(data),
33 PartitionKey=partition_key)
34
35
36if __name__ == '__main__':
37 kinesis_client = boto3.client('kinesis')
38 send_data(STREAM_NAME, kinesis_client)
39
40Python
- Amazon Kinesis 数据生成器
https://awslabs.github.io/ama… - Kinesis Data Streams
https://aws.amazon.com/kinesi… - Kinesis Data Firehose
https://aws.amazon.com/kinesi…
此脚本应用 JSON 语法将随机记录发送给我的 Kinesis 数据流。例如:
1{'sensor_id': 77, 'current_temperature': 93.11, 'status': 'OK', 'event_time': '2021-05-19T11:20:00.978328'}
2{'sensor_id': 47, 'current_temperature': 168.32, 'status': 'ERROR', 'event_time': '2021-05-19T11:20:01.110236'}
3{'sensor_id': 9, 'current_temperature': 140.93, 'status': 'WARNING', 'event_time': '2021-05-19T11:20:01.243881'}
4{'sensor_id': 27, 'current_temperature': 130.41, 'status': 'OK', 'event_time': '2021-05-19T11:20:01.371191'}
从 Kinesis 控制台 中,我抉择了一个 Kinesis 数据流 (my-input-stream
) 并从 Process(解决)下拉菜单中抉择 Process data in real time(实时处理)。通过这种形式,流被配置为笔记本的源。
- Kinesis 控制台
https://awsc-integ.aws.amazon…
而后,在上面的对话框中,我将创立一个 Apache Flink – Studio 笔记本。
我为该笔记本输出了一个名称 (my-notebook
) 和形容。从我之前抉择的 Kinesis 数据流 (my-input-stream) 读取的 Amazon Identity and Access Management (IAM) 权限将会主动附加到笔记本负责的 IAM 角色。
- Amazon Identity and Access Management (IAM)
https://aws.amazon.com/iam/
我抉择 Create(创立)以关上 Amazon Glue 控制台 并创立一个空的数据库。返回 Kinesis Data Analytics Studio 控制台,我刷新了列表并抉择新的数据库。它将定义我的源和指标的元数据。从这里,我还能够查看默认的 Studio 笔记本设置。而后,我抉择 Create Studio notebook(创立 Studio 笔记本)。
创立笔记本之后,我抉择 Run(运行)。
- Amazon Glue 控制台
https://console.aws.amazon.co…
在笔记本运行时,我抉择 Open in Apache Zeppelin(在 Apache Zeppelin 中关上)以获取该笔记本的拜访权限,并以 SQL、Python 或 Scala 编写代码以与我的流数据交互并实时取得见解。
在笔记本中,我创立了一个新的笔记并将其称之为Sensors
. 而后,我创立了一个 sensor_data
表,用于形容流中的数据的格局:
1%flink.ssql
2
3CREATE TABLE sensor_data (
4 sensor_id INTEGER,
5 current_temperature DOUBLE,
6 status VARCHAR(6),
7 event_time TIMESTAMP(3),
8 WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
9)
10PARTITIONED BY (sensor_id)
11WITH (
12 'connector' = 'kinesis',
13 'stream' = 'my-input-stream',
14 'aws.region' = 'us-east-1',
15 'scan.stream.initpos' = 'LATEST',
16 'format' = 'json',
17 'json.timestamp-format.standard' = 'ISO-8601'
18)
19
20SQL
上一个命令中的第一行告知 Apache Zeppelin 为 Apache Flink 解释器提供流 SQL 环境 (%flink.ssql)。此外,我还能够应用批量 SQL 环境 (%flink.ssql)、Python (%flink.pyflink
) 或 Scala (%flink) 代码与流数据进行交互。
对于应用过 SQL 和数据库的用户来说,CREATE TABLE
语句的的一部分十分相熟。将会创立一个表用于存储流中的数据。WATERMARK
(水印)选项用于以事件工夫掂量进度,如 Apache Flink 文档的“事件工夫和水印”局部所述。
- Apache Flink 文档的“事件工夫和水印”局部
https://ci.apache.org/project…
CREATE TABLE
语句的第二局部形容用于接管表中的数据(如 kinesis
或 kafka
)、流名称、 亚马逊云科技区域 、流的整体数据格式(如 json 或 csv)和工夫戳应用的语法(在本例中为 ISO 8601)的连接器。我也能够抉择解决流的 起始地位 ,我首先将会应用 LATEST
读取最新的数据。
- 亚马逊云科技区域:
https://aws.amazon.com/about-… - ISO 8601:
https://en.wikipedia.org/wiki… - 起始地位:
https://docs.aws.amazon.com/k…
表就绪后,我将会在创立笔记本时抉择的 Amazon Glue 数据目录 数据库中找到它:
- Amazon Glue 数据目录
https://docs.aws.amazon.com/g…
当初,我能够在 sensor_data
表中运行 SQL 查问,并应用滑动或翻滚窗口来更好地理解我的传感器中产生的状况。
为了概括理解流中的数据,我将会从简略的 SELECT
动手以获取 sensor_data
表中的所有内容:
1%flink.ssql(type=update)
2
3SELECT * FROM sensor_data;
4
5SQL
此时,第一行命令中蕴含一个参数 (type=update
),因而当新的数据到达时 SELECT
的输入(不只一行)将继续更新。
在我的笔记本终端上,我启动 random_data_generator.py
脚本:
1$ python3 random_data_generator.py
首先,我会看到一个蕴含数据的表。为了更好地理解,我抉择了 bar graph(条形图)视图。而后,我按 状态
对后果进行了分组,以查看其均匀 current_temperature
,如下所示:
正如我生成这些后果所预期的一样,我会看到不同的平均温度,具体取决于 状态
(OK
(失常)、WARNING
(正告)或 ERROR
(谬误))。温度越高,传感器中呈现不失常工作的可能性越大。
我能够应用 SQL 语法显式运行聚合查问。这一次,我想要在 1 分钟的滑动窗口计算结果,并且后果每 10 秒钟更新一次。为此,我在 SELECT 语句的 GROUP BY
局部中应用 HOP 函数。要将工夫增加到抉择的输入,我应用 HOP_ROWTIME
函数。无关更多信息,请参阅 Apache Flink 文档中的“组窗口聚合工作原理”。
-
Apache Flink 文档中的“组窗口聚合工作原理”
https://ci.apache.org/project…1%flink.ssql(type=update) 2 3SELECT sensor_data.status, 4 COUNT(*) AS num, 5 AVG(sensor_data.current_temperature) AS avg_current_temperature, 6 HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time 7 FROM sensor_data 8 GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status; 9 10SQL
此时,我将查看以表格局显示的后果:
为了将查问后果发送至指标流,我创立了一个表并将该表连贯到流。首先,我须要授予笔记本电脑写入流的权限。
在 Kinesis Data Analytics Studio 控制台 中,我抉择了 my-notebook
。而后,在 Studio notebooks details(Studio 笔记本详细信息)局部,我抉择了 Edit IAM permissions(编辑 IAM 权限)。在这里,我能够配置笔记本应用的源和指标,并且 IAM 角色权限将会自动更新。
- Kinesis Data Analytics Studio 控制台
https://awsc-integ.aws.amazon…
在 Included destinations in IAM policy(IAM 策略中所含的指标)局部,我抉择了指标并抉择 my-output-stream。保留更改并期待笔记本更新。当初,我将开始应用指标流。
在笔记本中,我创立了一个连贯到 my-output-stream
的 sensor_state
表。
1%flink.ssql
2
3CREATE TABLE sensor_state (4 status VARCHAR(6),
5 num INTEGER,
6 avg_current_temperature DOUBLE,
7 hop_time TIMESTAMP(3)
8)
9WITH (
10'connector' = 'kinesis',
11'stream' = 'my-output-stream',
12'aws.region' = 'us-east-1',
13'scan.stream.initpos' = 'LATEST',
14'format' = 'json',
15'json.timestamp-format.standard' = 'ISO-8601');
16
17SQL
当初,我将应用此 INSERT INTO
语句来将抉择的后果继续插入到 sensor_state
表中。
1%flink.ssql(type=update)
2
3INSERT INTO sensor_state
4SELECT sensor_data.status,
5 COUNT(*) AS num,
6 AVG(sensor_data.current_temperature) AS avg_current_temperature,
7 HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
8FROM sensor_data
9GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;
10
11SQL
这些数据还会发送至指标 Kinesis 数据流 (my-output-stream
),以供其余应用程序应用。例如,指标流中的数据可用于更新实时仪表板,或者在软件更新后监控我的传感器的行为。
我对后果十分称心。我想要将此查问及其输入部署为 Kinesis Analytics 应用程序。
首先,我须要在我的笔记本中创立一个 SensorsApp
笔记,并复制我想要作为应用程序的一部分执行的语句。表已创立结束,因而,我只需复制下面的 INSERT INTO 语句。
而后,从我的笔记本右上角的菜单中,抉择 Build SensorsApp and export to Amazon S3(构建 SensorsApp 并导出到 Amazon S3),而后确认应用程序名称。
导出就绪后,我在雷同菜单中选择 Deploy SensorsApp as Kinesis Analytics application(将 SensorsApp 部署为 Kinesis Analytics 应用程序)。之后,我对应用程序的配置进行了微调。我将 parallelism(平行度)设为 1,因为我的输出 Kinesis 数据流中只有一个分片,并没有太多流量。而后,我运行应用程序,无需编写任何代码。
- parallelism
https://docs.aws.amazon.com/k…
从 Kinesis Data Analytics 应用程序控制台 中,抉择 Open Apache Flink dashboard(关上 Apache Flink 仪表板),以理解无关应用程序的执行状况的更多信息。
- Kinesis Data Analytics 应用程序控制台
https://awsc-integ.aws.amazon…
可用性和定价
当初,您能够在正式推出 Kinesis Data Analytics 的所有 Amazon 区域 中应用 Amazon Kinesis Data Analytics Studio。无关更多信息,请参阅 Amazon 区域服务列表。
- Amazon 区域服务列表
https://aws.amazon.com/about-… - Amazon Kinesis Data Analytics Studio
https://aws.amazon.com/kinesi… - Amazon 区域服务列表
https://aws.amazon.com/about-…
在 Kinesis Data Analytics Studio 中,咱们运 Apache Zeppelin 和 Apache Flink 的开源版本,并且咱们在上游做出了更改。例如,咱们对 Apache Zeppelin 进行了谬误修复,并且咱们为 Apache Flink 提供了 Amazon 连接器,如实用于 Kinesis Data Streams 和 Kinesis Data Firehose 的连接器。此外,咱们将与 Apache Flink 社区携手单干,以进步可用性,包含对运行时谬误主动进行分类,以理解是用户代码还是应用程序基础架构中存在谬误。
- Apache Zeppelin
https://zeppelin.apache.org/ - Apache Flink
https://flink.apache.org/
应用 Kinesis Data Analytics Studio 时,您须要依据每小时的均匀 Kinesis 处理单元 (KPU)(包含正在运行的笔记本应用的 KPU)数量付费。一个 KPU 蕴含 1 个计算 vCPU、4 GB 内存和关联的网络。您还须要为正在运行的应用程序存储以及长久应用程序存储付费。无关更多信息,请参阅 Kinesis Data Analytics 定价页面。
- Kinesis Data Analytics 定价页面
https://aws.amazon.com/kinesi…
立刻开始应用 Kinesis Data Analytics Studio,以更好地理解您的流数据。