关于flink:Flink-DataSet-API编程指南

Flink最大的亮点是实时处理局部,Flink认为批处理是流解决的非凡状况,能够通过一套引擎解决批量和流式数据,而Flink在将来也会重点投入更多的资源到批流交融中。我在Flink DataStream API编程指南中介绍了DataStream API的应用,在本文中将介绍Flink批处理计算的DataSet API的应用。通过本文你能够理解:

  • DataSet转换操作(Transformation)
  • Source与Sink的应用
  • 播送变量的基本概念与应用Demo
  • 分布式缓存的概念及应用Demo
  • DataSet API的Transformation应用Demo案例

WordCount示例

在开始解说DataSet API之前,先看一个Word Count的简略示例,来直观感受一下DataSet API的编程模型,具体代码如下:

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 用于批处理的执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 数据源
        DataSource<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");

        // 转换
        AggregateOperator<Tuple2<String, Integer>> wordCnt = stringDataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] split = value.split(" ");
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);
        // 输入
        wordCnt.print();
    }
}

从下面的示例中能够看出,根本的编程模型是:

  • 获取批处理的执行环境ExecutionEnvironment
  • 加载数据源
  • 转换操作
  • 数据输入

上面会对数据源、转换操作、数据输入进行一一解读。

Data Source

DataSet API反对从多种数据源中将批量数据集读到Flink零碎中,并转换成DataSet数据集。次要包含三种类型:别离是基于文件的、基于汇合的及通用类数据源。同时在DataSet API中能够自定义实现InputFormat/RichInputFormat接口,以接入不同数据格式类型的数据源,比方CsvInputFormat、TextInputFormat等。从ExecutionEnvironment类提供的办法中能够看出反对的数据源办法,如下图所示:

基于文件的数据源

readTextFile(path) / TextInputFormat

  • 解释

读取文本文件,传递文件门路参数,并将文件内容转换成DataSet<String>类型数据集。

  • 应用
// 读取本地文件
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// 读取HDSF文件
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

readTextFileWithValue(path)/ TextValueInputFormat

  • 解释

读取文本文件内容,将文件内容转换成DataSet[StringValue]类型数据集。该办法与readTextFile(String)不同的是,其泛型是StringValue,是一种可变的String类型,通过StringValue存储文本数据能够无效升高String对象创立数量,减小垃圾回收的压力。

  • 应用
// 读取本地文件
DataSet<StringValue> localLines = env.readTextFileWithValue("file:///some/local/file");
// 读取HDSF文件
DataSet<StringValue> hdfsLines = env.readTextFileWithValue("hdfs://host:port/file/path");

readCsvFile(path)/ CsvInputFormat

  • 解释

创立一个CSV的reader,读取逗号分隔(或其余分隔符)的文件。能够间接转换成Tuple类型、POJOs类的DataSet。在办法中能够指定行切割符、列切割符、字段等信息。

  • 应用
// read a CSV file with five fields, taking only two of them
// 读取一个具备5个字段的CSV文件,只取第一个和第四个字段
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  
                              .types(String.class, Double.class);

// 读取一个有三个字段的CSV文件,将其转为POJO类型
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");

readFileOfPrimitives(path, Class) / PrimitiveInputFormat

  • 解释

读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet汇合

  • 应用
DataSet<String> Data = env.readFileOfPrimitives("file:///some/local/file", String.class);

基于汇合的数据源

fromCollection(Collection)

  • 解释

从java的汇合中创立DataSet数据集,汇合中的元素数据类型雷同

  • 应用
DataSet<String> data= env.fromCollection(arrayList);

fromElements(T …)

  • 解释

从给定数据元素序列中创立DataSet数据集,且所有的数据对象类型必须统一

  • 应用
 DataSet<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");

generateSequence(from, to)

  • 解释

指定from到to范畴区间,而后在区间外部生成数字序列数据集,因为是并行处理的,所以最终的程序不能保障统一。

  • 应用
DataSet<Long> longDataSource = env.generateSequence(1, 20);

通用类型数据源

DataSet API中提供了Inputformat通用的数据接口,以接入不同数据源和格局类型的数据。InputFormat接口次要分为两种类型:一种是基于文件类型,在DataSet API对应readFile()办法;另外一种是基于通用数据类型的接口,例如读取RDBMS或NoSQL数据库中等,在DataSet API中对应createInput()办法。

readFile(inputFormat, path) / FileInputFormat

  • 解释

自定义文件类型输出源,将指定格式文件读取并转成DataSet数据集

  • 应用
env.readFile(new MyInputFormat(), "file:///some/local/file");

createInput(inputFormat) / InputFormat

  • 解释

自定义通用型数据源,将读取的数据转换为DataSet数据集。如以下实例应用Flink内置的JDBCInputFormat,创立读取mysql数据源的JDBCInput Format,实现从mysql中读取Person表,并转换成DataSet [Row]数据集

  • 应用
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("com.mysql.jdbc.Driver")
                     .setDBUrl("jdbc:mysql://localhost/mydb")
                     .setQuery("select name, age from stu")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

Data Sink

Flink在DataSet API中的数据输入共分为三种类型。第一种是基于文件实现,对应DataSet的write()办法,实现将DataSet数据输入到文件系统中。第二种是基于通用存储介质实现,对应DataSet的output()办法,例如应用JDBCOutputFormat将数据输入到关系型数据库中。最初一种是客户端输入,间接将DataSet数据从不同的节点收集到Client,并在客户端中输入,例如DataSet的print()办法。

规范的数据输入办法

// 文本数据
DataSet<String> textData = // [...]

// 将数据写入本地文件
textData.writeAsText("file:///my/result/on/localFS");

// 将数据写入HDFS文件
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// 写数据到本地文件,如果文件存在则笼罩
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// 将数据输入到本地的CSV文件,指定分隔符为"|"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// 应用自定义的TextFormatter对象
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

应用自定义的输入类型

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// 将tuple类型的数据写入关系型数据库
myResult.output(
    // 创立并配置OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("com.mysql.jdbc.Driver")
                    .setDBUrl("jdbc:mysql://localhost/mydb")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

DataSet转换

转换(transformations)将一个DataSet转成另外一个DataSet,Flink提供了十分丰盛的转换操作符。具体应用如下:

Map

一进一出

  DataSource<String> source = env.fromElements("I", "like", "flink");
        source.map(new MapFunction<String, String>() {
            @Override
            // 将数据转为大写
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        }).print();

FlatMap

输出一个元素,产生0个、1个或多个元素

stringDataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] split = value.split(" ");
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);

MapPartition

性能和Map函数类似,只是MapPartition操作是在DataSet中基于分区对数据进行解决,函数调用中会依照分区将数据通过Iteator的模式传入,每个分区中的元素数与并行度无关,并返回任意数量的后果值。

 source.mapPartition(new MapPartitionFunction<String, Long>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<Long> out) throws Exception {
                long c = 0;
                for (String value : values) {
                    c++;
                }
                //输入每个分区元素个数
                out.collect(c);
            }
        }).print();

Filter

过滤数据,如果返回true则保留数据,如果返回false则过滤掉

DataSource<Long> source = env.fromElements(1L, 2L, 3L,4L,5L);
        source.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        }).print();

Project

仅能用在Tuple类型的数据集,投影操作,选取Tuple数据的字段的子集

  DataSource<Tuple3<Long, Integer, String>> source = env.fromElements(
                Tuple3.of(1L, 20, "tom"), 
                Tuple3.of(2L, 25, "jack"), 
                Tuple3.of(3L, 22, "bob"));
        // 去第一个和第三个元素
        source.project(0, 2).print();

Reduce

通过两两合并,将数据集中的元素合并成一个元素,能够在整个数据集上应用,也能够在分组之后的数据集上应用。

DataSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("Flink", 1),
                Tuple2.of("Flink", 1),
                Tuple2.of("Hadoop", 1),
                Tuple2.of("Spark", 1),
                Tuple2.of("Flink", 1));
        source
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        }).print();

ReduceGroup

将数据集中的元素合并成一个元素,能够在整个数据集上应用,也能够在分组之后的数据集上应用。reduce函数的输出值是一个分组元素的Iterable。

DataSource<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("Flink", 1L),
                Tuple2.of("Flink", 1L),
                Tuple2.of("Hadoop", 1L),
                Tuple2.of("Spark", 1L),
                Tuple2.of("Flink", 1L));
        source
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String,Long>, Tuple2<String,Long>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) throws Exception {
                        Long sum = 0L;
                        String word = "";
                        for(Tuple2<String, Long> value:values){
                            sum += value.f1;
                            word = value.f0;

                        }
                        out.collect(Tuple2.of(word,sum));
                    }
                }).print();

Aggregate

通过Aggregate Function将一组元素值合并成单个值,能够在整个DataSet数据集上应用,也能够在分组之后的数据集上应用。仅仅用在Tuple类型的数据集上,次要包含Sum,Min,Max函数

DataSource<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("Flink", 1L),
                Tuple2.of("Flink", 1L),
                Tuple2.of("Hadoop", 1L),
                Tuple2.of("Spark", 1L),
                Tuple2.of("Flink", 1L));
        source
                .groupBy(0)
                .aggregate(SUM,1)// 按第2个值求和
                 .print();

Distinct

DataSet数据集元素去重

DataSource<Tuple> source = env.fromElements(Tuple1.of("Flink"),Tuple1.of("Flink"),Tuple1.of("hadoop"));
        source.distinct(0).print();// 依照tuple的第一个字段去重
// 后果:
(Flink)
(hadoop)

Join

默认的join是产生一个Tuple2数据类型的DataSet,关联的key能够通过key表达式、Key-selector函数、字段地位以及CaseClass字段指定。对于两个Tuple类型的数据集能够通过字段地位进行关联,右边数据集的字段通过where办法指定,左边数据集的字段通过equalTo()办法指定。比方:

DataSource<Tuple2<Integer,String>> source1 = env.fromElements(
                Tuple2.of(1,"jack"),
                Tuple2.of(2,"tom"),
                Tuple2.of(3,"Bob"));
        DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("order1", 1),
                Tuple2.of("order2", 2),
                Tuple2.of("order3", 3));
        source1.join(source2).where(0).equalTo(1).print();

能够在关联的过程中指定自定义Join Funciton, Funciton的入参为右边数据集中的数据元素和左边数据集的中的数据元素所组成的元祖,并返回一个通过计算解决后的数据。如:

// 用户id,购买商品名称,购买商品数量
        DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(
                Tuple3.of(1,"item1",2),
                Tuple3.of(2,"item2",3),
                Tuple3.of(3,"item3",4));
        //商品名称与商品单价
        DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("item1", 10),
                Tuple2.of("item2", 20),
                Tuple2.of("item3", 15));
        source1.join(source2)
                .where(1)
                .equalTo(0)
                .with(new JoinFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple3<Integer,String,Double>>() {
                    // 用户每种商品购物总金额
                    @Override
                    public Tuple3<Integer, String, Double> join(Tuple3<Integer, String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                        return Tuple3.of(first.f0,first.f1,first.f2 * second.f1.doubleValue());
                    }
                }).print();

为了可能更好地疏导Flink底层去正确地解决数据集,能够在DataSet数据集关联中,通过Size Hint标记数据集的大小,Flink能够依据用户给定的hint(提醒)调整计算策略,例如能够应用joinWithTiny或joinWithHuge提醒第二个数据集的大小。示例如下:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result1 =
            // 提醒第二个数据集为小数据集
            input1.joinWithTiny(input2)
                  .where(0)
                  .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result2 =
            // h提醒第二个数据集为大数据集
            input1.joinWithHuge(input2)
                  .where(0)
                  .equalTo(0);

Flink的runtime能够应用多种形式执行join。在不同的状况下,每种可能的形式都会胜过其余形式。零碎会尝试主动抉择一种正当的办法,然而容许用户手动抉择一种策略, 能够让Flink更加灵便且高效地执行Join操作。

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
// 播送第一个输出并从中构建一个哈希表,第二个输出将对其进行探测,实用于第一个数据集十分小的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");
// 播送第二个输出并从中构建一个哈希表,第一个输出将对其进行探测,实用于第二个数据集十分小的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_SECOND)
            .where("id").equalTo("key");
// 将两个数据集从新分区,并将第一个数据集转换成哈希表,实用于第一个数据集比第二个数据集小,但两个数据集都比拟大的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.REPARTITION_HASH_FIRST)
            .where("id").equalTo("key");
// 将两个数据集从新分区,并将第二个数据集转换成哈希表,实用于第二个数据集比第一个数据集小,但两个数据集都比拟大的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.REPARTITION_HASH_SECOND)
            .where("id").equalTo("key");
// 将两个数据集从新分区,并将每个分区排序,实用于两个数据集都曾经排好序的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");
// 相当于不指定,有零碎自行处理
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.OPTIMIZER_CHOOSES)
            .where("id").equalTo("key");

OuterJoin

OuterJoin对两个数据集进行外关联,蕴含left、right、full outer join三种关联形式,别离对应DataSet API中的leftOuterJoin、rightOuterJoin以及fullOuterJoin办法。留神外连贯仅实用于Java 和 Scala DataSet API.

应用形式简直和join相似:

//左外连贯
source1.leftOuterJoin(source2).where(1).equalTo(0);
//右外链接
source1.rightOuterJoin(source2).where(1).equalTo(0);

此外,外连贯也提供了相应的关联算法提醒,能够跟据左右数据集的散布状况抉择适合的优化策略,晋升数据处理的效率。上面代码能够参考下面join的解释。

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result1 =
      input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");

DataSet<Tuple2<SomeType, AnotherType> result2 =
      input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");

对于外连贯的关联算法,与join有所不同。每种外连贯只反对局部算法。如下:

  • LeftOuterJoin反对:

    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_SECOND
    • REPARTITION_HASH_SECOND
    • REPARTITION_SORT_MERGE
    • RightOuterJoin反对:

      • OPTIMIZER_CHOOSES
      • BROADCAST_HASH_FIRST
      • REPARTITION_HASH_FIRST
      • REPARTITION_SORT_MERGE
    • FullOuterJoin反对:

      • OPTIMIZER_CHOOSES
      • REPARTITION_SORT_MERGE

CoGroup

CoGroup是对分组之后的DataSet进行join操作,将两个DataSet数据汇合并在一起,会先各自对每个DataSet依照key进行分组,而后将分组之后的DataSet传输到用户定义的CoGroupFunction,将两个数据集依据雷同的Key记录组合在一起,雷同Key的记录会寄存在一个Group中,如果指定key仅在一个数据集中有记录,则co-groupFunction会将这个Group与空的Group关联。

// 用户id,购买商品名称,购买商品数量
        DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(
                Tuple3.of(1,"item1",2),
                Tuple3.of(2,"item2",3),
                Tuple3.of(3,"item2",4));
        //商品名称与商品单价
        DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("item1", 10),
                Tuple2.of("item2", 20),
                Tuple2.of("item3", 15));

        source1.coGroup(source2)
                .where(1)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple2<String,Double>>() {
                    // 每个Iterable存储的是分好组的数据,即雷同key的数据组织在一起
                    @Override
                    public void coGroup(Iterable<Tuple3<Integer, String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<String, Double>> out) throws Exception {
                        //存储每种商品购买数量
                        int sum = 0;
                        for(Tuple3<Integer, String, Integer> val1:first){
                        sum += val1.f2;

                    }
                    // 每种商品数量 * 商品单价
                    for(Tuple2<String, Integer> val2:second){
                        out.collect(Tuple2.of(val2.f0,sum * val2.f1.doubleValue()));

                        }
                    }
                }).print();

Cross

将两个数据汇合并成一个数据集,返回被连贯的两个数据集所有数据行的笛卡儿积,返回的数据行数等于第一个数据集中合乎查问条件的数据行数乘以第二个数据集中合乎查问条件的数据行数。Cross操作能够通过利用Cross Funciton将关联的数据汇合并成指标格局的数据集,如果不指定Cross Funciton则返回Tuple2类型的数据集。Cross操作是计算密集型的算子,倡议在应用时加上算法提醒,比方crossWithTiny() and crossWithHuge().

//[id,x,y],坐标值
        DataSet<Tuple3<Integer, Integer, Integer>> coords1 = env.fromElements(
                Tuple3.of(1, 20, 18),
                Tuple3.of(2, 15, 20),
                Tuple3.of(3, 25, 10));
        DataSet<Tuple3<Integer, Integer, Integer>> coords2 = env.fromElements(
                Tuple3.of(1, 20, 18),
                Tuple3.of(2, 15, 20),
                Tuple3.of(3, 25, 10));
        // 求任意两点之间的欧氏间隔

        coords1.cross(coords2)
                .with(new CrossFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Double>>() {
                    @Override
                    public Tuple3<Integer, Integer, Double> cross(Tuple3<Integer, Integer, Integer> val1, Tuple3<Integer, Integer, Integer> val2) throws Exception {
                        // 计算欧式间隔
                        double dist = sqrt(pow(val1.f1 - val2.f1, 2) + pow(val1.f2 - val2.f2, 2));
                        // 返回两点之间的欧式间隔
                        return Tuple3.of(val1.f0,val2.f0,dist);
                    }
                }).print();

Union

合并两个DataSet数据集,两个数据集的数据元素格局必须雷同,多个数据集能够间断合并.

     DataSet<Tuple2<String, Integer>> vals1 = env.fromElements(
                Tuple2.of("jack",20),
                Tuple2.of("Tom",21));
        DataSet<Tuple2<String, Integer>> vals2 = env.fromElements(
                Tuple2.of("Robin",25),
                Tuple2.of("Bob",30));
        DataSet<Tuple2<String, Integer>> vals3 = env.fromElements(
                Tuple2.of("Jasper",24),
                Tuple2.of("jarry",21));
        DataSet<Tuple2<String, Integer>> unioned = vals1
                .union(vals2)
                .union(vals3);
        unioned.print();

Rebalance

对数据集中的数据进行均匀散布,使得每个分区上的数据量雷同,加重数据歪斜造成的影响,留神仅仅是Map-like 类型的算子(比方map,flatMap)才能够用在Rebalance算子之后。

DataSet<String> in = // [...]
// rebalance DataSet,而后应用map算子.
DataSet<Tuple2<String, String>> out = in.rebalance()
                                        .map(new Mapper());

Hash-Partition

依据给定的Key进行Hash分区,key雷同的数据会被放入同一个分区内。能够应用通过元素的地位、元素的名称或者key selector函数指定key。

DataSet<Tuple2<String, Integer>> in = // [...]
// 依据第一个值进行hash分区,而后应用 MapPartition转换操作.
DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
                                        .mapPartition(new PartitionMapper());

Range-Partition

依据给定的Key进行Range分区,key雷同的数据会被放入同一个分区内。能够应用通过元素的地位、元素的名称或者key selector函数指定key。

DataSet<Tuple2<String, Integer>> in = // [...]
// 依据第一个值进行Range分区,而后应用 MapPartition转换操作.
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
                                        .mapPartition(new PartitionMapper());

Custom Partitioning

除了下面的分区外,还反对自定义分区函数。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(partitioner, key)
                            .mapPartition(new PartitionMapper());

Sort Partition

在本地对DataSet数据集中的所有分区依据指定字段进行重排序,排序形式通过Order.ASCENDING以及Order.DESCENDING关键字指定。反对指定多个字段进行分区排序,如下:

DataSet<Tuple2<String, Integer>> in = // [...]
// 依照第一个字段升序排列,第二个字段降序排列.
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
                                        .sortPartition(0, Order.DESCENDING)
                                        .mapPartition(new PartitionMapper());

First-n

返回数据集的n条随机后果,能够利用于惯例类型数据集、Grouped类型数据集以及排序数据集上。

DataSet<Tuple2<String, Integer>> in = // [...]
// 返回数据集中的任意5个元素
DataSet<Tuple2<String, Integer>> out1 = in.first(5);
//返回每个分组内的任意两个元素
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
                                          .first(2);
// 返回每个分组内的前三个元素
// 分组后的数据集依照第二个字段进行升序排序
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
                                          .sortGroup(1, Order.ASCENDING)
                                          .first(3);

MinBy / MaxBy

从数据集中返回指定字段或组合对应最小或最大的记录,如果抉择的字段具备多个雷同值,则在汇合中随机抉择一条记录返回。

DataSet<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("jack",20),
                Tuple2.of("Tom",21),
                Tuple2.of("Robin",25),
                Tuple2.of("Bob",30));
// 依照第2个元素比拟,找出第二个元素为最小值的那个tuple
// 在整个DataSet上应用minBy
ReduceOperator<Tuple2<String, Integer>> tuple2Reduce = source.minBy(1);
tuple2Reduce.print();// 返回(jack,20)

// 也能够在分组的DataSet上应用minBy
source.groupBy(0) // 依照第一个字段进行分组
      .minBy(1)  // 找出每个分组内的依照第二个元素为最小值的那个tuple
      .print();

播送变量

基本概念

播送变量是分布式计算框架中常常会用到的一种数据共享形式。其次要作用是将小数据集采纳网络传输的形式,在每台机器上保护一个只读的缓存变量,所在的计算节点实例均能够在本地内存中间接读取被播送的数据集,这样可能防止在数据计算过程中屡次通过近程的形式从其余节点中读取小数据集,从而晋升整体工作的计算性能。

播送变量能够了解为一个公共的共享变量,能够把DataSet播送进来,这样不同的task都能够读取该数据,播送的数据只会在每个节点上存一份。如果不应用播送变量,则会在每个节点上的task中都要复制一份dataset数据集,导致节约内存。

应用播送变量的根本步骤如下:

//第一步创立须要播送的数据集
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 第三步拜访汇合模式的播送变量数据集
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }
    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 第二步播送数据集

从下面的代码能够看出,DataSet API反对在RichFunction接口中通过RuntimeContext读取到播送变量。

首先在RichFunction中实现Open()办法,而后调用getRuntimeContext()办法获取利用的RuntimeContext,接着调用getBroadcastVariable()办法通过播送名称获取播送变量。同时Flink间接通过collect操作将数据集转换为本地Collection。须要留神的是,Collection对象的数据类型必须和定义的数据集的类型保持一致,否则会呈现类型转换问题。

注意事项:

  • 因为播送变量的内容是保留在每个节点的内存中,所以播送变量数据集不易过大。
  • 播送变量初始化之后,不反对批改,这样方能保障每个节点的数据都是一样的。
  • 如果多个算子都要应用一份数据集,那么须要在多个算子的前面别离注册播送变量。
  • 只能在批处理中应用播送变量。

应用Demo

public class BroadcastExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer,String>> RawBroadCastData = new ArrayList<>();

        RawBroadCastData.add(new Tuple2<>(1,"jack"));
        RawBroadCastData.add(new Tuple2<>(2,"tom"));
        RawBroadCastData.add(new Tuple2<>(3,"Bob"));

        // 模仿数据源,[userId,userName]
        DataSource<Tuple2<Integer, String>> userInfoBroadCastData = env.fromCollection(RawBroadCastData);

        ArrayList<Tuple2<Integer,Double>> rawUserAount = new ArrayList<>();

        rawUserAount.add(new Tuple2<>(1,1000.00));
        rawUserAount.add(new Tuple2<>(2,500.20));
        rawUserAount.add(new Tuple2<>(3,800.50));

        // 解决数据:用户id,用户购买金额 ,[UserId,amount]
        DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAount);

        // 转换为map汇合类型的DataSet
        DataSet<HashMap<Integer, String>> userInfoBroadCast = userInfoBroadCastData.map(new MapFunction<Tuple2<Integer, String>, HashMap<Integer, String>>() {

            @Override
            public HashMap<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
                HashMap<Integer, String> userInfo = new HashMap<>();
                userInfo.put(value.f0, value.f1);
                return userInfo;
            }
        });

       DataSet<String> result = userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
            // 寄存播送变量返回的list汇合数据
            List<HashMap<String, String>> broadCastList = new ArrayList<>();
            // 寄存播送变量的值
            HashMap<String, String> allMap = new HashMap<>();

            @Override

            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //获取播送数据,返回的是一个list汇合
                this.broadCastList = getRuntimeContext().getBroadcastVariable("userInfo");
                for (HashMap<String, String> value : broadCastList) {
                    allMap.putAll(value);
                }
            }

            @Override
            public String map(Tuple2<Integer, Double> value) throws Exception {
                String userName = allMap.get(value.f0);
                return "用户id: " + value.f0 + " | "+ "用户名: " + userName + " | " + "购买金额: " + value.f1;
            }
        }).withBroadcastSet(userInfoBroadCast, "userInfo");

        result.print();
    }

}

分布式缓存

基本概念

Flink提供了一个分布式缓存(distributed cache),相似于Hadoop,以使文件在本地可被用户函数的并行实例拜访。分布式缓存的工作机制是为程序注册一个文件或目录(本地或者近程文件系统,如HDFS等),通过ExecutionEnvironment注册一个缓存文件,并起一个别名。当程序执行的时候,Flink会主动把注册的文件或目录复制到所有TaskManager节点的本地文件系统,用户能够通过注册是起的别名来查找文件或目录,而后在TaskManager节点的本地文件系统拜访该文件。

分布式缓存的应用步骤:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 注册一个HDFS文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 注册一个本地文件
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 拜访数据
getRuntimeContext().getDistributedCache().getFile("hdfsFile");

获取缓存文件的形式和播送变量类似,也是实现RichFunction接口,并通过RichFunction接口取得RuntimeContext对象,而后通过RuntimeContext提供的接口获取对应的本地缓存文件。

应用Demo

public class DistributeCacheExample {
    public static void main(String[] args) throws Exception {
        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        /**
         *  注册一个本地文件
         *   文件内容为:
         *   1,"jack"
         *   2,"tom"
         *   3,"Bob"
         */
        env.registerCachedFile("file:///E://userinfo.txt", "localFileUserInfo", true);

        ArrayList<Tuple2<Integer,Double>> rawUserAount = new ArrayList<>();

        rawUserAount.add(new Tuple2<>(1,1000.00));
        rawUserAount.add(new Tuple2<>(2,500.20));
        rawUserAount.add(new Tuple2<>(3,800.50));

        // 解决数据:用户id,用户购买金额 ,[UserId,amount]
        DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAount);

        DataSet<String> result= userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
            // 保留缓存数据
            HashMap<String, String> allMap = new HashMap<String, String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 获取分布式缓存的数据
                File userInfoFile = getRuntimeContext().getDistributedCache().getFile("localFileUserInfo");
                List<String> userInfo = FileUtils.readLines(userInfoFile);
                for (String value : userInfo) {

                    String[] split = value.split(",");
                    allMap.put(split[0], split[1]);
                }

            }

            @Override
            public String map(Tuple2<Integer, Double> value) throws Exception {
                String userName = allMap.get(value.f0);

                return "用户id: " + value.f0 + " | " + "用户名: " + userName + " | " + "购买金额: " + value.f1;
            }
        });

        result.print();

    }
}

小结

本文次要解说了Flink DataSet API的根本应用。首先介绍了一个DataSet API的WordCount案例,接着介绍了DataSet API的数据源与Sink操作,以及根本的应用。而后对每一个转换操作进行了具体的解释,并给出了具体的应用案例。最初解说了播送变量和分布式缓存的概念,并就如何应用这两种高级性能,提供了残缺的Demo案例。

关注我的公众号:大数据技术与数仓,收费支付百G大数据视频材料与书籍

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理