A look at Flink's TextOutputFormat


This article takes a look at Flink's TextOutputFormat.
DataStream.writeAsText
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
* Writes a DataStream to the file specified by path in text format.
*
* <p>For every element of the DataStream the result of {@link Object#toString()} is written.
*
* @param path
* The path pointing to the location the text file is written to
* @param writeMode
* Controls the behavior for existing files. Options are
* NO_OVERWRITE and OVERWRITE.
*
* @return The closed DataStream.
*/
@PublicEvolving
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
tof.setWriteMode(writeMode);
return writeUsingOutputFormat(tof);
}

/**
* Writes the dataStream into an output, described by an OutputFormat.
*
* <p>The output is not participating in Flink's checkpointing!
*
* <p>For writing to a file system periodically, the use of the "flink-connector-filesystem"
* is recommended.
*
* @param format The output format
* @return The closed DataStream
*/
@PublicEvolving
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
return addSink(new OutputFormatSinkFunction<>(format));
}
DataStream's writeAsText method creates a TextOutputFormat, sets its write mode, and then wraps it in an OutputFormatSinkFunction so that it can be added as a sink function.
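The wrapping step can be pictured as a plain adapter. The sketch below uses only the JDK; MiniOutputFormat, MiniSink and RecordingFormat are hypothetical stand-ins, not Flink classes, and it assumes (the quoted source does not show this) that the sink function simply forwards its open/invoke/close callbacks to the wrapped output format.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins, not Flink classes: all names here are hypothetical.
class SinkAdapterSketch {

    /** Reduced version of the OutputFormat contract (configure is left out). */
    interface MiniOutputFormat<T> {
        void open(int taskNumber, int numTasks);
        void writeRecord(T record);
        void close();
    }

    /** Adapter that exposes an output format through sink-style callbacks. */
    static final class MiniSink<T> {
        private final MiniOutputFormat<T> format;

        MiniSink(MiniOutputFormat<T> format) {
            this.format = format;
        }

        void open(int subtaskIndex, int parallelism) {
            format.open(subtaskIndex, parallelism);
        }

        void invoke(T value) {
            format.writeRecord(value);
        }

        void close() {
            format.close();
        }
    }

    /** Output format that only records the calls it receives. */
    static final class RecordingFormat implements MiniOutputFormat<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open(int taskNumber, int numTasks) {
            calls.add("open " + taskNumber + "/" + numTasks);
        }

        @Override
        public void writeRecord(String record) {
            calls.add("write " + record);
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Drives one sink instance (subtask 0 of 1) over the given values. */
    static List<String> run(String... values) {
        RecordingFormat format = new RecordingFormat();
        MiniSink<String> sink = new MiniSink<>(format);
        sink.open(0, 1);
        for (String value : values) {
            sink.invoke(value);
        }
        sink.close();
        return format.calls;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b"));
    }
}
```

The point of the adapter shape is that the output format knows nothing about streaming; the sink only translates callbacks, which is also why, as the Javadoc above notes, such output does not participate in checkpointing.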
TextOutputFormat
flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/io/TextOutputFormat.java
/**
* A {@link FileOutputFormat} that writes objects to a text file.
*
* <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
* @param <T> type of elements
*/
@PublicEvolving
public class TextOutputFormat<T> extends FileOutputFormat<T> {

private static final long serialVersionUID = 1L;

private static final int NEWLINE = '\n';

private String charsetName;

private transient Charset charset;

// --------------------------------------------------------------------------------------------

/**
* Formatter that transforms values into their {@link String} representations.
* @param <IN> type of input elements
*/
public interface TextFormatter<IN> extends Serializable {
String format(IN value);
}

public TextOutputFormat(Path outputPath) {
this(outputPath, "UTF-8");
}

public TextOutputFormat(Path outputPath, String charset) {
super(outputPath);
this.charsetName = charset;
}

public String getCharsetName() {
return charsetName;
}

public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
if (charsetName == null) {
throw new NullPointerException();
}

if (!Charset.isSupported(charsetName)) {
throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
}

this.charsetName = charsetName;
}

// --------------------------------------------------------------------------------------------

@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);

try {
this.charset = Charset.forName(charsetName);
}
catch (IllegalCharsetNameException e) {
throw new IOException("The charset " + charsetName + " is not valid.", e);
}
catch (UnsupportedCharsetException e) {
throw new IOException("The charset " + charsetName + " is not supported.", e);
}
}

@Override
public void writeRecord(T record) throws IOException {
byte[] bytes = record.toString().getBytes(charset);
this.stream.write(bytes);
this.stream.write(NEWLINE);
}

// --------------------------------------------------------------------------------------------

@Override
public String toString() {
return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;
}
}
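TextOutputFormat extends FileOutputFormat. Its open method calls FileOutputFormat's open and then resolves charsetName into a Charset; writeRecord encodes record.toString() with that charset, writes the bytes straight to the stream, and follows each record with a newline (\n).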
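To make the byte-level behaviour concrete, here is a minimal JDK-only sketch of the same write logic. LineWriterSketch and its writeAll helper are hypothetical names introduced for illustration; a ByteArrayOutputStream stands in for the file stream that FileOutputFormat would open.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

// Hypothetical stand-in that mirrors TextOutputFormat.writeRecord; not Flink code.
class LineWriterSketch {

    private static final int NEWLINE = '\n';

    private final OutputStream stream;
    private final Charset charset;

    LineWriterSketch(OutputStream stream, String charsetName) {
        this.stream = stream;
        // Same lookup that TextOutputFormat.open performs on charsetName
        this.charset = Charset.forName(charsetName);
    }

    /** Writes the record's toString() bytes followed by a single '\n'. */
    void writeRecord(Object record) throws IOException {
        byte[] bytes = record.toString().getBytes(charset);
        stream.write(bytes);
        stream.write(NEWLINE);
    }

    /** Convenience helper (an assumption of this sketch): writes all records in memory. */
    static String writeAll(String charsetName, Object... records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        LineWriterSketch writer = new LineWriterSketch(out, charsetName);
        try {
            for (Object record : records) {
                writer.writeRecord(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        System.out.print(writeAll("UTF-8", 42, "hello"));
    }
}
```

Every record ends with a newline, including the last one, so an empty input produces an empty file rather than a lone line break.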
FileOutputFormat
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/FileOutputFormat.java
/**
* The abstract base class for all Rich output formats that are file based. Contains the logic to
* open/close the target
* file streams.
*/
@Public
public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful {
//……

/**
* Initialization of the distributed file system if it is used.
*
* @param parallelism The task parallelism.
*/
@Override
public void initializeGlobal(int parallelism) throws IOException {
final Path path = getOutputFilePath();
final FileSystem fs = path.getFileSystem();

// only distributed file systems can be initialized at start-up time.
if (fs.isDistributedFS()) {

final WriteMode writeMode = getWriteMode();
final OutputDirectoryMode outDirMode = getOutputDirectoryMode();

if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
// output is not written in parallel and should be written to a single file.
// prepare distributed output path
if(!fs.initOutPathDistFS(path, writeMode, false)) {
// output preparation failed! Cancel task.
throw new IOException("Output path could not be initialized.");
}

} else {
// output should be written to a directory

// only distributed file systems can be initialized at start-up time.
if(!fs.initOutPathDistFS(path, writeMode, true)) {
throw new IOException("Output directory could not be created.");
}
}
}
}

@Override
public void tryCleanupOnError() {
if (this.fileCreated) {
this.fileCreated = false;

try {
close();
} catch (IOException e) {
LOG.error("Could not properly close FileOutputFormat.", e);
}

try {
FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false);
} catch (FileNotFoundException e) {
// ignore, may not be visible yet or may be already removed
} catch (Throwable t) {
LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t);
}
}
}

@Override
public void configure(Configuration parameters) {
// get the output file path, if it was not yet set
if (this.outputFilePath == null) {
// get the file parameter
String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
if (filePath == null) {
throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
", nor via the Configuration.");
}

try {
this.outputFilePath = new Path(filePath);
}
catch (RuntimeException rex) {
throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
}
}

// check if have not been set and use the defaults in that case
if (this.writeMode == null) {
this.writeMode = DEFAULT_WRITE_MODE;
}

if (this.outputDirectoryMode == null) {
this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
}
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
if (taskNumber < 0 || numTasks < 1) {
throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
}

if (LOG.isDebugEnabled()) {
LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
", OutputDirectoryMode=" + outputDirectoryMode);
}

Path p = this.outputFilePath;
if (p == null) {
throw new IOException("The file path is null.");
}

final FileSystem fs = p.getFileSystem();

// if this is a local file system, we need to initialize the local output directory here
if (!fs.isDistributedFS()) {

if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
// output should go to a single file

// prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
if(!fs.initOutPathLocalFS(p, writeMode, false)) {
// output preparation failed! Cancel task.
throw new IOException("Output path '" + p.toString() + "' could not be initialized. Canceling task...");
}
}
else {
// numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS

if(!fs.initOutPathLocalFS(p, writeMode, true)) {
// output preparation failed! Cancel task.
throw new IOException("Output directory '" + p.toString() + "' could not be created. Canceling task...");
}
}
}

// Suffix the path with the parallel instance index, if needed
this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;

// create output file
this.stream = fs.create(this.actualFilePath, writeMode);

// at this point, the file creation must have succeeded, or an exception has been thrown
this.fileCreated = true;
}

@Override
public void close() throws IOException {
final FSDataOutputStream s = this.stream;
if (s != null) {
this.stream = null;
s.close();
}
}
}

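FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster (initializeGlobal method) and CleanupWhenUnsuccessful (tryCleanupOnError method) interfaces
initializeGlobal checks whether the output path is on a distributed file system and, if so, initializes the output path once globally at start-up; tryCleanupOnError, if the file was created, first calls close and then deletes the file
FileOutputFormat also implements the configure, open and close methods of the OutputFormat interface, leaving writeRecord to subclasses; configure sets the outputFilePath, writeMode and outputDirectoryMode properties; open derives actualFilePath from taskNumber (when numTasks is greater than 1, or outputDirectoryMode is ALWAYS, it adds a file under the configured outputFilePath directory whose name is taskNumber + 1) and then creates the stream; close mainly closes the stream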
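The path rule in open can be sketched with plain strings. OutputPathSketch is a hypothetical stand-in that uses String instead of Flink's Path; directoryFileName follows the naming described above (taskNumber + 1), and the argument check mirrors the one at the top of open.

```java
// Hypothetical stand-in for the path selection in FileOutputFormat.open; not Flink code.
class OutputPathSketch {

    /** Same two constants as the OutputDirectoryMode used in the source above. */
    enum OutputDirectoryMode { ALWAYS, PARONLY }

    /** File name of a parallel instance inside the output directory: taskNumber + 1. */
    static String directoryFileName(int taskNumber) {
        return Integer.toString(taskNumber + 1);
    }

    /** Returns the path a given parallel instance would write to. */
    static String actualFilePath(String outputPath, int taskNumber, int numTasks, OutputDirectoryMode mode) {
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }
        // A directory is used when writing in parallel or when the mode forces it
        boolean intoDirectory = numTasks > 1 || mode == OutputDirectoryMode.ALWAYS;
        return intoDirectory ? outputPath + "/" + directoryFileName(taskNumber) : outputPath;
    }

    public static void main(String[] args) {
        // Single task, PARONLY: the configured path itself is the file
        System.out.println(actualFilePath("/tmp/out", 0, 1, OutputDirectoryMode.PARONLY));
        // Four tasks: the task with index 2 writes to a file named 3 under the directory
        System.out.println(actualFilePath("/tmp/out", 2, 4, OutputDirectoryMode.PARONLY));
        // Single task, ALWAYS: a directory is used even without parallelism
        System.out.println(actualFilePath("/tmp/out", 0, 1, OutputDirectoryMode.ALWAYS));
    }
}
```

In practice this means the same writeAsText path ends up as a single file at parallelism 1 and as a directory of numbered files at higher parallelism, unless the directory mode is set to ALWAYS.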

RichOutputFormat
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/RichOutputFormat.java
/**
* An abstract stub implementation for Rich output formats.
* Rich formats have access to their runtime execution context via {@link #getRuntimeContext()}.
*/
@Public
public abstract class RichOutputFormat<IT> implements OutputFormat<IT> {

private static final long serialVersionUID = 1L;

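// --------------------------------------------------------------------------------------------
// Runtime context access
// --------------------------------------------------------------------------------------------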

private transient RuntimeContext runtimeContext;

public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}

public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized yet. Try accessing " +
"it in one of the other life cycle methods.");
}
}
}
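RichOutputFormat declares that it implements the OutputFormat interface; its main addition is the RuntimeContext property, with a getter that throws IllegalStateException if the context has not been set yet.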
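The guarded getter is easy to try in isolation. In this minimal sketch, RichFormatSketch and Context are hypothetical stand-ins (Context replaces Flink's RuntimeContext); only the null check and the exception type follow the source above.

```java
// Hypothetical stand-in for the runtime-context handling in RichOutputFormat; not Flink code.
class RichFormatSketch {

    /** Placeholder for Flink's RuntimeContext. */
    static final class Context {
        final int subtaskIndex;

        Context(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }
    }

    private Context runtimeContext;

    void setRuntimeContext(Context context) {
        this.runtimeContext = context;
    }

    /** Returns the context, or fails if it has not been set yet. */
    Context getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        }
        throw new IllegalStateException("The runtime context has not been initialized yet.");
    }

    /** True if reading the context before it is set is rejected. */
    static boolean failsBeforeInit() {
        try {
            new RichFormatSketch().getRuntimeContext();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsBeforeInit());
        RichFormatSketch format = new RichFormatSketch();
        format.setRuntimeContext(new Context(3));
        System.out.println(format.getRuntimeContext().subtaskIndex);
    }
}
```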
OutputFormat
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/OutputFormat.java
/**
* The base interface for outputs that consumes records. The output format
* describes how to store the final records, for example in a file.
* <p>
* The life cycle of an output format is the following:
* <ol>
* <li>configure() is invoked a single time. The method can be used to implement initialization from
* the parameters (configuration) that may be attached upon instantiation.</li>
* <li>Each parallel output task creates an instance, configures it and opens it.</li>
* <li>All records of its parallel instance are handed to the output format.</li>
* <li>The output format is closed</li>
* </ol>
*
* @param <IT> The type of the consumed records.
*/
@Public
public interface OutputFormat<IT> extends Serializable {

/**
* Configures this output format. Since output formats are instantiated generically and hence parameterless,
* this method is the place where the output formats set their basic fields based on configuration values.
* <p>
* This method is always called first on a newly instantiated output format.
*
* @param parameters The configuration with all parameters.
*/
void configure(Configuration parameters);

/**
* Opens a parallel instance of the output format to store the result of its parallel instance.
* <p>
* When this method is called, the output format it guaranteed to be configured.
*
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws IOException Thrown, if the output could not be opened due to an I/O problem.
*/
void open(int taskNumber, int numTasks) throws IOException;

/**
* Adds a record to the output.
* <p>
* When this method is called, the output format it guaranteed to be opened.
*
* @param record The records to add to the output.
* @throws IOException Thrown, if the records could not be added to to an I/O problem.
*/
void writeRecord(IT record) throws IOException;

/**
* Method that marks the end of the life-cycle of parallel output instance. Should be used to close
* channels and streams and release resources.
* After this method returns without an error, the output is assumed to be correct.
* <p>
* When this method is called, the output format it guaranteed to be opened.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
void close() throws IOException;
}
The OutputFormat interface defines the configure, open, writeRecord and close methods.
Summary

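DataStream's writeAsText method creates a TextOutputFormat and then wraps it in an OutputFormatSinkFunction so that it can be added as a sink function
TextOutputFormat extends FileOutputFormat; its open method calls FileOutputFormat's open and then resolves the charset, while writeRecord writes the encoded record straight to the stream, followed by a newline (\n)
FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster (initializeGlobal method) and CleanupWhenUnsuccessful (tryCleanupOnError method) interfaces, as well as the configure, open and close methods of the OutputFormat interface; writeRecord is left to subclasses
FileOutputFormat's open method derives actualFilePath from taskNumber (when numTasks is greater than 1, or outputDirectoryMode is ALWAYS, it adds a file under the configured outputFilePath directory whose name is taskNumber + 1) and then creates the stream
RichOutputFormat declares that it implements the OutputFormat interface and mainly adds the RuntimeContext property; the OutputFormat interface defines the configure, open, writeRecord and close methods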

doc
TextOutputFormat
