本文依据 Flink Forward 寰球在线会议 · 中文精华版整顿而成,由阿里巴巴计算平台事业部资深算法专家杨旭(品数)分享。本文次要介绍了 Alink 从发表开源到当初,最近半年来的停顿状况,重点分享了 Alink 的一些个性、原理、应用技巧等,为大家应用 Alink 进行开发提供了参考。
Alink停顿总览
Alink 到目前曾经公布了四个 Release 版本:
- Alink version 1.0:2019年11月在Flink Forword Asia大会上发表开源。
- Alink version 1.0.1:于2019年12月公布,次要解决一些场景下PyAlink的装置问题。在此期间也出了一系列的开发文章,包含Alink环境搭建,入门示例等,为大家应用Alink第一步提供了领导。
- Alink version 1.1.0:于2020年02月公布,在Flink公布1.10版本后,Alink 第一工夫做了兼容,目前Alink反对Flink 1.10和Flink 1.9,PyAlink也兼容PyFlink。此外,从这个版本开始,Alink曾经公布到Maven地方仓库和PyPI。这样,Maven工程中应用Alink,只须要在POM文件中引入Alink的相干依赖就能够了,无需本人手动编译,打包装置。Python环境则能够借助PyPI仓库,进行Alink的装置。
- Alink version 1.1.1 :于2020年04月公布,次要是晋升了应用体验,晋升了性能。
Alink 倒退之路
上图是Alink在公布1.0版本的时候,所有的算法以及性能,简略来说,Alink的批式性能是和SparkML对应的,SparkML有的性能,Alink根本都提供了。相较于SparkML,除了批式的性能,Alink还提供了流式的性能。
Alink在近半年,性能上整体没有大的变动,上面列举一些正在研发测试,行将开源的一些性能:
- 提供更多数据处理,特色工程相干性能,在小版本就会陆续推出。
- 经典的分类和回归问题上,次要为两个方面:一是对已有模型,咱们将会披露更多模型外部信息,让大家对模型有更多的理解,而不仅仅只是拿模型来进行预测,二是FM系列算法的推出。
- 关联规定&协同过滤,在协同过滤举荐问题上,SparkML次要提供的是ALS,它能够解决一些举荐的问题,然而理论使用过程中,仅仅应用ALS是不够的,后续Alink将推出更多举荐类的算法。
- 在线学习,在1.0公布的时候,曾经提供了在线学习的性能,但在理论利用场景中,用户心愿在线学习可能变得更加灵便,后续的版本中将会对这部分进行增强。
重要个性介绍
在本章,将依照版本的公布程序,逐渐介绍Alink的个性,设计原理,以及应用技巧等内容。
1.Alink version 1.1.0
■ 程序构建
从Alink 1.1.0开始,应用Maven地方仓库即可构建Alink我的项目,上面是POM文件示例。Flink 1.10版本依赖:
<dependency> <groupId>com.alibaba.alink</groupId> <artifactId>alink_core_flink-1.10_2.11</artifactId> <version>1.1.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.10.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.10.0</version></dependency>
Flink 1.9版本依赖:
<dependency> <groupId>com.alibaba.alink</groupId> <artifactId>alink_core_flink-1.9_2.11</artifactId> <version>1.1.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.9.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.9.0</version></dependency>
■ 环境装置实际
- 筹备环节
次要是Python环境搭建,以及JAVA 8的装置,Python环境的搭建咱们举荐装置Anaconda3,能够对Python的版本进行灵便的管制。不同操作系统的环境筹备,请参考上面的教程:
MacOS: https://zhuanlan.zhihu.com/p/...
Linux环境: https://zhuanlan.zhihu.com/p/...
Windows:https://zhuanlan.zhihu.com/p/...
- PyAlink装置
从1.1.0开始,Alink曾经公布到了PyPI,装置更加不便了,请参考如下链接:
如何装置最新版本PyAlink?
https://zhuanlan.zhihu.com/p/...
- PyAlink卸载
如果之前装置过PyAlink,因为之前版本咱们是手动装置的,在降级到新版本时,可能会遇到一些问题,因而须要将其卸载,能够参考上面的文章:
PyAlink的版本查问、卸载旧版本:
https://zhuanlan.zhihu.com/p/...
■ Notebook开发实际
在讲 Notebook 示例之前,咱们先来理解一下 PyAlink 的设计背景:
- 在机器学习利用开发过程中,咱们常常会先在批式环境进行模型训练,而后在流式的环境利用模型,从批式环境转换到流式环境,往往须要重写代码,无奈做到代码的复用。Alink设计之初,就心愿尽量将批和流之间的差别变得最小,比方,批上做完后,只须要将Batch字样改成Stream字样就能够运行。
- 机器学习开发的过程,咱们个别是心愿越快越好,越麻利越好。其实在本机上开发,体验是最好的,个别做法是,在本机小数据规模上进行验证,而后上到集群上进行成果的评估。然而本机环境往集群环境迁徙,并不容易,咱们心愿这个过程有一个好的体验,不必去编写大量的代码。
基于这样的设计背景,咱们来看下一当初Notebook上进行Alink开发的实际。
- PyAlink 批式工作在 Notebook 上运行
本地运行代码示例:
from pyalink.alink import *## 一个 Batch 作业的例子useLocalEnv(2)## prepare dataimport numpy as npimport pandas as pddata = np.array([ [0, 0.0, 0.0, 0.0], [1, 0.1, 0.1, 0.1], [2, 0.2, 0.2, 0.2], [3, 9, 9, 9], [4, 9.1, 9.1, 9.1], [5, 9.2, 9.2, 9.2]])df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')FEATURE_COLS = ["f0", "f1", "f2"]VECTOR_COL = "vec"PRED_COL = "pred"vectorAssembler = ( VectorAssembler() .setSelectedCols(FEATURE_COLS) .setOutputCol(VECTOR_COL))kMeans = ( KMeans() .setVectorCol(VECTOR_COL) .setK(2) .setPredictionCol(PRED_COL))pipeline = Pipeline().add(vectorAssembler).add(kMeans)pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()
集群运行代码示例:
from pyalink.alink import *## 一个 Batch 作业的例子useRemoteEnv("10.101.**.**", 31805, 2, shipAlinkAlgoJar=False)## prepare dataimport numpy as npimport pandas as pddata = np.array([ [0, 0.0, 0.0, 0.0], [1, 0.1, 0.1, 0.1], [2, 0.2, 0.2, 0.2], [3, 9, 9, 9], [4, 9.1, 9.1, 9.1], [5, 9.2, 9.2, 9.2]])df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')FEATURE_COLS = ["f0", "f1", "f2"]VECTOR_COL = "vec"PRED_COL = "pred"vectorAssembler = ( VectorAssembler() .setSelectedCols(FEATURE_COLS) .setOutputCol(VECTOR_COL))kMeans = ( KMeans() .setVectorCol(VECTOR_COL) .setK(2) .setPredictionCol(PRED_COL))pipeline = Pipeline().add(vectorAssembler).add(kMeans)pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()
咱们能够看到本地和近程代码上的差异,就只有第4行代码不一样,本地应用的是useLocalEnv(2),近程应用的是useRemoteEnv("10.101..", 31805, 2, shipAlinkAlgoJar=False)。相较于本地环境,集群环境须要指定Flink集群的ip地址和端口。
- PyAlink 流式工作在 Notebook 上运行
本地运行示例:
from pyalink.alink import *## 一个 Stream 作业的例子## 惟一参数示意并行度useLocalEnv(2)source = CsvSourceStreamOp() \ .setSchemaStr( "sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \ .setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")source.print()StreamOperator.execute()
集群运行示例:
from pyalink.alink import *## 一个 Stream 作业的例子useRemoteEnv("10.101.**.**", 31805, 2, shipAlinkAlgoJar=False, localIp="30.39.**.**")source = CsvSourceStreamOp() \ .setSchemaStr( "sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \ .setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")source.print()StreamOperator.execute()
咱们能够看到,流式工作的运行,本地和集群上的代码也只有设置运行环境这一行(第4行)代码有差异,其余的代码都是一样的。
集群运行模式中,流式工作和批式工作的设置有点差异,流式的工作须要指定本地的IP地址(localIp),咱们应用Notebook进行交互式开发时,个别须要看到运行后果,批式工作应用Flink现有的机制,是能够间接看到运行后果的。然而流式工作数据流是无边界的,为了将流工作的运行后果返回回来,让用户能够实时看到,咱们独自建设了一个通路进行数据传输,因而咱们须要设置这个本地的IP地址,和集群进行交互。当然,这个本地IP地址的参数,在4月份的版本(Alink 1.1.1版本)中,曾经能够自动检测到了,能够省略掉了。
■ PyAlink 基于 PyFlink 整合
本节中,重点介绍两点PyAlink和PyFlink的兼容个性。
- 数据的连通性:Alink 算子的输入输出,实质上是Flink的Table格局,PyFlink其实也是Flink Table,这样Alink Operator与PyFlink两个就能够互相转化,而且转换的代价十分小,并不波及到数据的重写。有了这种转换,Alink和PyFlink两边的性能就能够混用,不便串联 Flink 和 Alink 的工作流。
上面是一段Alink和PyFlink代码混用的示例:
### get_mlenv.pyfrom pyalink.alink import *env, btenv, senv, stenv = getMLEnv()### 应用 PyFlink 接口,与 Table 进行互转table = stenv.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])source = TableSourceStreamOp(table)source.print()StreamOperator.execute()
咱们能够看到,应用PyFlink 构建Table(第5行),能够间接转化为PyAlink的数据源算子(第6行)。
- 新提供了 getMLEnv 接口,能间接应用 flink run -py *.py 往集群提交作业。除了Notebook交互式运行Alink工作这种形式,对于定时调度的工作,咱们须要一次性提交工作,PyFlink在这方面反对十分好,PyAlink和PyFlink兼容后,咱们能够达到和PyFlink提交工作一样的应用体验。只是有一点不同,在获取运行环境的时候,须要改为调用getMLEnv()办法,这个办法会返回env, btenv, senv, stenv四个运行环境,这样PyAlink的工作能够交给PyFlink,执行PyFlink 相干操作。
上面是提交工作的示例:
### 间接运行脚本python kmeans.py### 向集群提交作业PYFLINK_PATH=`python -c "import pyflink;print(pyflink.__path__[0])"`${PYFLINK_PATH}/bin/flink run -m 10.101.**.**:31805 -py kmeans.py -p 4
■ 读写Kafka
咱们在Flink Kafka Connector根底上,为Kafka的输入输出包装了Source和Sink组件,让大家读写Kafka数据更加不便。上面是一个从数据读入,数据解析,对数据进行逻辑回归预测,将后果写入Kafka的工作的代码示例:
### 读取数据data = KafkaSourceStreamOp()\ .setBootstrapServers("localhost:9092")\ .setTopic("iris")\ .setStartupMode("EARLIEST")\ .setGroupId("alink_group")### 解析JSON数据json_parser = JsonValueStreamOp() .setSelectedCol("message") .setOutputCols(["sepal_length", "sepal_width", "petal_length", "petal_width", "category"]) .setJsonPath(["$.sepal_length", "$.sepal_width", "$.petal_length", "$.petal_width", "$.category"])data = data.link(json_parser)### 数据类型转换data = data.select( \ "CAST(sepal_length AS DOUBLE) AS sepal_length, " + "CAST(sepal_width AS DOUBLE) AS sepal_width, " + "CAST(petal_length AS DOUBLE) AS petal_length, " + "CAST(petal_width AS DOUBLE) AS petal_width, category")### 读取本地模型文件model = CsvSourceBatchOp().setFilePath("/path/to/model.csv") \ .setSchemaStr("model_id bigint, model_info string, label_type string")### 构建逻辑回归预测模型lr_predictor = LogisticRegressionPredictStreamOp(model) \ .setPredictionCol("pred").setPredictionDetailCol("pred_detail")### 对数据进行预测result = data.link(lr_predictor)### 后果输入到Kafkasink = KafkaSinkStreamOp() \ .setBootstrapServers("localhost:9092") \ .setDataFormat("json").setTopic("lr_pred")data.link(sink)StreamOperator.execute()
从代码中能够看出,数据通过KafkaSourceStreamOp组件读入,通过JSON Path解析数据,解析出的值,是String类型,再依据数据的理论类型,应用CAST函数进行类型转换,而后通过加载本地训练好的模型,构建逻辑回归预测组件,对数据进行预测,最初将后果通过KafkaSinkStreamOp组件输入到Kafka。
咱们也能够看到,在Alink 1.1.0这个版本中,数据的解析还是有一点麻烦,在本文前面的局部还会介绍对数据解析局部的简化,让整个流程更简洁。
2.Alink version 1.1.1
本章开始,咱们将具体介绍Alink 1.1.1版本的一些优化的点,以及重要个性等。
■ 优化枚举类型参数提醒
在咱们应用算法组件的时候,常常会遇到有些属性是枚举类型的,在Python中,个别是通过字符串输出枚举值,理论在应用的过程中,这些枚举值很难全副记住,常常须要去查问Alink的文档。为了咱们编写代码更加顺畅,在新版本中,咱们优化了代码的提示信息,咱们能够尝试填写一个代替值,尽管会抛异样,但在运行后果中,能够看到枚举值的明确提醒。
以卡方筛选算子为例,卡方筛选算子的SelectorType能够填写NumTopFeatures, Percentil,FPR等,是枚举类型变量,咱们如果应用'aaa'值代替,看下会有什么成果,代码如下:
### Python代码selector = ChiSqSelectorBatchOp()\ .setSelectorType("aaa")\ .setSelectedCols(["f_string", "f_long", "f_int", "f_double"])\ .setLabelCol("f_boolean")\ .setNumTopFeatures(2)
在Alink 1.1.1之前的版本,会返回下图:
异样信息中打出SelectorType输入谬误的值AAA,但异样信息不显著,也没有指出是哪个参数写错了。
Alink 1.1.1中,则会呈现下图的后果:
异样信息中会有哪个参数填写谬误,以及会提醒可能的值是什么,这样咱们应用Alink算子的时候更加便捷。
下面是Python代码的枚举类型的谬误提醒,对于JAVA来说,有代码主动提醒,编写时会十分不便:
■ 优化列名参数提醒
咱们进行机器学习开发,算法中往往会有很多列名参数,列名输错状况很常见,如下图所示:
咱们可能将text字段谬误的写成了text1,在1.1.1版本里,不仅会指出哪列不存在,也会提醒最可能的列名,帮忙用户做修改,见下图所示。
这样,用户能够更快的定位谬误,排查问题。JAVA的行为也雷同:
输入提醒如下:
■ PyAlink1.1.1改良
- 优化了 DataFrame 和 BatchOperator 互转的性能
咱们在应用Python时,更多是用DataFrame来操作数据,在应用PyAlink时,有一个DataFrame向Alink Table转换的过程,转换的速度会间接影响整个工作的执行时长,为了给用户一个比拟好的用户体验,咱们在转化下面,做了比拟大的性能优化。
以上面的示例代码为例:
n = 50000users = []for col in range(n):users.append([col] * 2)df = pd.DataFrame(users)source = BatchOperator.fromDataframe(df, schemaStr='id int, label int')source.link(CsvSinkBatchOp().setOverwriteSink(True).setFilePath('temp.csv'))BatchOperator.execute()
之前5W行数据须要约55s,当初只须要 5s,当初100w行数据约 20s就能够转换实现。您可能留神到从5W到100W,这种晋升如同不是成程线性关系,这是因为转化的过程中,还蕴含了一些零碎开销。总之,咱们在数据转化中,曾经尽量的压缩了解决工夫,让整个工作运行更快。
- 改良流式组件的print性能,将不会因为数据中有NaN导致作业失败,进步了程序的稳定性。
- Python UDF运行中将自动检测 python3 命令,如果环境中同时有 Python 2和3,将可能因为 python 命令指向 Python 2而导致运行不胜利,在Alink 1.1.1版本中,将优先应用python3 来执行 Python UDF。
- useRemoteEnv 将自动检测本机外网 IP,在个别网络配置下,应用 StreamOperator(流式工作) 组件的性能,无需设置 localIp 了。
- 新增组件,将CSV、JSON和KV格局的字符串解析为多列。
上面是一组JOSN格局的测试数据。
{"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.6}{"sepal_width":4.1,"petal_width":0.1,"sepal_length":5.2,"category":"Iris-setosa","petal_length":1.5}{"sepal_width":2.8,"petal_width":1.5,"sepal_length":6.5,"category":"Iris-versicolor","petal_length":4.6}{"sepal_width":3.0,"petal_width":1.8,"sepal_length":6.1,"category":"Iris-virginica","petal_length":4.9}{"sepal_width":2.9,"petal_width":1.8,"sepal_length":7.3,"category":"Iris-virginica","petal_length":6.3}
咱们须要将其解析为下图这样的结构化数据。
Alink 1.1.1之前,咱们可能须要编写上面这样的代码:
json_parser = JsonValueStreamOp() .setSelectedCol("message") .setOutputCols(["sepal_length", "sepal_width", "petal_length", "petal_width", "category"]) .setJsonPath(["$.sepal_length", "$.sepal_width", data = data.link(\JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_length double, sepal_width double, petal_length double, " + "petal_width double, category string").setReservedCols([]))"$.petal_length", "$.petal_width", "$.category"])data = data.link(json_parser)data = data.select( \ "CAST(sepal_length AS DOUBLE) AS sepal_length, " + "CAST(sepal_width AS DOUBLE) AS sepal_width, " + "CAST(petal_length AS DOUBLE) AS petal_length, " + "CAST(petal_width AS DOUBLE) AS petal_width, category")
在Alink 1.1.1版本中,咱们增加了JsonToColumnsStreamOp组件,代码变成这样:
data = data.link(\JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_length double, sepal_width double, petal_length double, " + "petal_width double, category string").setReservedCols([]))
咱们能够看到,代码精简了很多。
最初,介绍一个日志解析的例子,咱们晓得,日志的格局没有一个残缺的法则,不是一个JSON格局,也不是KV格局,这就须要用现有工具进行组合来解决。
上面是一段日志记录的内容:
66.249.79.35 - - [14/Jun/2018:06:45:24 +0000] "GET /img/20180504/702434-20180302101540805-554506523.jpg HTTP/1.1" 200 10013 "-" "Googlebot-Image/1.0”66.249.79.35 - - [14/Jun/2018:06:45:25 +0000] "GET /img/20180504/702434-20180302161346635-1714710787.jpg HTTP/1.1" 200 45157 "-" "Googlebot-Image/1.0”66.249.79.35 - - [14/Jun/2018:06:45:56 +0000] "GET /img/2018/05/21/60662344.jpg HTTP/1.1" 200 14133 "-" "Googlebot-Image/1.0"54.36.148.129 - - [14/Jun/2018:06:46:01 +0000] "GET /archives/91007 HTTP/1.1" 200 8332 "-" "Mozilla/5.0 (compatible; AhrefsBot/5.2; +http://ahrefs.com/robot/)”54.36.148.201 - - [14/Jun/2018:06:46:03 +0000] "GET /archives/88741/feed HTTP/1.1" 200 983 "-" "Mozilla/5.0 (compatible; AhrefsBot/5.2; +http://ahrefs.com/robot/)”5.255.250.200 - - [14/Jun/2018:06:46:03 +0000] "GET /archives/87084 HTTP/1.1" 200 9951 "-" "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)”
上面是具体解析的代码:
source.select("SUBSTRING(text FROM 1 For POSITION('[' in text)-2) AS part1," +"REGEXT_EXTRACT(text,'(\\[)(.*?)(\\])',2) AS log_time," +"SUBSTRING(text FROM 2+POSION(']' IN text)) AS part2").link( new CsvToColumnBatchOp() .setSelectCol("part1") .setFieldDelimiter(" ") .setSchemaStr("ip string,col1 string,col2 string")).link( new CsvToColumnBatchOp() .setSelectCol("part2") .setFieldDelimiter(" ") .setSchemaStr("cmd string,response int,bytesize int,col3 string,col4 string")).link( new CsvToColumnBatchOp() .setSelectCol("cmd") .setFieldDelimiter(" ") .setSchemaStr("req_method string,url String,protocol string")).select("ip,col1,col2,log_time,req_method,url,protocol,response,bytesize,col3,col4")
下面的代码思路如下:
- 首先咱们将日志依据“[]”将日志划分为三局部,能够应用Flink的SUBSTRING函数,联合正则表达式REGEXT_EXTRACT进行拆分。
- 别离应用CsvToColumnBatchOp依照空格分隔对两边的文本(part1,part2两局部)进行解析,并指定列名。
对cmd这个非凡字段做进一步的解析。 - 最初,选出所有解析进去的列,实现。
Alink相干资料汇总:
- Alink GitHub地址:
https://github.com/alibaba/Alink - Alink系列教程:
https://www.zhihu.com/people/...
以上。Alink 是基于 Flink 的机器学习算法平台,欢送拜访 Alink 的 GitHub 链接获取更多信息。也欢送退出 Alink 开源用户群进行交换~
▼ 钉钉扫码退出 Alink 技术交换群 ▼