关于程序员:Flink-Collector-Output-接口源码解析

59次阅读

共计 12948 个字符,预计需要花费 33 分钟才能阅读完成。

Flink Collector Output 接口源码解析

在 Flink 中 Collector 接口次要用于 operator 发送(输入)元素,Output 接口是对 Collector 接口的扩大,减少了发送 WaterMark 的性能,在 Flink 外面只有波及到数据的传递都必须实现这两个接口,上面就来梳理一下这些接口的源码。

Output Collector UML 图

Collector 接口只有 2 个办法:

  1. collect(T record) 用于失常流输入数据。
  2. close() 敞开 Output,如果任何数据被缓冲,则该数据将被刷新。

Output 接口有 4 个办法:

  1. emitWatermark(Watermark mark) 从 operator 收回 Watermark。此水印将播送到所有上游所有 operator。
  2. emitWatermarkStatus(WatermarkStatus watermarkStatus) 发送水印状态。
  3. collect(OutputTag<X> outputTag, StreamRecord<X> record) 发送数据,这个办法和 Collector 接口中的 collect 办法作用是一样的,然而这个 collect 办法多了一个 OutputTag 参数,也就是说这个办法次要用在侧流输入场景下。
  4. emitLatencyMarker(LatencyMarker latencyMarker) 发送 LatencyMarker 它是一种非凡的数据,用来测量数据的提早。

WatermarkGaugeExposingOutput 接口只有 1 个办法:

  1. getWatermarkGauge() 用来获取 WatermarkGauge,它是测量最初收回的水印。

咱们明天次要说的是 collect 办法,也就是发送实在数据的办法,Output 接口的实现类是十分多的,因为只有你想发送数据就必须实现这个接口,那在泛滥的实现类里有几个是比拟重要的,上面我会挑出 7 个常见的实现类进行介绍,咱们先来看上面的 Output 实现类的 UML 类图。

Output 实现类 UML 图

能够看到 TimestampedCollector 和 CountingOutput 是间接实现了 Output 接口的,ChainingOutput,RecordWriterOutput,BroadcastingOutputCollector 这三个类是实现了 WatermarkGaugeExposingOutput 接口,次要是为了显示以后输入的 Watermark 值,WatermarkGaugeExposingOutput 又继承了 Output 接口。

依据其应用场景的不同,咱们能够把这些 Output 分成五大类:

同 operatorChain

  • ChainingOutput
  • CopyingChainingOutput

跨 operatorChain

  • RecordWriterOutput

统计 Metrics

  • CountingOutput

播送

  • BroadcastingOutputCollector
  • CopyingBroadcastingOutputCollector

工夫戳

  • TimestampedCollector

OperatorChain 图

这是一张 OperatorChain 和 Output 的关系图,其中虚线代表的是同一个 operatorChain 之间的数据传递,应用的是 ChainingOutput,实线代表的是跨 operatorChain 之间数据传递,应用的是 RecordWriterOutput。

为了更好的展现每一个 Output 的应用场景,以及把整个数据传递流程串联起来,上面来看一个简略的 Demo。

Demo

上图中的 Kafka Source 和 Map 算子 chain 在一起造成了一个 operatorChain,其中 Kafka Source 又叫做 Head Operator,Map 算子又叫做 Chain Operator,前面的 Process,两个 Print 算子 chain 在一起造成了另外一个 operatorChain,其中 Process 算子又叫做 Head Operator,Print 算子又叫做 Chain Operator。

那 Kafka Source -> Map 之间的数据传递用的则是 ChainingOutput,对应着上图中的虚线局部,Map -> Process 之间的数据传递应用的是 RecordWriterOutput,对应着上图中的实线局部。

另外从下面的分类能够看进去,很多 Output 都有一个对应的 CopyingXXXOutput,比方同一个 operatorChain 内数据传递是有 ChainingOutput 和 CopyingChainingOutput 两个实现类的,那这两者之间又有什么区别和分割呢?咱们接着往下面看。

同一个 operatorChain 之间(KafkaSource -> Map)

AsyncDataOutputToOutput#emitRecord

@Override
        public void emitRecord(StreamRecord<T> streamRecord) {
              // 更新 metric
            numRecordsOut.inc();
            metricGroup.recordEmitted(streamRecord.getTimestamp());
              // 这里是 CopyingChainingOutput
            output.collect(streamRecord);
        }

在 Kafka Source Operator 中发送数据的对象是 AsyncDataOutputToOutput,它会持有一个 Output,这里的 Output 实际上是 CopyingChainingOutput 而不是 ChainingOutput,通过调用 collect 发送数据。

CopyingChainingOutput#collect

@Override
    public void collect(StreamRecord<T> record) {
          // 如果是失常的流 outputTag 是空的所以会间接走上面的逻辑
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }
                
        pushToOperator(record);
    }

失常的流是没有 outputTag 的(只有侧流输入才有)所以会间接走 pushToOperator 办法。

CopyingChainingOutput#pushToOperator

@Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
              // 浅拷贝
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
                        // 更新 metric
            numRecordsIn.inc();
              // 对 record 做深拷贝
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            input.setKeyContextElement(copy);
              // 调用上游的 processElement 办法
            input.processElement(copy);
        } catch (ClassCastException e) {if (outputTag != null) {
                // Enrich error message
                ClassCastException replace =
                        new ClassCastException(
                                String.format(
                                        "%s. Failed to push OutputTag with id'%s'to operator."
                                                + "This can occur when multiple OutputTags with different types"
                                                + "but identical names are being used.",
                                        e.getMessage(), outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {throw new ExceptionInChainedOperatorException(e);
        }
    }

pushToOperator 办法的逻辑很简略:

  1. 对 record 浅拷贝。
  2. 更新 metrics。
  3. 对 record 做深拷贝。
  4. 设置 Key 的上下文。
  5. 调用 chain operator 的 processElement 办法解决数据。

能够看到在深拷贝的时候是须要对数据进行序列化的,这跟咱们狭义上了解的 Flink 在 JobGraph 阶段次要优化了 operatorChain,从而缩小数据在网络传输中序列化和反序列的开销是不太统一的,难道这句话是错的吗?

那咱们就来看下 ChainingOutput 的 pushToOperator 办法和 CopyingChainingOutput 的 pushToOperator 有什么区别呢?

ChainingOutput#pushToOperator

protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator expects.
            @SuppressWarnings("unchecked")
              // 浅拷贝
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
                        // 更新 metric
            numRecordsIn.inc();
              // 设置 key 上下文
            input.setKeyContextElement(castRecord);
              // 调用下一个算子解决数据
            input.processElement(castRecord);
        } catch (Exception e) {throw new ExceptionInChainedOperatorException(e);
        }
    }

你会发现 ChainingOutput 的 pushToOperator 办法和 CopyingChainingOutput 的简直统一,惟一的区别就是这里没有对 record 做深拷贝,仅做了一个浅拷贝,显然,这种浅拷贝的形式性能是更高的,那是由什么决定应用 ChainingOutput 还是 CopyingChainingOutput 呢?其实是通过 env.getConfig().enableObjectReuse() 这个配置决定的,默认状况下 objectReuse 是 false 也就是会应用 CopyingChainingOutput 如果开启了 objectReuse 则会应用 ChainingOutput,也就是说如果不开启 objectReuse 是不能齐全施展 operatorChain 优化成果的。

那既然 ChainingOutput 的性能更高,为什么默认不应用 ChainingOutput 呢?因为在某些场景下,开启 objectReuse 可能会带来安全性问题,所以就抉择了 CopyingChainingOutput 作为默认的 Output。

以后 operator 的输入是下一个 operator 的输出,所以这里的 input 是 StreamMap 对象,也就相当于是间接调用 StreamMap.processElement 办法来传输数据。

StreamMap#processElement

@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
          // 先调用咱们本人的 map 逻辑
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

在 processElement 办法外面会先调用 userFunction 的 map 办法,这里的 userFunction 其实就是咱们自定义的 map 算子的代码逻辑,而后把返回的后果通过 collect 办法发送到上游算子,在发送之前须要先更新相干的 Metric,所以这里的 output 其实是 CountingOutput。

CountingOutput#collect

@Override
    public void collect(StreamRecord<OUT> record) {
          // 更新 metric
        numRecordsOut.inc();
        // 发送数据
        output.collect(record);
    }

CountingOutput 对象次要的作用是更新 Metric, 而后再发送数据,因为 Map 是 operatorChain 的最初一个 operator,所以它持有的 Output 是 RecordWriterOutput 对象,也就是上图所说的实线传输数据。

不同 operatorChain 之间(Map -> Process)

RecordWriterOutput#collect

@Override
    public void collect(StreamRecord<OUT> record) {if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }

        pushToRecordWriter(record);
    }

在 RecordWriterOutput 的 collect 办法里又调用了 pushToRecordWriter 办法。

RecordWriterOutput#pushToRecordWriter

private <X> void pushToRecordWriter(StreamRecord<X> record) {serializationDelegate.setInstance(record);

        try {recordWriter.emit(serializationDelegate);
        } catch (IOException e) {throw new UncheckedIOException(e.getMessage(), e);
        }
    }

通过 recordWriter 的 emit 办法发送数据,因为是跨 operatorChain 的数据传输,并不像 operatorChain 之间数据传输那么简略,间接调用 Chain Operator 的 processElement 解决数据,而是上游先把数据写到 ResultPartition 里,而后上游算子通过 InputChannel 生产数据,这个过程就不在开展了,因为不是咱们明天探讨的重点。

OneInputStreamTask#StreamTaskNetworkOutput#emitRecord

@Override
        public void emitRecord(StreamRecord<IN> record) throws Exception {
              // 更新 metric
            numRecordsIn.inc();
            operator.setKeyContextElement(record);
            operator.processElement(record);
        }

上游生产到数据后通过 StreamTaskNetworkOutput 的 emitRecord 办法来发送数据,首先还是更新 Metrics,同样的情理,这里的 operator 示意的是上游算子 ProcessOperator,先要设置上下文 key,最初调用其 processElement 办法传递数据。

同一个 operatorChain 之间(Process -> Print)

从这里开始,其实还是同一个 operatorChain 内的数据传递,整体上的逻辑和下面同一个 operatorChain 之间数据传递的逻辑是一样的,所以上面有些中央就一笔带过了。

ProcessOperator#processElement

@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
          // 设置工夫戳
        collector.setTimestamp(element);
          // 把 element 赋值给 context.element
        context.element = element;
          // 调用用户的代码逻辑
        userFunction.processElement(element.getValue(), context, collector);
          // 把 context.element 赋值为空
        context.element = null;
    }

在 processElement 办法里次要做了上面几件事件:

  1. 给 StreamRecord 对象设置工夫戳属性。
  2. 把 element 赋值给 context.element。
  3. 执行用户自定义的 ProcessFunction 的 processElement 办法。
  4. 把 context.element 设置为空。

其中步骤 3 是最重要的,所以咱们再来看下在 3 外面是如何传递数据的?

ProcessFunction#processElement

new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() {
                                    @Override
                                    public void processElement(
                                            JasonLeePOJO value,
                                            ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx,
                                            Collector<JasonLeePOJO> out)
                                            throws Exception {if (value.getName().equals("flink")) {
                                                // 失常流
                                            out.collect(value);
                                        } else if (value.getName().equals("spark")) {
                                                // 侧流
                                            ctx.output(test, value);
                                        }
                                    }
                                })

ProcessFunction 是咱们自定义的代码逻辑,次要实现了 processElement 办法,在这里会有两种不同的 Output,一种是失常的流输入,一种是侧流输入,失常的流输入用的是 TimestampedCollector,侧流输入用的是 ContextImpl 对象,它实现了 Context 抽象类的 output 办法。

TimestampedCollector#collect

@Override
    public void collect(T record) {output.collect(reuse.replace(record));
    }

在 TimestampedCollector 的 collect 办法里没做任何解决,间接调用 CountingOutput 的 collect 办法传递数据。

ContextImpl#output

@Override
        public <X> void output(OutputTag<X> outputTag, X value) {if (outputTag == null) {throw new IllegalArgumentException("OutputTag must not be null.");
            }
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

侧流和失常流略微不同的是它并没有实现 Output 接口,而是实现了 Context 对象,然而 output 办法里的 output 同样也是 CountingOutput。

CountingOutput#collect

@Override
    public void collect(StreamRecord<OUT> record) {numRecordsOut.inc();
        output.collect(record);
    }

CountingOutput 的 collect 里先是更新 Metrics,因为须要像上游播送数据,所以这里的 output 是 BroadcastingOutputCollector。

BroadcastingOutputCollector#collect

@Override
    public void collect(StreamRecord<T> record) {for (Output<StreamRecord<T>> output : outputs) {output.collect(record);
        }
    }

因为这是在同一个 operatorChain 内传递数据,所以这里的 output 是 CopyingChainingOutput。与 BroadcastingOutputCollector 对应的还有一个 CopyingBroadcastingOutputCollector,这里也顺便看一下。

CopyingBroadcastingOutputCollector#collect

@Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {for (int i = 0; i < outputs.length - 1; i++) {Output<StreamRecord<T>> output = outputs[i];

            StreamRecord<X> shallowCopy = record.copy(record.getValue());
            output.collect(outputTag, shallowCopy);
        }

        if (outputs.length > 0) {
            // don't copy for the last output
            outputs[outputs.length - 1].collect(outputTag, record);
        }
    }

CopyingBroadcastingOutputCollector 是 BroadcastingOutputCollector 的非凡版本,在 collect 办法外面多了一个浅拷贝的逻辑,如果开启了 objectReuse 则应用 CopyingBroadcastingOutputCollector 否则应用 BroadcastingOutputCollector。

CopyingChainingOutput#collect

@Override
    public void collect(StreamRecord<T> record) {if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }

        pushToOperator(record);
    }

这里逻辑和下面一样,就跳过了。

CopyingChainingOutput#pushToOperator

@Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            input.setKeyContextElement(copy);
            input.processElement(copy);
        } catch (ClassCastException e) {if (outputTag != null) {
                // Enrich error message
                ClassCastException replace =
                        new ClassCastException(
                                String.format(
                                        "%s. Failed to push OutputTag with id'%s'to operator."
                                                + "This can occur when multiple OutputTags with different types"
                                                + "but identical names are being used.",
                                        e.getMessage(), outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {throw new ExceptionInChainedOperatorException(e);
        }
    }

还是和下面一样,间接调用上游 operator 的 processElement 传递数据,这里的上游是 StreamSink,所以 input 是 StreamSink。

StreamSink#processElement

@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        sinkContext.element = element;
        userFunction.invoke(element.getValue(), sinkContext);
    }

在 processElement 办法里会调用 userFunction 的 invoke 办法,然而这里的 userFunction 不是咱们自定义实现的,而是 Flink 默认提供的 PrintSinkFunction。

为了更加不便的比照开启 objectReuse 和不开启 objectReuse 的不同之处,整个调用链路如下:

不开启(默认)objectReuse 的调用链:
AsyncDataOutputToOutput.emitRecord
-->CopyingChainingOutput.collect
-->CopyingChainingOutput.pushToOperator
  -->StreamMap.processElement
    -->CountingOutput.collect
    -->RecordWriterOutput.collect
    -->RecordWriterOutput.pushToRecordWriter
      -->AbstractStreamTaskNetworkInput.emitNext
      -->AbstractStreamTaskNetworkInput.processElement
        -->OneInputStreamTask.StreamTaskNetworkOutput.emitRecord
          -->ProcessOperator.processElement
            -->ProcessFunction.processElement
              -->TimestampedCollector.collect (这个是失常流的链路)
                -->CountingOutput.collect
                  -->BroadcastingOutputCollector.collect
                    -->CopyingChainingOutput.collect
                    -->CopyingChainingOutput.pushToOperator
                      -->StreamSink.processElement
                        -->SinkFunction.invoke
                            -->PrintSinkFunction.invoke
开启 objectReuse 的调用链:
AsyncDataOutputToOutput.emitRecord
-->ChainingOutput.collect
-->ChainingOutput.pushToOperator
  -->StreamMap.processElement
    -->CountingOutput.collect
    -->RecordWriterOutput.collect
    -->RecordWriterOutput.pushToRecordWriter
    -->AbstractStreamTaskNetworkInput.emitNext
    -->AbstractStreamTaskNetworkInput.processElement
        -->OneInputStreamTask.StreamTaskNetworkOutput.emitRecord
            -->ProcessFunction.processElement
                -->ProcessOperator.ContextImpl.output (这个是侧流输入的链路)
                    -->CountingOutput.collect
                        -->CopyingBroadcastingOutputCollector.collect
                            -->ChainingOutput.collect
                            -->ChainingOutput.pushToOperator
                                -->StreamSink.processElement
                    -->SinkFunction.invoke
                      -->PrintSinkFunction.invoke

通过比照整个调用链路,你会发现,不开启(默认)objectReuse 的时候,在 operatorChain 之间传递数据用的是 CopyingChainingOutput,在有侧流输入播送的场景下用的是 BroadcastingOutputCollector,开启 objectReuse 的话,在 operatorChain 之间传递数据用的是 ChainingOutput,在有侧流输入播送的场景下用的是 CopyingBroadcastingOutputCollector,其余中央没有差异。

总结

本文从一个简略的 Flink 应用程序登程,介绍了常见的几个 Output 实现类的应用场景及源码解析,ChainingOutput 次要用在 operatorChain 外部传递数据,RecordWriterOutput 次要用在跨 operatorChain 不同 Task 之间传递数据,CountingOutput 次要是为了更新 Metrics,BroadcastingOutputCollector 次要用于播送场景下,TimestampedCollector 次要用来给 StreamRecord 设置工夫戳属性。

举荐浏览

Flink 工作实时监控最佳实际

Flink on yarn 实时日志收集最佳实际

Flink 1.14.0 全新的 Kafka Connector

Flink 1.14.0 生产 kafka 数据自定义反序列化类

Flink SQL JSON Format 源码解析

Flink on yarn 近程调试源码

Flink 通过 State Processor API 实现状态的读取和写入

Flink 侧流输入源码解析

Flink 源码:播送流状态源码解析

Flink 源码剖析之 Client 端启动流程剖析

Flink Print SQL Connector 增加随机取样性能

如果你感觉文章对你有帮忙,麻烦点一下 在看 吧,你的反对是我创作的最大能源。

本文由 mdnice 多平台公布

正文完
 0