本文依据 Flink Forward 寰球在线会议 · 中文精华版整顿而成,由阿里巴巴计算平台事业部资深算法专家杨旭 (品数) 分享。本文次要介绍了 Alink 从发表开源到当初,最近半年来的停顿状况,重点分享了 Alink 的一些个性、原理、应用技巧等,为大家应用 Alink 进行开发提供了参考。
Alink 停顿总览
Alink 到目前曾经公布了四个 Release 版本:
- Alink version 1.0:2019 年 11 月在 Flink Forword Asia 大会上发表开源。
- Alink version 1.0.1:于 2019 年 12 月公布,次要解决一些场景下 PyAlink 的装置问题。在此期间也出了一系列的开发文章,包含 Alink 环境搭建,入门示例等,为大家应用 Alink 第一步提供了领导。
- Alink version 1.1.0:于 2020 年 02 月公布,在 Flink 公布 1.10 版本后,Alink 第一工夫做了兼容,目前 Alink 反对 Flink 1.10 和 Flink 1.9,PyAlink 也兼容 PyFlink。此外,从这个版本开始,Alink 曾经公布到 Maven 地方仓库和 PyPI。这样,Maven 工程中应用 Alink,只须要在 POM 文件中引入 Alink 的相干依赖就能够了,无需本人手动编译,打包装置。Python 环境则能够借助 PyPI 仓库,进行 Alink 的装置。
- Alink version 1.1.1:于 2020 年 04 月公布,次要是晋升了应用体验,晋升了性能。
Alink 倒退之路
上图是 Alink 在公布 1.0 版本的时候,所有的算法以及性能,简略来说,Alink 的批式性能是和 SparkML 对应的,SparkML 有的性能,Alink 根本都提供了。相较于 SparkML,除了批式的性能,Alink 还提供了流式的性能。
Alink 在近半年,性能上整体没有大的变动,上面列举一些正在研发测试,行将开源的一些性能:
- 提供更多数据处理,特色工程相干性能,在小版本就会陆续推出。
- 经典的分类和回归问题上,次要为两个方面:一是对已有模型,咱们将会披露 更多模型外部信息 ,让大家对模型有更多的理解,而不仅仅只是拿模型来进行预测,二是FM 系列算法 的推出。
- 关联规定 & 协同过滤,在协同过滤举荐问题上,SparkML 次要提供的是 ALS,它能够解决一些举荐的问题,然而理论使用过程中,仅仅应用 ALS 是不够的,后续 Alink 将推出更多举荐类的算法。
- 在线学习,在 1.0 公布的时候,曾经提供了在线学习的性能,但在理论利用场景中,用户心愿在线学习可能变得更加灵便,后续的版本中将会对这部分进行增强。
重要个性介绍
在本章,将依照版本的公布程序,逐渐介绍 Alink 的个性,设计原理,以及应用技巧等内容。
1.Alink version 1.1.0
■ 程序构建
从 Alink 1.1.0 开始,应用 Maven 地方仓库即可构建 Alink 我的项目,上面是 POM 文件示例。Flink 1.10 版本依赖:
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.10_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
Flink 1.9 版本依赖:
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.9_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.0</version>
</dependency>
■ 环境装置实际
- 筹备环节
次要是 Python 环境搭建,以及 JAVA 8 的装置,Python 环境的搭建咱们举荐装置 Anaconda3,能够对 Python 的版本进行灵便的管制。不同操作系统的环境筹备,请参考上面的教程:
MacOS:https://zhuanlan.zhihu.com/p/…
Linux 环境:https://zhuanlan.zhihu.com/p/…
Windows:https://zhuanlan.zhihu.com/p/…
- PyAlink 装置
从 1.1.0 开始,Alink 曾经公布到了 PyPI,装置更加不便了,请参考如下链接:
如何装置最新版本 PyAlink?
https://zhuanlan.zhihu.com/p/…
- PyAlink 卸载
如果之前装置过 PyAlink,因为之前版本咱们是手动装置的,在降级到新版本时,可能会遇到一些问题,因而须要将其卸载,能够参考上面的文章:
PyAlink 的版本查问、卸载旧版本:
https://zhuanlan.zhihu.com/p/…
■ Notebook 开发实际
在讲 Notebook 示例之前,咱们先来理解一下 PyAlink 的设计背景:
- 在机器学习利用开发过程中,咱们常常会先在批式环境进行模型训练,而后在流式的环境利用模型,从批式环境转换到流式环境,往往须要重写代码,无奈做到代码的复用。Alink 设计之初,就心愿尽量将批和流之间的差别变得最小,比方,批上做完后,只须要将 Batch 字样改成 Stream 字样就能够运行。
- 机器学习开发的过程,咱们个别是心愿越快越好,越麻利越好。其实在本机上开发,体验是最好的,个别做法是,在本机小数据规模上进行验证,而后上到集群上进行成果的评估。然而本机环境往集群环境迁徙,并不容易,咱们心愿这个过程有一个好的体验,不必去编写大量的代码。
基于这样的设计背景,咱们来看下一当初 Notebook 上进行 Alink 开发的实际。
- PyAlink 批式工作在 Notebook 上运行
本地 运行代码示例:
from pyalink.alink import *
## 一个 Batch 作业的例子
useLocalEnv(2)
## prepare data
import numpy as np
import pandas as pd
data = np.array([[0, 0.0, 0.0, 0.0],
[1, 0.1, 0.1, 0.1],
[2, 0.2, 0.2, 0.2],
[3, 9, 9, 9],
[4, 9.1, 9.1, 9.1],
[5, 9.2, 9.2, 9.2]
])
df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})
inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')
FEATURE_COLS = ["f0", "f1", "f2"]
VECTOR_COL = "vec"
PRED_COL = "pred"
vectorAssembler = (VectorAssembler()
.setSelectedCols(FEATURE_COLS)
.setOutputCol(VECTOR_COL)
)
kMeans = (KMeans()
.setVectorCol(VECTOR_COL)
.setK(2)
.setPredictionCol(PRED_COL)
)
pipeline = Pipeline().add(vectorAssembler).add(kMeans)
pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()
集群 运行代码示例:
from pyalink.alink import *
## 一个 Batch 作业的例子
useRemoteEnv("10.101.**.**", 31805, 2, shipAlinkAlgoJar=False)
## prepare data
import numpy as np
import pandas as pd
data = np.array([[0, 0.0, 0.0, 0.0],
[1, 0.1, 0.1, 0.1],
[2, 0.2, 0.2, 0.2],
[3, 9, 9, 9],
[4, 9.1, 9.1, 9.1],
[5, 9.2, 9.2, 9.2]
])
df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})
inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')
FEATURE_COLS = ["f0", "f1", "f2"]
VECTOR_COL = "vec"
PRED_COL = "pred"
vectorAssembler = (VectorAssembler()
.setSelectedCols(FEATURE_COLS)
.setOutputCol(VECTOR_COL)
)
kMeans = (KMeans()
.setVectorCol(VECTOR_COL)
.setK(2)
.setPredictionCol(PRED_COL)
)
pipeline = Pipeline().add(vectorAssembler).add(kMeans)
pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()
咱们能够看到本地和近程代码上的差异,就只有第 4 行代码不一样,本地应用的是 useLocalEnv(2),近程应用的是 useRemoteEnv(“10.101..”, 31805, 2, shipAlinkAlgoJar=False)。相较于本地环境,集群环境须要指定 Flink 集群的 ip 地址和端口。
- PyAlink 流式工作在 Notebook 上运行
本地 运行示例:
from pyalink.alink import *
## 一个 Stream 作业的例子
## 惟一参数示意并行度
useLocalEnv(2)
source = CsvSourceStreamOp() \
.setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \
.setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")
source.print()
StreamOperator.execute()
集群 运行示例:
from pyalink.alink import *
## 一个 Stream 作业的例子
useRemoteEnv("10.101.**.**", 31805, 2, shipAlinkAlgoJar=False, localIp="30.39.**.**")
source = CsvSourceStreamOp() \
.setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \
.setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")
source.print()
StreamOperator.execute()
咱们能够看到,流式工作的运行,本地和集群上的代码也只有设置运行环境这一行(第 4 行)代码有差异,其余的代码都是一样的。
集群运行模式中,流式工作和批式工作的设置有点差异,流式的工作须要指定本地的 IP 地址(localIp),咱们应用 Notebook 进行交互式开发时,个别须要看到运行后果,批式工作应用 Flink 现有的机制,是能够间接看到运行后果的。然而流式工作数据流是无边界的,为了将流工作的运行后果返回回来,让用户能够实时看到,咱们独自建设了一个通路进行数据传输,因而咱们须要设置这个本地的 IP 地址,和集群进行交互。当然,这个本地 IP 地址的参数,在 4 月份的版本(Alink 1.1.1 版本)中,曾经能够自动检测到了,能够省略掉了。
■ PyAlink 基于 PyFlink 整合
本节中,重点介绍两点 PyAlink 和 PyFlink 的兼容个性。
- 数据的连通性:Alink 算子的输入输出,实质上是 Flink 的 Table 格局,PyFlink 其实也是 Flink Table,这样 Alink Operator 与 PyFlink 两个就能够互相转化,而且转换的代价十分小,并不波及到数据的重写。有了这种转换,Alink 和 PyFlink 两边的性能就能够混用,不便串联 Flink 和 Alink 的工作流。
上面是一段 Alink 和 PyFlink 代码混用的示例:
### get_mlenv.py
from pyalink.alink import *
env, btenv, senv, stenv = getMLEnv()
### 应用 PyFlink 接口,与 Table 进行互转
table = stenv.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
source = TableSourceStreamOp(table)
source.print()
StreamOperator.execute()
咱们能够看到,应用 PyFlink 构建 Table(第 5 行),能够间接转化为 PyAlink 的数据源算子(第 6 行)。
- 新提供了 getMLEnv 接口 ,能间接应用 flink run -py *.py 往集群提交作业。除了 Notebook 交互式运行 Alink 工作这种形式,对于定时调度的工作,咱们须要一次性提交工作,PyFlink 在这方面反对十分好,PyAlink 和 PyFlink 兼容后,咱们能够达到和 PyFlink 提交工作一样的应用体验。只是有一点不同,在获取运行环境的时候,须要改为调用 getMLEnv() 办法,这个办法会返回 env, btenv, senv, stenv 四个运行环境,这样 PyAlink 的工作能够交给 PyFlink,执行 PyFlink 相干操作。
上面是提交工作的示例:
### 间接运行脚本
python kmeans.py
### 向集群提交作业
PYFLINK_PATH=`python -c "import pyflink;print(pyflink.__path__[0])"`
${PYFLINK_PATH}/bin/flink run -m 10.101.**.**:31805 -py kmeans.py -p 4
■ 读写 Kafka
咱们在 Flink Kafka Connector 根底上,为 Kafka 的输入输出包装了 Source 和 Sink 组件,让大家读写 Kafka 数据更加不便。上面是一个从数据读入,数据解析,对数据进行逻辑回归预测,将后果写入 Kafka 的工作的代码示例:
### 读取数据
data = KafkaSourceStreamOp()\
.setBootstrapServers("localhost:9092")\
.setTopic("iris")\
.setStartupMode("EARLIEST")\
.setGroupId("alink_group")
### 解析 JSON 数据
json_parser = JsonValueStreamOp()
.setSelectedCol("message")
.setOutputCols(["sepal_length", "sepal_width", "petal_length", "petal_width", "category"])
.setJsonPath(["$.sepal_length", "$.sepal_width", "$.petal_length", "$.petal_width", "$.category"])
data = data.link(json_parser)
### 数据类型转换
data = data.select( \
"CAST(sepal_length AS DOUBLE) AS sepal_length,"
+ "CAST(sepal_width AS DOUBLE) AS sepal_width,"
+ "CAST(petal_length AS DOUBLE) AS petal_length,"
+ "CAST(petal_width AS DOUBLE) AS petal_width, category")
### 读取本地模型文件
model = CsvSourceBatchOp().setFilePath("/path/to/model.csv") \
.setSchemaStr("model_id bigint, model_info string, label_type string")
### 构建逻辑回归预测模型
lr_predictor = LogisticRegressionPredictStreamOp(model) \
.setPredictionCol("pred").setPredictionDetailCol("pred_detail")
### 对数据进行预测
result = data.link(lr_predictor)
### 后果输入到 Kafka
sink = KafkaSinkStreamOp() \
.setBootstrapServers("localhost:9092") \
.setDataFormat("json").setTopic("lr_pred")
data.link(sink)
StreamOperator.execute()
从代码中能够看出,数据通过 KafkaSourceStreamOp 组件读入,通过 JSON Path 解析数据,解析出的值,是 String 类型,再依据数据的理论类型,应用 CAST 函数进行类型转换,而后通过加载本地训练好的模型,构建逻辑回归预测组件,对数据进行预测,最初将后果通过 KafkaSinkStreamOp 组件输入到 Kafka。
咱们也能够看到,在 Alink 1.1.0 这个版本中,数据的解析还是有一点麻烦,在本文前面的局部还会介绍对数据解析局部的简化,让整个流程更简洁。
2.Alink version 1.1.1
本章开始,咱们将具体介绍 Alink 1.1.1 版本的一些优化的点,以及重要个性等。
■ 优化枚举类型参数提醒
在咱们应用算法组件的时候,常常会遇到有些属性是枚举类型的,在 Python 中,个别是通过字符串输出枚举值,理论在应用的过程中,这些枚举值很难全副记住,常常须要去查问 Alink 的文档。为了咱们编写代码更加顺畅,在新版本中,咱们优化了代码的提示信息,咱们能够尝试填写一个代替值,尽管会抛异样,但在运行后果中,能够看到枚举值的明确提醒。
以卡方筛选算子为例,卡方筛选算子的 SelectorType 能够填写 NumTopFeatures,Percentil,FPR 等,是枚举类型变量,咱们如果应用 ’aaa’ 值代替,看下会有什么成果,代码如下:
### Python 代码
selector = ChiSqSelectorBatchOp()\
.setSelectorType("aaa")\
.setSelectedCols(["f_string", "f_long", "f_int", "f_double"])\
.setLabelCol("f_boolean")\
.setNumTopFeatures(2)
在 Alink 1.1.1 之前的版本,会返回下图:
异样信息中打出 SelectorType 输入谬误的值 AAA,但异样信息不显著,也没有指出是哪个参数写错了。
Alink 1.1.1 中,则会呈现下图的后果:
异样信息中会有哪个参数填写谬误,以及会提醒可能的值是什么,这样咱们应用 Alink 算子的时候更加便捷。
下面是 Python 代码的枚举类型的谬误提醒,对于 JAVA 来说,有代码主动提醒,编写时会十分不便:
■ 优化列名参数提醒
咱们进行机器学习开发,算法中往往会有很多列名参数,列名输错状况很常见,如下图所示:
咱们可能将 text 字段谬误的写成了 text1,在 1.1.1 版本里,不仅会指出哪列不存在,也会提醒最可能的列名,帮忙用户做修改,见下图所示。
这样,用户能够更快的定位谬误,排查问题。JAVA 的行为也雷同:
输入提醒如下:
■ PyAlink1.1.1 改良
- 优化了 DataFrame 和 BatchOperator 互转的性能
咱们在应用 Python 时,更多是用 DataFrame 来操作数据,在应用 PyAlink 时,有一个 DataFrame 向 Alink Table 转换的过程,转换的速度会间接影响整个工作的执行时长,为了给用户一个比拟好的用户体验,咱们在转化下面,做了比拟大的性能优化。
以上面的示例代码为例:
n = 50000
users = []
for col in range(n):users.append([col] * 2)
df = pd.DataFrame(users)
source = BatchOperator.fromDataframe(df, schemaStr='id int, label int')
source.link(CsvSinkBatchOp().setOverwriteSink(True).setFilePath('temp.csv'))
BatchOperator.execute()
之前 5W 行数据须要约 55s,当初只须要 5s,当初 100w 行数据约 20s 就能够转换实现。您可能留神到从 5W 到 100W,这种晋升如同不是成程线性关系,这是因为转化的过程中,还蕴含了一些零碎开销。总之,咱们在数据转化中,曾经尽量的压缩了解决工夫,让整个工作运行更快。
- 改良流式组件的 print 性能,将不会因为数据中有 NaN 导致作业失败,进步了程序的稳定性。
- Python UDF 运行中将自动检测 python3 命令,如果环境中同时有 Python 2 和 3,将可能因为 python 命令指向 Python 2 而导致运行不胜利,在 Alink 1.1.1 版本中,将优先应用 python3 来执行 Python UDF。
- useRemoteEnv 将自动检测本机外网 IP,在个别网络配置下,应用 StreamOperator(流式工作)组件的性能,无需设置 localIp 了。
- 新增组件,将 CSV、JSON 和 KV 格局的字符串解析为多列。
上面是一组 JOSN 格局的测试数据。
{"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.6}
{"sepal_width":4.1,"petal_width":0.1,"sepal_length":5.2,"category":"Iris-setosa","petal_length":1.5}
{"sepal_width":2.8,"petal_width":1.5,"sepal_length":6.5,"category":"Iris-versicolor","petal_length":4.6}
{"sepal_width":3.0,"petal_width":1.8,"sepal_length":6.1,"category":"Iris-virginica","petal_length":4.9}
{"sepal_width":2.9,"petal_width":1.8,"sepal_length":7.3,"category":"Iris-virginica","petal_length":6.3}
咱们须要将其解析为下图这样的结构化数据。
Alink 1.1.1 之前,咱们可能须要编写上面这样的代码:
json_parser = JsonValueStreamOp()
.setSelectedCol("message")
.setOutputCols(["sepal_length", "sepal_width", "petal_length", "petal_width", "category"])
.setJsonPath(["$.sepal_length", "$.sepal_width", data = data.link(\
JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_length double, sepal_width double, petal_length double," + "petal_width double, category string").setReservedCols([]))
"$.petal_length", "$.petal_width", "$.category"])
data = data.link(json_parser)
data = data.select( \
"CAST(sepal_length AS DOUBLE) AS sepal_length,"
+ "CAST(sepal_width AS DOUBLE) AS sepal_width,"
+ "CAST(petal_length AS DOUBLE) AS petal_length,"
+ "CAST(petal_width AS DOUBLE) AS petal_width, category")
在 Alink 1.1.1 版本中,咱们增加了 JsonToColumnsStreamOp 组件,代码变成这样:
data = data.link(\
JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_length double, sepal_width double, petal_length double," + "petal_width double, category string").setReservedCols([]))
咱们能够看到,代码精简了很多。
最初,介绍一个日志解析的例子,咱们晓得,日志的格局没有一个残缺的法则,不是一个 JSON 格局,也不是 KV 格局,这就须要用现有工具进行组合来解决。
上面是一段日志记录的内容:
66.249.79.35 - - [14/Jun/2018:06:45:24 +0000] "GET /img/20180504/702434-20180302101540805-554506523.jpg HTTP/1.1" 200 10013 "-" "Googlebot-Image/1.0”66.249.79.35 - - [14/Jun/2018:06:45:25 +0000] "GET /img/20180504/702434-20180302161346635-1714710787.jpg HTTP/1.1" 200 45157 "-" "Googlebot-Image/1.0”66.249.79.35 - - [14/Jun/2018:06:45:56 +0000] "GET /img/2018/05/21/60662344.jpg HTTP/1.1" 200 14133 "-" "Googlebot-Image/1.0"
54.36.148.129 - - [14/Jun/2018:06:46:01 +0000] "GET /archives/91007 HTTP/1.1" 200 8332 "-" "Mozilla/5.0 (compatible; AhrefsBot/5.2; +http://ahrefs.com/robot/)”54.36.148.201 - - [14/Jun/2018:06:46:03 +0000] "GET /archives/88741/feed HTTP/1.1" 200 983 "-" "Mozilla/5.0 (compatible; AhrefsBot/5.2; +http://ahrefs.com/robot/)”5.255.250.200 - - [14/Jun/2018:06:46:03 +0000] "GET /archives/87084 HTTP/1.1" 200 9951 "-" "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)”
上面是具体解析的代码:
source.select("SUBSTRING(text FROM 1 For POSITION('['in text)-2) AS part1,"
+"REGEXT_EXTRACT(text,'(\\[)(.*?)(\\])',2) AS log_time,"
+"SUBSTRING(text FROM 2+POSION(']'IN text)) AS part2"
)
.link(new CsvToColumnBatchOp()
.setSelectCol("part1")
.setFieldDelimiter(" ")
.setSchemaStr("ip string,col1 string,col2 string")
)
.link(new CsvToColumnBatchOp()
.setSelectCol("part2")
.setFieldDelimiter(" ")
.setSchemaStr("cmd string,response int,bytesize int,col3 string,col4 string")
)
.link(new CsvToColumnBatchOp()
.setSelectCol("cmd")
.setFieldDelimiter(" ")
.setSchemaStr("req_method string,url String,protocol string")
)
.select("ip,col1,col2,log_time,req_method,url,protocol,response,bytesize,col3,col4")
下面的代码思路如下:
- 首先咱们将日志依据“[]”将日志划分为三局部,能够应用 Flink 的 SUBSTRING 函数,联合正则表达式 REGEXT_EXTRACT 进行拆分。
- 别离应用 CsvToColumnBatchOp 依照空格分隔对两边的文本(part1,part2 两局部)进行解析,并指定列名。
对 cmd 这个非凡字段做进一步的解析。 - 最初,选出所有解析进去的列,实现。
Alink 相干资料汇总:
- Alink GitHub 地址:
https://github.com/alibaba/Alink - Alink 系列教程:
https://www.zhihu.com/people/…
以上。Alink 是基于 Flink 的机器学习算法平台,欢送拜访 Alink 的 GitHub 链接获取更多信息。也欢送退出 Alink 开源用户群进行交换~
▼ 钉钉扫码退出 Alink 技术交换群 ▼