Flink为了可能解决有边界的数据集和无边界的数据集,提供了对应的DataSet API和DataStream API。咱们能够开发对应的Java程序或者Scala程序来实现相应的性能。上面举例了一些DataSet API中的根本的算子。

上面咱们通过具体的代码来为大家演示每个算子的作用。

1、Map、FlatMap与MapPartition

//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<String> data = new ArrayList<String>();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource<String> text = env.fromCollection(data);DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {    public List<String> map(String data) throws Exception {        String[] words = data.split(" ");                //创立一个List        List<String> result = new ArrayList<String>();        for(String w:words){            result.add(w);        }        return result;    }});mapData.print();System.out.println("*****************************************");DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {    public void flatMap(String data, Collector<String> collection) throws Exception {        String[] words = data.split(" ");        for(String w:words){            collection.collect(w);        }    }});flatMapData.print();System.out.println("*****************************************");/*    new MapPartitionFunction<String, String>    第一个String:示意分区中的数据元素类型    第二个String:示意解决后的数据元素类型*/DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {    public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {        //针对分区进行操作的益处是:比方要进行数据库的操作,一个分区只须要创立一个Connection        //values中保留了一个分区的数据         Iterator<String> it = values.iterator();        while (it.hasNext()) {            String next = it.next();            String[] split = next.split(" ");            for (String word : split) {                out.collect(word);            }        }        //敞开链接    }});mapPartitionData.print();

2、Filter与Distinct

//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayList<String> data = new ArrayList<String>();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource<String> text = env.fromCollection(data);DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {    public void flatMap(String data, Collector<String> collection) throws Exception {        String[] words = data.split(" ");        for(String w:words){            collection.collect(w);        }    }});//去掉反复的单词flatMapData.distinct().print();System.out.println("*********************");//选出长度大于3的单词flatMapData.filter(new FilterFunction<String>() {        public boolean filter(String word) throws Exception {        int length = word.length();        return length>3?true:false;    }}).print();

3、Join操作

//获取运行的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创立第一张表:用户ID  姓名ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创立第二张表:用户ID 所在的城市ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查问:用户ID  姓名  所在的程序DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);table1.join(table2).where(0).equalTo(0)/*第一个Tuple2<Integer,String>:示意第一张表 * 第二个Tuple2<Integer,String>:示意第二张表 * Tuple3<Integer,String, String>:多表join连贯查问后的返回后果   */                           .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String, String>>() {    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,            Tuple2<Integer, String> table2) throws Exception {        return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);    } }).print();

4、笛卡尔积

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创立第一张表:用户ID  姓名ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创立第二张表:用户ID 所在的城市ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查问:用户ID  姓名  所在的程序DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);//生成笛卡尔积table1.cross(table2).print();

5、First-N

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//这里的数据是:员工姓名、薪水、部门号DataSet<Tuple3<String, Integer,Integer>> grade =         env.fromElements(new Tuple3<String, Integer,Integer>("Tom",1000,10),                         new Tuple3<String, Integer,Integer>("Mary",1500,20),                         new Tuple3<String, Integer,Integer>("Mike",1200,30),                         new Tuple3<String, Integer,Integer>("Jerry",2000,10));//依照插入程序取前三条记录grade.first(3).print();System.out.println("**********************");//先依照部门号排序,在依照薪水排序grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();System.out.println("**********************");//依照部门号分组,求每组的第一条记录grade.groupBy(2).first(1).print();

6、外链接操作

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创立第一张表:用户ID  姓名ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创立第二张表:用户ID 所在的城市ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查问:用户ID  姓名  所在的程序DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);//左外连贯table1.leftOuterJoin(table2).where(0).equalTo(0)      .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,                Tuple2<Integer, String> table2) throws Exception {            // 左外连贯示意等号右边的信息会被蕴含            if(table2 == null){                return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);            }else{                return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);            }        }    }).print();System.out.println("***********************************");//右外连贯table1.rightOuterJoin(table2).where(0).equalTo(0)      .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,                Tuple2<Integer, String> table2) throws Exception {            //右外链接示意等号左边的表的信息会被蕴含            if(table1 == null){                return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);            }else{                return new Tuple3<Integer, String, String>(table2.f0,table1.f1,table2.f1);            }        }    }).print();System.out.println("***********************************");//全外连贯table1.fullOuterJoin(table2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2)            throws Exception {        if(table1 == null){            return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);        }else if(table2 == null){            return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);        }else{            return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);        }    }    }).print();