乐趣区

关于云计算:Flink的DataSource三部曲之一直接API

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

本文是《Flink 的 DataSource 三部曲》系列的第一篇,该系列旨在通过实战学习和理解 Flink 的 DataSource,为当前的深刻学习打好根底,由以下三局部组成:

  1. 间接 API:即本篇,除了筹备环境和工程,还学习了 StreamExecutionEnvironment 提供的用来创立数据来的 API;
  2. 内置 connector:StreamExecutionEnvironment 的 addSource 办法,入参能够是 flink 内置的 connector,例如 kafka、RabbitMQ 等;
  3. 自定义:StreamExecutionEnvironment 的 addSource 办法,入参能够是自定义的 SourceFunction 实现类;

Flink 的 DataSource 三部曲文章链接

  1. 《Flink 的 DataSource 三部曲之一:间接 API》
  2. 《Flink 的 DataSource 三部曲之二: 内置 connector》
  3. 《Flink 的 DataSource 三部曲之三: 自定义》

对于 Flink 的 DataSource

官网对 DataSource 的解释:Sources are where your program reads its input from,即 DataSource 是利用的数据起源,如下图的两个红框所示:

DataSource 类型

对于常见的文本读入、kafka、RabbitMQ 等数据起源,能够间接应用 Flink 提供的 API 或者 connector,如果这些满足不了需要,还能够本人开发,下图是我依照本人的了解梳理的:

环境和版本

熟练掌握内置 DataSource 的最好方法就是实战,本次实战的环境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3(MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示 (https://github.com/zq2599/blo…:

名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址 (https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址 (ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定

这个 git 我的项目中有多个文件夹,本章的利用在 <font color=”blue”>flinkdatasourcedemo</font> 文件夹下,如下图红框所示:

环境和版本

本次实战的环境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3(MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

创立工程

  1. 在控制台执行以下命令就会进入创立 flink 利用的交互模式,按提醒输出 gourpId 和 artifactId,就会创立一个 flink 利用 (我输出的 groupId 是 <font color=”blue”>com.bolingcavalry</font>,artifactId 是 <font color=”blue”>flinkdatasourcedemo</font>):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
  1. 当初 maven 工程已生成,用 IDEA 导入这个工程,如下图:

  1. 以 maven 的类型导入:

  1. 导入胜利的样子:

  1. 我的项目创立胜利,能够开始写代码实战了;

辅助类 Splitter

实战中有个性能罕用到:将字符串用空格宰割,转成 Tuple2 类型的汇合,这里将此算子做成一个公共类 Splitter.java,代码如下:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {if(StringUtils.isNullOrWhitespaceOnly(s)) {System.out.println("invalid line");
            return;
        }

        for(String word : s.split(" ")) {collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

筹备结束,能够开始实战了,先从最简略的 Socket 开始。

Socket DataSource

Socket DataSource 的性能是监听指定 IP 的指定端口,读取网络数据;

  1. 在方才新建的工程中创立一个类 Socket.java:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 监听本地 9999 端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // 每五秒钟一次,将以后五秒内所有字符串以空格宰割,而后统计单词数量,打印进去
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}

从上述代码可见,StreamExecutionEnvironment.socketTextStream 就能够创立 Socket 类型的 DataSource,在控制台执行命令 <font color=”blue”>nc -lk 9999</font>,即可进入交互模式,此时输入任何字符串再回车,都会将字符串传输到本机 9999 端口;

  1. 在 IDEA 上运行 Socket 类,启动胜利后再回到方才执行 <font color=”blue”>nc -lk 9999</font> 的控制台,输出一些字符串再回车,可见 Socket 的性能曾经失效:

汇合 DataSource(generateSequence)

  1. 基于汇合的 DataSource,API 如下图所示:

  1. 先试试最简略的 generateSequence,创立指定范畴内的数字型的 DataSource:
package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度为 1
        env.setParallelism(1);

        // 通过 generateSequence 失去 Long 类型的 DataSource
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        // 做一次过滤,只保留偶数,而后打印
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {return 0L==aLong.longValue()%2L;
            }
        }).print();

        env.execute("API DataSource demo : collection");
    }
}
  1. 运行时会打印偶数:

汇合 DataSource(fromElements+fromCollection)

  1. fromElements 和 fromCollection 就在一个类中试了吧,创立 <font color=”blue”>FromCollection</font> 类,外面是这两个 API 的用法:
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度为 1
        env.setParallelism(1);

        // 创立一个 List,外面有两个 Tuple2 元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 1));

        // 通过 List 创立 DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // 通过多个 Tuple2 元素创立 DataStream
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(new Tuple2("ccc", 1),
                new Tuple2("ddd", 1),
                new Tuple2("aaa", 1)
        );

        // 通过 union 将两个 DataStream 合成一个
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        // 统计每个单词的数量
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
  1. 运行后果如下:

文件 DataSource

  1. 上面的 ReadTextFile 类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为 1
        env.setParallelism(1);

        // 用 txt 文件作为数据源
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        // 统计单词数量并打印进去
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
  1. 请确保代码中的绝对路径下存在名为 README.txt 文件,运行后果如下:

  1. 关上 StreamExecutionEnvironment.java 源码,看一下方才应用的 readTextFile 办法实现如下,原来是调用了另一个同名办法,该办法的第三个参数确定了文本文件是一次性读取结束,还是周期性扫描内容变更,而第四个参数就是周期性扫描的间隔时间:
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

        TextInputFormat format = new TextInputFormat(new Path(filePath));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());
        TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
        format.setCharsetName(charsetName);

        return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
    }
  1. 下面的 FileProcessingMode 是个枚举,源码如下:
@PublicEvolving
public enum FileProcessingMode {

    /** Processes the current contents of the path and exits. */
    PROCESS_ONCE,

    /** Periodically scans the path for new data. */
    PROCESS_CONTINUOUSLY
}
  1. 另外请关注 <font color=”blue”>readTextFile</font> 办法的 <font color=”red”>filePath</font> 参数,这是个 URI 类型的字符串,除了本地文件门路,还能够是 HDFS 的地址:<font color=”blue”>hdfs://host:port/file/path</font>

至此,通过间接 API 创立 DataSource 的实战就实现了,前面的章节咱们持续学习内置 connector 形式的 DataSource;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

退出移动版