本文依据 Flink Forward 寰球在线会议 · 中文精华版整顿而成,由阿里巴巴计算平台事业部资深算法专家杨旭(品数)分享。本文次要介绍了 Alink 从发表开源到当初,最近半年来的停顿状况,重点分享了 Alink 的一些个性、原理、应用技巧等,为大家应用 Alink 进行开发提供了参考。

Alink停顿总览

Alink 到目前曾经公布了四个 Release 版本:

  1. Alink version 1.0:2019年11月在Flink Forword Asia大会上发表开源。
  2. Alink version 1.0.1:于2019年12月公布,次要解决一些场景下PyAlink的装置问题。在此期间也出了一系列的开发文章,包含Alink环境搭建,入门示例等,为大家应用Alink第一步提供了领导。
  3. Alink version 1.1.0:于2020年02月公布,在Flink公布1.10版本后,Alink 第一工夫做了兼容,目前Alink反对Flink 1.10和Flink 1.9,PyAlink也兼容PyFlink。此外,从这个版本开始,Alink曾经公布到Maven地方仓库和PyPI。这样,Maven工程中应用Alink,只须要在POM文件中引入Alink的相干依赖就能够了,无需本人手动编译,打包装置。Python环境则能够借助PyPI仓库,进行Alink的装置。
  4. Alink version 1.1.1 :于2020年04月公布,次要是晋升了应用体验,晋升了性能。

Alink 倒退之路

上图是Alink在公布1.0版本的时候,所有的算法以及性能,简略来说,Alink的批式性能是和SparkML对应的,SparkML有的性能,Alink根本都提供了。相较于SparkML,除了批式的性能,Alink还提供了流式的性能。

Alink在近半年,性能上整体没有大的变动,上面列举一些正在研发测试,行将开源的一些性能:

  1. 提供更多数据处理,特色工程相干性能,在小版本就会陆续推出。
  2. 经典的分类和回归问题上,次要为两个方面:一是对已有模型,咱们将会披露更多模型外部信息,让大家对模型有更多的理解,而不仅仅只是拿模型来进行预测,二是FM系列算法的推出。
  3. 关联规定&协同过滤,在协同过滤举荐问题上,SparkML次要提供的是ALS,它能够解决一些举荐的问题,然而理论使用过程中,仅仅应用ALS是不够的,后续Alink将推出更多举荐类的算法。
  4. 在线学习,在1.0公布的时候,曾经提供了在线学习的性能,但在理论利用场景中,用户心愿在线学习可能变得更加灵便,后续的版本中将会对这部分进行增强。

重要个性介绍

在本章,将依照版本的公布程序,逐渐介绍Alink的个性,设计原理,以及应用技巧等内容。

1.Alink version 1.1.0

■ 程序构建

从Alink 1.1.0开始,应用Maven地方仓库即可构建Alink我的项目,上面是POM文件示例。Flink 1.10版本依赖:

<dependency>    <groupId>com.alibaba.alink</groupId>    <artifactId>alink_core_flink-1.10_2.11</artifactId>    <version>1.1.0</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-scala_2.11</artifactId>    <version>1.10.0</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-planner_2.11</artifactId>    <version>1.10.0</version></dependency>

Flink 1.9版本依赖:

<dependency>    <groupId>com.alibaba.alink</groupId>    <artifactId>alink_core_flink-1.9_2.11</artifactId>    <version>1.1.0</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-scala_2.11</artifactId>    <version>1.9.0</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-table-planner_2.11</artifactId>    <version>1.9.0</version></dependency>

■ 环境装置实际

  • 筹备环节

次要是Python环境搭建,以及JAVA 8的装置,Python环境的搭建咱们举荐装置Anaconda3,能够对Python的版本进行灵便的管制。不同操作系统的环境筹备,请参考上面的教程:

MacOS: https://zhuanlan.zhihu.com/p/...
Linux环境: https://zhuanlan.zhihu.com/p/...
Windows:https://zhuanlan.zhihu.com/p/...

  • PyAlink装置

从1.1.0开始,Alink曾经公布到了PyPI,装置更加不便了,请参考如下链接:

如何装置最新版本PyAlink?
https://zhuanlan.zhihu.com/p/...
  • PyAlink卸载

如果之前装置过PyAlink,因为之前版本咱们是手动装置的,在降级到新版本时,可能会遇到一些问题,因而须要将其卸载,能够参考上面的文章:

PyAlink的版本查问、卸载旧版本:
https://zhuanlan.zhihu.com/p/...

■ Notebook开发实际

在讲 Notebook 示例之前,咱们先来理解一下 PyAlink 的设计背景:

  1. 在机器学习利用开发过程中,咱们常常会先在批式环境进行模型训练,而后在流式的环境利用模型,从批式环境转换到流式环境,往往须要重写代码,无奈做到代码的复用。Alink设计之初,就心愿尽量将批和流之间的差别变得最小,比方,批上做完后,只须要将Batch字样改成Stream字样就能够运行。
  2. 机器学习开发的过程,咱们个别是心愿越快越好,越麻利越好。其实在本机上开发,体验是最好的,个别做法是,在本机小数据规模上进行验证,而后上到集群上进行成果的评估。然而本机环境往集群环境迁徙,并不容易,咱们心愿这个过程有一个好的体验,不必去编写大量的代码。

基于这样的设计背景,咱们来看下一当初Notebook上进行Alink开发的实际。

  • PyAlink 批式工作在 Notebook 上运行

本地运行代码示例:

from pyalink.alink import *## 一个 Batch 作业的例子useLocalEnv(2)## prepare dataimport numpy as npimport pandas as pddata = np.array([    [0, 0.0, 0.0, 0.0],    [1, 0.1, 0.1, 0.1],    [2, 0.2, 0.2, 0.2],    [3, 9, 9, 9],    [4, 9.1, 9.1, 9.1],    [5, 9.2, 9.2, 9.2]])df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')FEATURE_COLS = ["f0", "f1", "f2"]VECTOR_COL = "vec"PRED_COL = "pred"vectorAssembler = (    VectorAssembler()    .setSelectedCols(FEATURE_COLS)    .setOutputCol(VECTOR_COL))kMeans = (    KMeans()    .setVectorCol(VECTOR_COL)    .setK(2)    .setPredictionCol(PRED_COL))pipeline = Pipeline().add(vectorAssembler).add(kMeans)pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()

集群运行代码示例:

from pyalink.alink import *## 一个 Batch 作业的例子useRemoteEnv("10.101.**.**", 31805, 2, shipAlinkAlgoJar=False)## prepare dataimport numpy as npimport pandas as pddata = np.array([    [0, 0.0, 0.0, 0.0],    [1, 0.1, 0.1, 0.1],    [2, 0.2, 0.2, 0.2],    [3, 9, 9, 9],    [4, 9.1, 9.1, 9.1],    [5, 9.2, 9.2, 9.2]])df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')FEATURE_COLS = ["f0", "f1", "f2"]VECTOR_COL = "vec"PRED_COL = "pred"vectorAssembler = (    VectorAssembler()    .setSelectedCols(FEATURE_COLS)    .setOutputCol(VECTOR_COL))kMeans = (    KMeans()    .setVectorCol(VECTOR_COL)    .setK(2)    .setPredictionCol(PRED_COL))pipeline = Pipeline().add(vectorAssembler).add(kMeans)pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()

咱们能够看到本地和近程代码上的差异,就只有第4行代码不一样,本地应用的是useLocalEnv(2),近程应用的是useRemoteEnv("10.101..", 31805, 2, shipAlinkAlgoJar=False)。相较于本地环境,集群环境须要指定Flink集群的ip地址和端口。

  • PyAlink 流式工作在 Notebook 上运行

本地运行示例:

from pyalink.alink import *## 一个 Stream 作业的例子## 惟一参数示意并行度useLocalEnv(2)source = CsvSourceStreamOp() \    .setSchemaStr(    "sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \    .setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")source.print()StreamOperator.execute()

集群运行示例:

from pyalink.alink import *## 一个 Stream 作业的例子useRemoteEnv("10.101.**.**", 31805, 2, shipAlinkAlgoJar=False, localIp="30.39.**.**")source = CsvSourceStreamOp() \    .setSchemaStr(    "sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \    .setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")source.print()StreamOperator.execute()

咱们能够看到,流式工作的运行,本地和集群上的代码也只有设置运行环境这一行(第4行)代码有差异,其余的代码都是一样的。

集群运行模式中,流式工作和批式工作的设置有点差异,流式的工作须要指定本地的IP地址(localIp),咱们应用Notebook进行交互式开发时,个别须要看到运行后果,批式工作应用Flink现有的机制,是能够间接看到运行后果的。然而流式工作数据流是无边界的,为了将流工作的运行后果返回回来,让用户能够实时看到,咱们独自建设了一个通路进行数据传输,因而咱们须要设置这个本地的IP地址,和集群进行交互。当然,这个本地IP地址的参数,在4月份的版本(Alink 1.1.1版本)中,曾经能够自动检测到了,能够省略掉了。

■ PyAlink 基于 PyFlink 整合

本节中,重点介绍两点PyAlink和PyFlink的兼容个性。

  • 数据的连通性:Alink 算子的输入输出,实质上是Flink的Table格局,PyFlink其实也是Flink Table,这样Alink Operator与PyFlink两个就能够互相转化,而且转换的代价十分小,并不波及到数据的重写。有了这种转换,Alink和PyFlink两边的性能就能够混用,不便串联 Flink 和 Alink 的工作流。

上面是一段Alink和PyFlink代码混用的示例:

###  get_mlenv.pyfrom pyalink.alink import *env, btenv, senv, stenv = getMLEnv()### 应用 PyFlink 接口,与 Table 进行互转table = stenv.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])source = TableSourceStreamOp(table)source.print()StreamOperator.execute()

咱们能够看到,应用PyFlink 构建Table(第5行),能够间接转化为PyAlink的数据源算子(第6行)。

  • 新提供了 getMLEnv 接口,能间接应用 flink run -py *.py 往集群提交作业。除了Notebook交互式运行Alink工作这种形式,对于定时调度的工作,咱们须要一次性提交工作,PyFlink在这方面反对十分好,PyAlink和PyFlink兼容后,咱们能够达到和PyFlink提交工作一样的应用体验。只是有一点不同,在获取运行环境的时候,须要改为调用getMLEnv()办法,这个办法会返回env, btenv, senv, stenv四个运行环境,这样PyAlink的工作能够交给PyFlink,执行PyFlink 相干操作。

上面是提交工作的示例:

### 间接运行脚本python kmeans.py### 向集群提交作业PYFLINK_PATH=`python -c "import pyflink;print(pyflink.__path__[0])"`${PYFLINK_PATH}/bin/flink run -m 10.101.**.**:31805 -py kmeans.py -p 4

■ 读写Kafka

咱们在Flink Kafka Connector根底上,为Kafka的输入输出包装了Source和Sink组件,让大家读写Kafka数据更加不便。上面是一个从数据读入,数据解析,对数据进行逻辑回归预测,将后果写入Kafka的工作的代码示例:

### 读取数据data = KafkaSourceStreamOp()\                .setBootstrapServers("localhost:9092")\                .setTopic("iris")\                .setStartupMode("EARLIEST")\                .setGroupId("alink_group")### 解析JSON数据json_parser = JsonValueStreamOp()            .setSelectedCol("message")            .setOutputCols(["sepal_length", "sepal_width", "petal_length", "petal_width", "category"])            .setJsonPath(["$.sepal_length", "$.sepal_width", "$.petal_length", "$.petal_width", "$.category"])data = data.link(json_parser)### 数据类型转换data = data.select( \  "CAST(sepal_length AS DOUBLE) AS sepal_length, "  + "CAST(sepal_width AS DOUBLE) AS sepal_width, "  + "CAST(petal_length AS DOUBLE) AS petal_length, "  + "CAST(petal_width AS DOUBLE) AS petal_width, category")### 读取本地模型文件model = CsvSourceBatchOp().setFilePath("/path/to/model.csv") \  .setSchemaStr("model_id bigint, model_info string, label_type string")### 构建逻辑回归预测模型lr_predictor = LogisticRegressionPredictStreamOp(model) \  .setPredictionCol("pred").setPredictionDetailCol("pred_detail")### 对数据进行预测result = data.link(lr_predictor)### 后果输入到Kafkasink = KafkaSinkStreamOp() \  .setBootstrapServers("localhost:9092") \  .setDataFormat("json").setTopic("lr_pred")data.link(sink)StreamOperator.execute()

从代码中能够看出,数据通过KafkaSourceStreamOp组件读入,通过JSON Path解析数据,解析出的值,是String类型,再依据数据的理论类型,应用CAST函数进行类型转换,而后通过加载本地训练好的模型,构建逻辑回归预测组件,对数据进行预测,最初将后果通过KafkaSinkStreamOp组件输入到Kafka。

咱们也能够看到,在Alink 1.1.0这个版本中,数据的解析还是有一点麻烦,在本文前面的局部还会介绍对数据解析局部的简化,让整个流程更简洁。

2.Alink version 1.1.1

本章开始,咱们将具体介绍Alink 1.1.1版本的一些优化的点,以及重要个性等。

■ 优化枚举类型参数提醒

在咱们应用算法组件的时候,常常会遇到有些属性是枚举类型的,在Python中,个别是通过字符串输出枚举值,理论在应用的过程中,这些枚举值很难全副记住,常常须要去查问Alink的文档。为了咱们编写代码更加顺畅,在新版本中,咱们优化了代码的提示信息,咱们能够尝试填写一个代替值,尽管会抛异样,但在运行后果中,能够看到枚举值的明确提醒。

以卡方筛选算子为例,卡方筛选算子的SelectorType能够填写NumTopFeatures, Percentil,FPR等,是枚举类型变量,咱们如果应用'aaa'值代替,看下会有什么成果,代码如下:

### Python代码selector = ChiSqSelectorBatchOp()\            .setSelectorType("aaa")\            .setSelectedCols(["f_string", "f_long", "f_int", "f_double"])\            .setLabelCol("f_boolean")\            .setNumTopFeatures(2)

在Alink 1.1.1之前的版本,会返回下图:

异样信息中打出SelectorType输入谬误的值AAA,但异样信息不显著,也没有指出是哪个参数写错了。

Alink 1.1.1中,则会呈现下图的后果:

异样信息中会有哪个参数填写谬误,以及会提醒可能的值是什么,这样咱们应用Alink算子的时候更加便捷。

下面是Python代码的枚举类型的谬误提醒,对于JAVA来说,有代码主动提醒,编写时会十分不便:

■ 优化列名参数提醒

咱们进行机器学习开发,算法中往往会有很多列名参数,列名输错状况很常见,如下图所示:

咱们可能将text字段谬误的写成了text1,在1.1.1版本里,不仅会指出哪列不存在,也会提醒最可能的列名,帮忙用户做修改,见下图所示。

这样,用户能够更快的定位谬误,排查问题。JAVA的行为也雷同:

输入提醒如下:

■ PyAlink1.1.1改良

  • 优化了 DataFrame 和 BatchOperator 互转的性能

咱们在应用Python时,更多是用DataFrame来操作数据,在应用PyAlink时,有一个DataFrame向Alink Table转换的过程,转换的速度会间接影响整个工作的执行时长,为了给用户一个比拟好的用户体验,咱们在转化下面,做了比拟大的性能优化。

以上面的示例代码为例:

n = 50000users = []for col in range(n):users.append([col] * 2)df = pd.DataFrame(users)source = BatchOperator.fromDataframe(df, schemaStr='id int, label int')source.link(CsvSinkBatchOp().setOverwriteSink(True).setFilePath('temp.csv'))BatchOperator.execute()

之前5W行数据须要约55s,当初只须要 5s,当初100w行数据约 20s就能够转换实现。您可能留神到从5W到100W,这种晋升如同不是成程线性关系,这是因为转化的过程中,还蕴含了一些零碎开销。总之,咱们在数据转化中,曾经尽量的压缩了解决工夫,让整个工作运行更快。

  • 改良流式组件的print性能,将不会因为数据中有NaN导致作业失败,进步了程序的稳定性。
  • Python UDF运行中将自动检测 python3 命令,如果环境中同时有 Python 2和3,将可能因为 python 命令指向 Python 2而导致运行不胜利,在Alink 1.1.1版本中,将优先应用python3 来执行 Python UDF。
  • useRemoteEnv 将自动检测本机外网 IP,在个别网络配置下,应用 StreamOperator(流式工作) 组件的性能,无需设置 localIp 了。
  • 新增组件,将CSV、JSON和KV格局的字符串解析为多列。

上面是一组JOSN格局的测试数据。

{"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.6}{"sepal_width":4.1,"petal_width":0.1,"sepal_length":5.2,"category":"Iris-setosa","petal_length":1.5}{"sepal_width":2.8,"petal_width":1.5,"sepal_length":6.5,"category":"Iris-versicolor","petal_length":4.6}{"sepal_width":3.0,"petal_width":1.8,"sepal_length":6.1,"category":"Iris-virginica","petal_length":4.9}{"sepal_width":2.9,"petal_width":1.8,"sepal_length":7.3,"category":"Iris-virginica","petal_length":6.3}

咱们须要将其解析为下图这样的结构化数据。

Alink 1.1.1之前,咱们可能须要编写上面这样的代码:

json_parser = JsonValueStreamOp()            .setSelectedCol("message")            .setOutputCols(["sepal_length", "sepal_width", "petal_length", "petal_width", "category"])            .setJsonPath(["$.sepal_length", "$.sepal_width", data = data.link(\JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_length double, sepal_width double, petal_length double, "         + "petal_width double, category string").setReservedCols([]))"$.petal_length", "$.petal_width", "$.category"])data = data.link(json_parser)data = data.select( \  "CAST(sepal_length AS DOUBLE) AS sepal_length, "  + "CAST(sepal_width AS DOUBLE) AS sepal_width, "  + "CAST(petal_length AS DOUBLE) AS petal_length, "  + "CAST(petal_width AS DOUBLE) AS petal_width, category")

在Alink 1.1.1版本中,咱们增加了JsonToColumnsStreamOp组件,代码变成这样:

data = data.link(\JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_length double, sepal_width double, petal_length double, "         + "petal_width double, category string").setReservedCols([]))

咱们能够看到,代码精简了很多。

最初,介绍一个日志解析的例子,咱们晓得,日志的格局没有一个残缺的法则,不是一个JSON格局,也不是KV格局,这就须要用现有工具进行组合来解决。

上面是一段日志记录的内容:

66.249.79.35 - - [14/Jun/2018:06:45:24 +0000] "GET /img/20180504/702434-20180302101540805-554506523.jpg HTTP/1.1" 200 10013 "-" "Googlebot-Image/1.0”66.249.79.35 - - [14/Jun/2018:06:45:25 +0000] "GET /img/20180504/702434-20180302161346635-1714710787.jpg HTTP/1.1" 200 45157 "-" "Googlebot-Image/1.0”66.249.79.35 - - [14/Jun/2018:06:45:56 +0000] "GET /img/2018/05/21/60662344.jpg HTTP/1.1" 200 14133 "-" "Googlebot-Image/1.0"54.36.148.129 - - [14/Jun/2018:06:46:01 +0000] "GET /archives/91007 HTTP/1.1" 200 8332 "-" "Mozilla/5.0 (compatible; AhrefsBot/5.2; +http://ahrefs.com/robot/)”54.36.148.201 - - [14/Jun/2018:06:46:03 +0000] "GET /archives/88741/feed HTTP/1.1" 200 983 "-" "Mozilla/5.0 (compatible; AhrefsBot/5.2; +http://ahrefs.com/robot/)”5.255.250.200 - - [14/Jun/2018:06:46:03 +0000] "GET /archives/87084 HTTP/1.1" 200 9951 "-" "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)”

上面是具体解析的代码:

source.select("SUBSTRING(text FROM 1 For POSITION('[' in text)-2) AS part1,"  +"REGEXT_EXTRACT(text,'(\\[)(.*?)(\\])',2) AS log_time,"  +"SUBSTRING(text FROM 2+POSION(']' IN text)) AS part2").link(  new CsvToColumnBatchOp()  .setSelectCol("part1")  .setFieldDelimiter(" ")  .setSchemaStr("ip string,col1 string,col2 string")).link(   new CsvToColumnBatchOp()  .setSelectCol("part2")  .setFieldDelimiter(" ")  .setSchemaStr("cmd string,response int,bytesize int,col3 string,col4 string")).link(   new CsvToColumnBatchOp()  .setSelectCol("cmd")  .setFieldDelimiter(" ")  .setSchemaStr("req_method string,url String,protocol string")).select("ip,col1,col2,log_time,req_method,url,protocol,response,bytesize,col3,col4")

下面的代码思路如下:

  1. 首先咱们将日志依据“[]”将日志划分为三局部,能够应用Flink的SUBSTRING函数,联合正则表达式REGEXT_EXTRACT进行拆分。
  2. 别离应用CsvToColumnBatchOp依照空格分隔对两边的文本(part1,part2两局部)进行解析,并指定列名。
    对cmd这个非凡字段做进一步的解析。
  3. 最初,选出所有解析进去的列,实现。

Alink相干资料汇总:

  • Alink GitHub地址:
    https://github.com/alibaba/Alink
  • Alink系列教程:
    https://www.zhihu.com/people/...

以上。Alink 是基于 Flink 的机器学习算法平台,欢送拜访 Alink 的 GitHub 链接获取更多信息。也欢送退出 Alink 开源用户群进行交换~

▼ 钉钉扫码退出 Alink 技术交换群 ▼