## 序

本文主要研究一下flink的PrintSinkFunction。

## DataStream.print

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes a DataStream to the standard output stream (stdout).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

    /**
     * Writes a DataStream to the standard output stream (stderr).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> printToErr() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
        return addSink(printFunction).name("Print to Std. Err");
    }

    /**
     * Writes a DataStream to the standard output stream (stdout).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @param sinkIdentifier The string to prefix the output with.
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
        return addSink(printFunction).name("Print to Std. Out");
    }

    /**
     * Writes a DataStream to the standard output stream (stderr).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @param sinkIdentifier The string to prefix the output with.
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> printToErr(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true);
        return addSink(printFunction).name("Print to Std. Err");
    }

    /**
     * Adds the given sink to this DataStream. Only streams with sinks added
     * will be executed once the {@link StreamExecutionEnvironment#execute()}
     * method is called.
     *
     * @param sinkFunction
     *            The object containing the sink's invoke function.
     * @return The closed DataStream.
     */
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

- DataStream提供了几个以print开头的方法,其内部都是创建一个PrintSinkFunction,再通过调用addSink把该PrintSinkFunction添加进去
- addSink方法的注释表明:只有添加了sink的stream,才会在StreamExecutionEnvironment.execute()被调用时执行
- SinkFunction先被StreamSink包装,然后被DataStreamSink包装,最后通过DataStreamSink.getTransformation()作为operator添加到ExecutionEnvironment中

## SinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java

    /**
     * Interface for implementing user defined sink functionality.
     *
     * @param <IN> Input type parameter.
     */
    @Public
    public interface SinkFunction<IN> extends Function, Serializable {

        /**
         * @deprecated Use {@link #invoke(Object, Context)}.
         */
        @Deprecated
        default void invoke(IN value) throws Exception {}

        /**
         * Writes the given value to the sink. This function is called for every record.
         *
         * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
         * {@code default} method for backward compatibility with the old-style method only.
         *
         * @param value The input record.
         * @param context Additional context about the input record.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        default void invoke(IN value, Context context) throws Exception {
            invoke(value);
        }

        /**
         * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
         * an input record.
         *
         * <p>The context is only valid for the duration of a
         * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
         * afterwards!
         *
         * @param <T> The type of elements accepted by the sink.
         */
        @Public // Interface might be extended in the future with additional methods.
        interface Context<T> {

            /** Returns the current processing time. */
            long currentProcessingTime();

            /** Returns the current event-time watermark. */
            long currentWatermark();

            /**
             * Returns the timestamp of the current input record or {@code null} if the element does not
             * have an assigned timestamp.
             */
            Long timestamp();
        }
    }

- SinkFunction接口定义了invoke方法,用来触发sink逻辑;invoke方法接收一个Context参数,Context接口定义了currentProcessingTime、currentWatermark、timestamp三个方法

## RichSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java

    /**
     * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
     */
    @Public
    public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

        private static final long serialVersionUID = 1L;
    }

- RichSinkFunction抽象类继承了AbstractRichFunction类,同时声明实现了SinkFunction接口;大部分内置的sink function都继承了RichSinkFunction;AbstractRichFunction主要提供了RuntimeContext属性,可以用来获取function运行时的上下文

## PrintSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java

    /**
     * Implementation of the SinkFunction writing every tuple to the standard
     * output or standard error stream.
     *
     * <p>
     * Four possible format options:
     *  {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
     *  {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
     *  taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
     *  output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
     * </p>
     *
     * @param <IN> Input record type
     */
    @PublicEvolving
    public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

        private static final long serialVersionUID = 1L;

        private final PrintSinkOutputWriter<IN> writer;

        /**
         * Instantiates a print sink function that prints to standard out.
         */
        public PrintSinkFunction() {
            writer = new PrintSinkOutputWriter<>(false);
        }

        /**
         * Instantiates a print sink function that prints to standard out.
         *
         * @param stdErr True, if the format should print to standard error instead of standard out.
         */
        public PrintSinkFunction(final boolean stdErr) {
            writer = new PrintSinkOutputWriter<>(stdErr);
        }

        /**
         * Instantiates a print sink function that prints to standard out and gives a sink identifier.
         *
         * @param stdErr True, if the format should print to standard error instead of standard out.
         * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
         */
        public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
            writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
            writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
        }

        @Override
        public void invoke(IN record) {
            writer.write(record);
        }

        @Override
        public String toString() {
            return writer.toString();
        }
    }

- PrintSinkFunction继承了RichSinkFunction,它主要使用了PrintSinkOutputWriter,在invoke的时候调用PrintSinkOutputWriter的write方法来执行输出

## PrintSinkOutputWriter

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java

    /**
     * Print sink output writer for DataStream and DataSet print API.
     */
    @Internal
    public class PrintSinkOutputWriter<IN> implements Serializable {

        private static final long serialVersionUID = 1L;

        private static final boolean STD_OUT = false;
        private static final boolean STD_ERR = true;

        private final boolean target;
        private transient PrintStream stream;
        private final String sinkIdentifier;
        private transient String completedPrefix;

        public PrintSinkOutputWriter() {
            this("", STD_OUT);
        }

        public PrintSinkOutputWriter(final boolean stdErr) {
            this("", stdErr);
        }

        public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
            this.target = stdErr;
            this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
        }

        public void open(int subtaskIndex, int numParallelSubtasks) {
            // get the target stream
            stream = target == STD_OUT ?
                System.out : System.err;
            completedPrefix = sinkIdentifier;

            if (numParallelSubtasks > 1) {
                if (!completedPrefix.isEmpty()) {
                    completedPrefix += ":";
                }
                completedPrefix += (subtaskIndex + 1);
            }

            if (!completedPrefix.isEmpty()) {
                completedPrefix += "> ";
            }
        }

        public void write(IN record) {
            stream.println(completedPrefix + record.toString());
        }

        @Override
        public String toString() {
            return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
        }
    }

- PrintSinkOutputWriter的构造器最多可以接收两个参数,分别是sinkIdentifier以及stdErr;sinkIdentifier即为输出的前缀,stdErr用于表示是否输出到System.err
- open方法主要用于做一些准备工作,它会在PrintSinkFunction的open方法中被调用:PrintSinkFunction的open方法把getRuntimeContext()强转为StreamingRuntimeContext,从中获取subtaskIndex及numParallelSubtasks并传递过来;这里的open方法根据sinkIdentifier以及subtaskIndex、numParallelSubtasks信息构建completedPrefix。例如sinkIdentifier为"test"且并行度大于1时,subtaskIndex为0的subtask的输出前缀为"test:1> ";若未指定sinkIdentifier且并行度为1,则前缀为空
- write方法就是调用System.out或者System.err的println方法,输出completedPrefix加上record的toString()结果

## 小结

- DataStream的几个以print开头的方法内部创建的是PrintSinkFunction,然后调用addSink方法将其添加到ExecutionEnvironment中(先被StreamSink包装,然后被DataStreamSink包装,最后通过DataStreamSink.getTransformation()作为operator添加到ExecutionEnvironment)
- SinkFunction是sink function的基础接口,它主要定义了invoke方法,该方法接收一个Context参数;而内置的sink function大多继承自RichSinkFunction,RichSinkFunction主要是继承了AbstractRichFunction,可以提供function运行时的RuntimeContext信息
- PrintSinkFunction继承了RichSinkFunction,它主要使用了PrintSinkOutputWriter,在invoke的时候调用PrintSinkOutputWriter的write方法来执行输出

## doc

- PrintSinkFunction