乐趣区

Flink入门宝典

本文基于 java 构建 Flink1.9 版本入门程序,需要 Maven 3.0.4 和 Java 8 以上版本。需要安装 Netcat 进行简单调试。
这里简述安装过程,并使用 IDEA 进行开发一个简单流处理程序,本地调试或者提交到 Flink 上运行,Maven 与 JDK 安装这里不做说明。
一、Flink 简介

Flink 诞生于欧洲的一个大数据研究项目 StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink 是做 Batch 计算的,但是在 2014 年,StratoSphere 里面的核心成员孵化出 Flink,同年将 Flink 捐赠 Apache,并在后来成为 Apache 的顶级大数据项目,同时 Flink 计算的主流方向被定位为 Streaming,即用流式计算来做所有大数据的计算,这就是 Flink 技术诞生的背景。
2015 开始阿里开始介入 flink 负责对资源调度和流式 sql 的优化,成立了阿里内部版本 blink 在最近更新的 1.9 版本中,blink 开始合并入 flink,
未来 flink 也将支持 java,scala,python 等更多语言,并在机器学习领域施展拳脚。
二、Flink 开发环境搭建
首先要想运行 Flink,我们需要下载并解压 Flink 的二进制包,下载地址如下:https://flink.apache.org/down…
我们可以选择 Flink 与 Scala 结合版本,这里我们选择最新的 1.9 版本 Apache Flink 1.9.0 for Scala 2.12 进行下载。

Flink 在 Windows 和 Linux 下的安装与部署可以查看 Flink 快速入门 – 安装与示例运行,这里演示 windows 版。
安装成功后,启动 cmd 命令行窗口,进入 flink 文件夹,运行 bin 目录下的 start-cluster.bat
$ cd flink$ cd bin$ start-cluster.batStarting a local cluster with one JobManager process and one TaskManager process.You can terminate the processes via CTRL-C in the spawned shell windows.Web interface by default on http://localhost:8081/.
显示启动成功后,我们在浏览器访问 http://localhost:8081/ 可以看到 flink 的管理页面。

三、Flink 快速体验
请保证安装好了 flink,还需要 Maven 3.0.4 和 Java 8 以上版本。这里简述 Maven 构建过程。
其他详细构建方法欢迎查看:快速构建第一个 Flink 工程
1、搭建 Maven 工程
使用 Flink Maven Archetype 构建一个工程。
$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0
你可以编辑自己的 artifactId groupId
目录结构如下:
$ tree quickstart/quickstart/├── pom.xml└── src └── main ├── java │ └── org │ └── myorg │ └── quickstart │ ├── BatchJob.java │ └── StreamingJob.java └── resources └── log4j.properties
在 pom 中核心依赖:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency></dependencies>2、编写代码
StreamingJob
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class StreamingJob {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStreaming = env .socketTextStream(“localhost”, 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStreaming.print(); // execute program env.execute(“Flink Streaming Java API Skeleton”); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for(String word : sentence.split(” “)){out.collect(new Tuple2<String, Integer>(word, 1)); } } }}3、调试程序
安装 netcat 工具进行简单调试。
启动 netcat 输入:
nc -l 9999
启动程序

在 netcat 中输入几个单词 逗号分隔

在程序一端查看结果

4、程序提交到 Flink
启动 flink
windows 为 start-cluster.bat linux 为 start-cluster.sh
localhost:8081 查看管理页面

通过 maven 对代码打包

将打好的包提交到 flink 上

查看 log
tail -f log/flink-*-jobmanager.out
在 netcat 中继续输入单词,在 Running Jobs 中查看作业状态,在 log 中查看输出。

四、Flink 编程模型
Flink 提供不同级别的抽象来开发流 / 批处理应用程序。

最低级抽象只提供有状态流。
在实践中,大多数应用程序不需要上述低级抽象,而是针对 Core API 编程,如 DataStream API(有界 / 无界流)和 DataSet API(有界数据集)。
Table Api 声明了一个表,遵循关系模型。
最高级抽象是 SQL。
我们这里只用到了 DataStream API。
Flink 程序的基本构建块是流和转换。
一个程序的基本构成:
l 获取 execution environment
l 加载 / 创建原始数据
l 指定这些数据的转化方法
l 指定计算结果的存放位置
l 触发程序执行

五、DataStreaming API 使用 1、获取 execution environment
StreamExecutionEnvironment 是所有 Flink 程序的基础,获取方法有:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String … jarFiles)
一般情况下使用 getExecutionEnvironment。如果你在 IDE 或者常规 java 程序中执行可以通过 createLocalEnvironment 创建基于本地机器的 StreamExecutionEnvironment。如果你已经创建 jar 程序希望通过 invoke 方式获取里面的 getExecutionEnvironment 方法可以使用 createRemoteEnvironment 方式。
2、加载 / 创建原始数据
StreamExecutionEnvironment 提供的一些访问数据源的接口
(1)基于文件的数据源
readTextFile(path)readFile(fileInputFormat, path)readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
(2)基于 Socket 的数据源(本文使用的)
l socketTextStream

(3)基于 Collection 的数据源
fromCollection(Collection)fromCollection(Iterator, Class)fromElements(T …)fromParallelCollection(SplittableIterator, Class)generateSequence(from, to)3、转化方法
(1)Map 方式:DataStream -> DataStream
功能:拿到一个 element 并输出一个 element,类似 Hive 中的 UDF 函数
举例:
DataStream<Integer> dataStream = //…dataStream.map(new MapFunction<Integer, Integer>() {@Override public Integer map(Integer value) throws Exception {return 2 * value;}});
(2)FlatMap 方式:DataStream -> DataStream
功能:拿到一个 element,输出多个值,类似 Hive 中的 UDTF 函数
举例:
dataStream.flatMap(new FlatMapFunction<String, String>() {@Override public void flatMap(String value, Collector<String> out) throws Exception {for(String word: value.split(” “)){out.collect(word); } }});
(3)Filter 方式:DataStream -> DataStream
功能:针对每个 element 判断函数是否返回 true,最后只保留返回 true 的 element
举例:
dataStream.filter(new FilterFunction<Integer>() {@Override public boolean filter(Integer value) throws Exception {return value != 0;}});
(4)KeyBy 方式:DataStream -> KeyedStream
功能:逻辑上将流分割成不相交的分区,每个分区都是相同 key 的元素
举例:
dataStream.keyBy(“someKey”) // Key by field “someKey”dataStream.keyBy(0) // Key by the first element of a Tuple
(5)Reduce 方式:KeyedStream -> DataStream
功能:在 keyed data stream 中进行轮训 reduce。
举例:
keyedStream.reduce(new ReduceFunction<Integer>() {@Override public Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}});
(6)Aggregations 方式:KeyedStream -> DataStream
功能:在 keyed data stream 中进行聚合操作
举例:
keyedStream.sum(0);keyedStream.sum(“key”);keyedStream.min(0);keyedStream.min(“key”);keyedStream.max(0);keyedStream.max(“key”);keyedStream.minBy(0);keyedStream.minBy(“key”);keyedStream.maxBy(0);keyedStream.maxBy(“key”);
(7)Window 方式:KeyedStream -> WindowedStream
功能:在 KeyedStream 中进行使用,根据某个特征针对每个 key 用 windows 进行分组。
举例:
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(8)WindowAll 方式:DataStream -> AllWindowedStream
功能:在 DataStream 中根据某个特征进行分组。
举例:
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(9)Union 方式:DataStream* -> DataStream
功能:合并多个数据流成一个新的数据流
举例:
dataStream.union(otherStream1, otherStream2, …);
(10)Split 方式:DataStream -> SplitStream
功能:将流分割成多个流
举例:
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {@Override public Iterable<String> select(Integer value) {List<String> output = new ArrayList<String>(); if (value % 2 == 0) {output.add(“even”); } else {output.add(“odd”); } return output; }});
(11)Select 方式:SplitStream -> DataStream
功能:从 split stream 中选择一个流
举例:
SplitStream<Integer> split;DataStream<Integer> even = split.select(“even”);DataStream<Integer> odd = split.select(“odd”);DataStream<Integer> all = split.select(“even”,”odd”);4、输出数据 writeAsText()writeAsCsv(…)print() / printToErr() writeUsingOutputFormat() / FileOutputFormatwriteToSocketaddSink

退出移动版