Preface
This article takes a look at Flink's PrintSinkFunction.
DataStream.print
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
 * Writes a DataStream to the standard output stream (stdout).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}

/**
 * Writes a DataStream to the standard output stream (stderr).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> printToErr() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
    return addSink(printFunction).name("Print to Std. Err");
}

/**
 * Writes a DataStream to the standard output stream (stdout).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @param sinkIdentifier The string to prefix the output with.
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> print(String sinkIdentifier) {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
    return addSink(printFunction).name("Print to Std. Out");
}

/**
 * Writes a DataStream to the standard output stream (stderr).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @param sinkIdentifier The string to prefix the output with.
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> printToErr(String sinkIdentifier) {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true);
    return addSink(printFunction).name("Print to Std. Err");
}

/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
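DataStream provides several methods whose names start with print; each one creates a PrintSinkFunction internally and registers it by calling addSink.
The Javadoc of addSink notes that only streams with sinks added are executed once StreamExecutionEnvironment.execute() is called.
The SinkFunction is first wrapped in a StreamSink, which is in turn wrapped in a DataStreamSink; finally the transformation returned by DataStreamSink.getTransformation is added to the ExecutionEnvironment as an operator.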
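The "register now, run on execute()" behaviour described above can be illustrated with a small plain-Java sketch. It does not use Flink at all: MiniEnvironment, MiniStream and every method on them are hypothetical stand-ins invented for this illustration, and the real StreamExecutionEnvironment builds and submits a job graph rather than looping over a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java illustration only; none of these types exist in Flink.
class DeferredSinkSketch {

    /** Hypothetical environment: it only records operators until execute() is called. */
    static class MiniEnvironment {
        private final List<Runnable> operators = new ArrayList<>();

        void addOperator(Runnable operator) {
            operators.add(operator);
        }

        void execute() {
            for (Runnable operator : operators) {
                operator.run();
            }
        }
    }

    /** Hypothetical bounded stream backed by an in-memory list. */
    static class MiniStream<T> {
        private final MiniEnvironment env;
        private final List<T> elements;

        MiniStream(MiniEnvironment env, List<T> elements) {
            this.env = env;
            this.elements = elements;
        }

        /** Wraps the sink function into an operator and registers it; nothing runs yet. */
        void addSink(Consumer<T> sinkFunction) {
            env.addOperator(() -> elements.forEach(sinkFunction));
        }
    }

    /** Returns the order in which events happened. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniEnvironment env = new MiniEnvironment();
        MiniStream<String> stream = new MiniStream<>(env, Arrays.asList("a", "b"));

        stream.addSink(value -> log.add("sink:" + value));
        log.add("before execute"); // the sink has not seen any element at this point
        env.execute();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [before execute, sink:a, sink:b]
    }
}
```

The point of the sketch is only the ordering: adding a sink changes the plan, and elements reach the sink after execute() is called.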
SinkFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * @deprecated Use {@link #invoke(Object, Context)}.
     */
    @Deprecated
    default void invoke(IN value) throws Exception {}

    /**
     * Writes the given value to the sink. This function is called for every record.
     *
     * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
     * {@code default} method for backward compatibility with the old-style method only.
     *
     * @param value The input record.
     * @param context Additional context about the input record.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    /**
     * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
     * an input record.
     *
     * <p>The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current event-time watermark. */
        long currentWatermark();

        /**
         * Returns the timestamp of the current input record or {@code null} if the element does not
         * have an assigned timestamp.
         */
        Long timestamp();
    }
}
The SinkFunction interface defines the invoke method, which triggers the sink logic. invoke receives a Context, an interface that defines three methods: currentProcessingTime, currentWatermark, and timestamp.
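The source above shows that the two-argument invoke is a default method that falls back to the deprecated one-argument invoke. A reduced, Flink-free sketch of that delegation follows. MiniSink, MiniContext and CollectingSink are hypothetical names made up for this example, and the checked exceptions of the real interface are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified mirror of the default-method pattern; not the Flink interface itself.
class DefaultInvokeSketch {

    /** Hypothetical stand-in for SinkFunction.Context, reduced to a single method. */
    interface MiniContext {
        long currentProcessingTime();
    }

    /** Hypothetical stand-in for SinkFunction with the same pair of default methods. */
    interface MiniSink<IN> {
        default void invoke(IN value) {}

        default void invoke(IN value, MiniContext context) {
            invoke(value); // fall back to the old-style method
        }
    }

    /** Old-style sink that overrides only the one-argument method. */
    static class CollectingSink implements MiniSink<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(String value) {
            seen.add(value);
        }
    }

    /** Calls the new-style method and returns what the old-style override received. */
    static List<String> run() {
        CollectingSink sink = new CollectingSink();
        MiniContext context = System::currentTimeMillis;
        sink.invoke("a", context);
        sink.invoke("b", context);
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b]
    }
}
```

A caller that always uses the two-argument method therefore still reaches sinks written against the older signature, which is how PrintSinkFunction, which overrides invoke(IN record), keeps working.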
RichSinkFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
/**
 * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
 */
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

    private static final long serialVersionUID = 1L;
}
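The abstract class RichSinkFunction extends AbstractRichFunction and also declares that it implements the SinkFunction interface. Most built-in sink functions extend RichSinkFunction. AbstractRichFunction mainly provides the RuntimeContext property, which gives access to the function's runtime context.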
PrintSinkFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 *     {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1
 *     {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1
 *     taskId> output <- no {@code sinkIdentifier} provided, parallelism > 1
 *     output <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
PrintSinkFunction extends RichSinkFunction. It relies mainly on PrintSinkOutputWriter: each invoke call delegates to the writer's write method to produce the output.
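The four output formats listed in the class Javadoc can be reproduced with a few lines of plain Java. The sketch below follows the same branching as PrintSinkOutputWriter.open(), whose source appears in the next section. PrintPrefixSketch and buildPrefix are hypothetical names used only here, and "orders" is an arbitrary example identifier.

```java
// Standalone sketch of the prefix rules; class and method names are hypothetical.
class PrintPrefixSketch {

    /**
     * Builds the output prefix from the sink identifier, the zero-based subtask index
     * and the number of parallel subtasks.
     */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = (sinkIdentifier == null) ? "" : sinkIdentifier;
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1); // the printed task id is one-based
        }
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("orders", 0, 4) + "record"); // orders:1> record
        System.out.println(buildPrefix("orders", 0, 1) + "record"); // orders> record
        System.out.println(buildPrefix(null, 2, 4) + "record");     // 3> record
        System.out.println(buildPrefix(null, 0, 1) + "record");     // record
    }
}
```

Note that the task id only appears when the parallelism is greater than 1, so a job run with parallelism 1 and no identifier prints the bare record.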
PrintSinkOutputWriter
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
/**
 * Print sink output writer for DataStream and DataSet print API.
 */
@Internal
public class PrintSinkOutputWriter<IN> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final boolean STD_OUT = false;
    private static final boolean STD_ERR = true;

    private final boolean target;
    private transient PrintStream stream;
    private final String sinkIdentifier;
    private transient String completedPrefix;

    public PrintSinkOutputWriter() {
        this("", STD_OUT);
    }

    public PrintSinkOutputWriter(final boolean stdErr) {
        this("", stdErr);
    }

    public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
        this.target = stdErr;
        this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
    }

    public void open(int subtaskIndex, int numParallelSubtasks) {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;

        completedPrefix = sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }

        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    public void write(IN record) {
        stream.println(completedPrefix + record.toString());
    }

    @Override
    public String toString() {
        return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
    }
}
The PrintSinkOutputWriter constructors accept at most two parameters, sinkIdentifier and stdErr: sinkIdentifier is the output prefix, and stdErr indicates whether to write to System.err.
The open method does the preparation work. It is called from PrintSinkFunction's open method, which reads subtaskIndex and numParallelSubtasks from the RuntimeContext defined by AbstractRichFunction and passes them in. The writer's open method then builds completedPrefix from sinkIdentifier, subtaskIndex, and numParallelSubtasks.
The write method calls println on System.out or System.err, printing completedPrefix followed by the record.
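One detail worth noting in the source is that stream and completedPrefix are declared transient while the writer itself is Serializable. A likely reading is that the configuration (target, sinkIdentifier) travels with the serialized function, whereas the stream has to be looked up again in open() on the machine that actually runs the task. The following plain-Java sketch shows that effect with standard Java serialization. MiniWriter and roundTrip are hypothetical names, and the round trip only approximates how a function instance reaches a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

// Illustration of transient fields that are rebuilt in open(); not Flink code.
class TransientOpenSketch {

    /** Hypothetical writer: the flag is serialized, the stream is not. */
    static class MiniWriter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final boolean stdErr;
        private transient PrintStream stream; // null again after deserialization

        MiniWriter(boolean stdErr) {
            this.stdErr = stdErr;
        }

        /** Resolves the target stream on the JVM that calls it. */
        void open() {
            stream = stdErr ? System.err : System.out;
        }

        boolean isOpen() {
            return stream != null;
        }

        void write(Object record) {
            stream.println(record);
        }
    }

    /** Serializes and deserializes the writer using standard Java serialization. */
    static MiniWriter roundTrip(MiniWriter writer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(writer);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MiniWriter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MiniWriter original = new MiniWriter(false);
        original.open();

        MiniWriter copy = roundTrip(original);
        System.out.println(copy.isOpen()); // false: the transient stream was not carried over

        copy.open();          // must be called before write, as PrintSinkFunction.open() does
        copy.write("hello");
    }
}
```

Calling write on the copy before open would throw a NullPointerException, which is why the preparation work lives in open rather than in the constructor.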
Summary
The print methods on DataStream create a PrintSinkFunction internally and then call addSink to add it to the ExecutionEnvironment (it is first wrapped in a StreamSink, then in a DataStreamSink, and finally the transformation returned by DataStreamSink.getTransformation is added to the ExecutionEnvironment as an operator).
SinkFunction is the base interface for sink functions. It mainly defines the invoke method, which receives a Context. Most built-in sink functions extend RichSinkFunction, which in turn extends AbstractRichFunction and therefore exposes the function's RuntimeContext.
PrintSinkFunction extends RichSinkFunction. It relies mainly on PrintSinkOutputWriter: each invoke call delegates to the writer's write method to produce the output.
doc
PrintSinkFunction