关于云计算:Flink处理函数实战之二ProcessFunction类

40次阅读

共计 5685 个字符,预计需要花费 15 分钟才能阅读完成。

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

Flink 处理函数实战系列链接

  1. 深刻理解 ProcessFunction 的状态操作 (Flink-1.10);
  2. ProcessFunction;
  3. KeyedProcessFunction 类;
  4. ProcessAllWindowFunction(窗口解决);
  5. CoProcessFunction(双流解决);

对于处理函数 (Process Function)

如下图,在惯例的业务开发中,SQL、Table API、DataStream API 比拟罕用,处于 Low-level 的 Porcession 绝对用得较少,从本章开始,咱们一起通过实战来相熟处理函数 (Process Function),看看这一系列的低级算子能够带给咱们哪些能力?

对于 ProcessFunction 类

处理函数有很多种,最根底的应该 ProcessFunction 类,来看看它的类图,可见有 RichFunction 的个性 open、close,而后本人有两个重要的办法 processElement 和 onTimer:

罕用个性如下所示:

  1. 解决单个元素;
  2. 拜访工夫戳;
  3. 旁路输入;

接下来写两个利用体验上述性能;

版本信息

  1. 开发环境操作系统:MacBook Pro 13 寸,macOS Catalina 10.15.3
  2. 开发工具:IDEA ULTIMATE 2018.3
  3. JDK:1.8.0_211
  4. Maven:3.6.0
  5. Flink:1.9.2

源码下载

如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示 (https://github.com/zq2599/blo…:

名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址 (https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址 (ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定

这个 git 我的项目中有多个文件夹,本章的利用在 <font color=”blue”>flinkstudy</font> 文件夹下,如下图红框所示:

创立工程

执行以下命令创立一个 flink-1.9.2 的利用工程:

mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2

按提醒输出 groupId:com.bolingcavalry,architectid:flinkdemo

第一个 demo

第一个 demo 用来体验以下两个个性:

  1. 解决单个元素;
  2. 拜访工夫戳;

创立 Simple.java,内容如下:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Simple {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 并行度为 1
        env.setParallelism(1);

        // 设置数据源,一共三个元素
        DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {for(int i=1; i<4; i++) {

                    String name = "name" + i;
                    Integer value = i;
                    long timeStamp = System.currentTimeMillis();

                    // 将将数据和工夫戳打印进去,用来验证数据
                    System.out.println(String.format("source,%s, %d, %d\n",
                            name,
                            value,
                            timeStamp));

                    // 发射一个元素,并且戴上了工夫戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

                    // 为了让每个元素的工夫戳不一样,每发射一次就延时 10 毫秒
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {}
        });


        // 过滤值为奇数的元素
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        // f1 字段为奇数的元素不会进入下一个算子
                        if(0 == value.f1 % 2) {
                            out.collect(String.format("processElement,%s, %d, %d\n",
                                    value.f0,
                                    value.f1,
                                    ctx.timestamp()));
                        }
                    }
                });

        // 打印后果,证实每个元素的 timestamp 的确能够在 ProcessFunction 中获得
        mainDataStream.print();

        env.execute("processfunction demo : simple");
    }
}

这里对上述代码做个介绍:

  1. 创立一个数据源,每个 10 毫秒收回一个元素,一共三个,类型是 Tuple2,f0 是个字符串,f1 是整形,每个元素都带工夫戳;
  2. 数据源收回元素时,提前把元素的 f0、f1、工夫戳打印进去,和前面的数据核查是否统一;
  3. 在前面的解决中,创立了 ProcessFunction 的匿名子类,外面能够解决上游发来的每个元素,并且还能获得每个元素的工夫戳 (这个能力很重要),而后将 f1 字段为奇数的元素过滤掉;
  4. 最初将 ProcessFunction 解决过的数据打印进去,验证处理结果是否合乎预期;

间接执行 Simple 类,后果如下,可见过滤和提取工夫戳都胜利了:

第二个 demo

第二个 demo 是实现旁路输入 (Side Outputs),对于一个 DataStream 来说,能够通过旁路输入将数据输入到其余算子中去,而不影响原有的算子的解决,上面来演示旁路输入:

创立 SideOutput 类:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;

public class SideOutput {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度为 1
        env.setParallelism(1);

        // 定义 OutputTag
        final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

        // 创立一个 List,外面有两个 Tuple2 元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 2));
        list.add(new Tuple2("ccc", 3));

        // 通过 List 创立 DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // 所有元素都进入 mainDataStream,f1 字段为奇数的元素进入 SideOutput
        SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

                        // 进入主流程的下一个算子
                        out.collect("main, name :" + value.f0 + ", value :" + value.f1);

                        //f1 字段为奇数的元素进入 SideOutput
                        if(1 == value.f1 % 2) {ctx.output(outputTag, "side, name :" + value.f0 + ", value :" + value.f1);
                        }
                    }
                });

        // 禁止 chanin,这样能够在页面上看清楚原始的 DAG
        mainDataStream.disableChaining();

        // 获得旁路数据
        DataStream<String> sideDataStream = mainDataStream.getSideOutput(outputTag);

        mainDataStream.print();
        sideDataStream.print();

        env.execute("processfunction demo : sideoutput");
    }
}

这里对上述代码做个介绍:

  1. 数据源是个汇合,类型是 Tuple2,f0 字段是字符串,f1 字段是整形;
  2. ProcessFunction 的匿名子类中,将每个元素的 f0 和 f1 拼接成字符串,发给主流程算子,再将 f1 字段为奇数的元素发到旁路输入;
  3. 数据源收回元素时,提前把元素的 f0、f1、工夫戳打印进去,和前面的数据核查是否统一;
  4. 将主流程和旁路输入的元素都打印进去,验证处理结果是否合乎预期;

执行 SideOutput 看后果,如下图,main 前缀的都是主流程算子,一共三条记录,side 前缀的是旁路输入,只有 f1 字段为奇数的两条记录,合乎预期:

下面的操作都是在 IDEA 上执行的,还能够将 flink 独自部署,再将上述工程构建成 jar,提交到 flink 的 jobmanager,可见 DAG 如下:


至此,处理函数中最简略的 ProcessFunction 类的学习和实战就实现了,接下来的文章咱们会尝试更多了类型的处理函数;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

正文完
 0