A look at Flink's InputFormatSourceFunction



This article takes a look at Flink's InputFormatSourceFunction.
Example
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
IteratorInputFormat<String> iteratorInputFormat = new IteratorInputFormat<>(new WordIterator());
env
    // alternative type info: TypeInformation.of(new TypeHint<String>() {})
    .createInput(iteratorInputFormat, TypeExtractor.createTypeInfo(String.class))
    .setParallelism(1)
    .print();
Here an IteratorInputFormat is passed to env.createInput, which creates the SourceFunction.
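The WordIterator class used in the example is not shown in the article. Below is a minimal sketch, assuming it simply walks a fixed array of words; the class body and the word list are hypothetical. The one requirement taken from the source is that the IteratorInputFormat constructor rejects iterators that are not Serializable.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the WordIterator used in the example.
// It implements Serializable because IteratorInputFormat's constructor
// throws IllegalArgumentException for non-serializable iterators.
class WordIterator implements Iterator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    // Assumed sample data; the original word list is not shown.
    private final String[] words = {"hello", "flink", "input", "format"};
    private int index = 0;

    @Override
    public boolean hasNext() {
        return index < words.length;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return words[index++];
    }
}

class WordIteratorDemo {
    public static void main(String[] args) {
        // Drain the iterator the same way an input format would.
        Iterator<String> it = new WordIterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Because the iterator is shipped to the task in serialized form, any state it holds (here the array and the index) has to be serializable as well.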
StreamExecutionEnvironment.createInput
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
DataStreamSource<OUT> source;

if (inputFormat instanceof FileInputFormat) {
@SuppressWarnings("unchecked")
FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;

source = createFileInput(format, typeInfo, "Custom File source",
FileProcessingMode.PROCESS_ONCE, -1);
} else {
source = createInput(inputFormat, typeInfo, "Custom Source");
}
return source;
}

private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName) {

InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, typeInfo);
return addSource(function, sourceName, typeInfo);
}
When the inputFormat is not a FileInputFormat, StreamExecutionEnvironment.createInput creates an InputFormatSourceFunction.
InputFormatSourceFunction
/**
* A {@link SourceFunction} that reads data using an {@link InputFormat}.
*/
@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 1L;

private TypeInformation<OUT> typeInfo;
private transient TypeSerializer<OUT> serializer;

private InputFormat<OUT, InputSplit> format;

private transient InputSplitProvider provider;
private transient Iterator<InputSplit> splitIterator;

private volatile boolean isRunning = true;

@SuppressWarnings("unchecked")
public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
this.format = (InputFormat<OUT, InputSplit>) format;
this.typeInfo = typeInfo;
}

@Override
@SuppressWarnings("unchecked")
public void open(Configuration parameters) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

if (format instanceof RichInputFormat) {
((RichInputFormat) format).setRuntimeContext(context);
}
format.configure(parameters);

provider = context.getInputSplitProvider();
serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
splitIterator = getInputSplits();
isRunning = splitIterator.hasNext();
}

@Override
public void run(SourceContext<OUT> ctx) throws Exception {
try {

Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat) {
((RichInputFormat) format).openInputFormat();
}

OUT nextElement = serializer.createInstance();
while (isRunning) {
format.open(splitIterator.next());

// for each element we also check if cancel
// was called by checking the isRunning flag

while (isRunning && !format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
} else {
break;
}
}
format.close();
completedSplitsCounter.inc();

if (isRunning) {
isRunning = splitIterator.hasNext();
}
}
} finally {
format.close();
if (format instanceof RichInputFormat) {
((RichInputFormat) format).closeInputFormat();
}
isRunning = false;
}
}

@Override
public void cancel() {
isRunning = false;
}

@Override
public void close() throws Exception {
format.close();
if (format instanceof RichInputFormat) {
((RichInputFormat) format).closeInputFormat();
}
}

/**
* Returns the {@code InputFormat}. This is only needed because we need to set the input
* split assigner on the {@code StreamGraph}.
*/
public InputFormat<OUT, InputSplit> getFormat() {
return format;
}

private Iterator<InputSplit> getInputSplits() {

return new Iterator<InputSplit>() {

private InputSplit nextSplit;

private boolean exhausted;

@Override
public boolean hasNext() {
if (exhausted) {
return false;
}

if (nextSplit != null) {
return true;
}

final InputSplit split;
try {
split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
} catch (InputSplitProviderException e) {
throw new RuntimeException("Could not retrieve next input split.", e);
}

if (split != null) {
this.nextSplit = split;
return true;
} else {
exhausted = true;
return false;
}
}

@Override
public InputSplit next() {
if (this.nextSplit == null && !hasNext()) {
throw new NoSuchElementException();
}

final InputSplit tmp = this.nextSplit;
this.nextSplit = null;
return tmp;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}

InputFormatSourceFunction is a SourceFunction that reads data through an InputFormat. It extends RichParallelSourceFunction and adds a two-argument constructor that takes an InputFormat and a TypeInformation.
Its getInputSplits method returns an Iterator over InputSplit (splitIterator); each nextSplit is fetched by calling InputSplitProvider.getNextInputSplit.
The run method repeatedly calls splitIterator.next(), opens the returned InputSplit with the InputFormat, reads the split's records one at a time with format.nextRecord, and emits each record with SourceContext.collect.
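The look-ahead logic in getInputSplits can be modelled without any Flink classes. Below is a minimal sketch, assuming a java.util.function.Supplier stands in for InputSplitProvider and that a null return means there are no more splits; the class names and the string "splits" are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Simplified model of the iterator returned by getInputSplits():
// the supplier plays the role of InputSplitProvider and signals
// "no more splits" by returning null.
class LookAheadSplitIterator<S> implements Iterator<S> {
    private final Supplier<S> provider; // hypothetical stand-in for InputSplitProvider
    private S nextSplit;
    private boolean exhausted;

    LookAheadSplitIterator(Supplier<S> provider) {
        this.provider = provider;
    }

    @Override
    public boolean hasNext() {
        if (exhausted) {
            return false;
        }
        if (nextSplit != null) {
            return true; // already fetched, not yet handed out
        }
        S split = provider.get(); // one request per split
        if (split != null) {
            nextSplit = split;
            return true;
        }
        exhausted = true; // never ask the provider again
        return false;
    }

    @Override
    public S next() {
        if (nextSplit == null && !hasNext()) {
            throw new NoSuchElementException();
        }
        S tmp = nextSplit;
        nextSplit = null;
        return tmp;
    }
}

class LookAheadSplitIteratorDemo {
    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>(Arrays.asList("split-0", "split-1"));
        // poll() returns null once the queue is empty.
        Iterator<String> splits = new LookAheadSplitIterator<String>(pending::poll);
        while (splits.hasNext()) {
            System.out.println("reading " + splits.next());
        }
    }
}
```

Caching the fetched split in nextSplit is what makes repeated hasNext() calls safe: the provider is asked at most once per split, and never again after it has returned null.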

InputSplitProvider
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
/**
* An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
* task is supposed to consume in the course of its execution.
*/
@Public
public interface InputSplitProvider {

/**
* Requests the next input split to be consumed by the calling task.
*
* @param userCodeClassLoader used to deserialize input splits
* @return the next input split to be consumed by the calling task or <code>null</code> if the
* task shall not consume any further input splits.
* @throws InputSplitProviderException if fetching the next input split fails
*/
InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
The InputSplitProvider interface defines getNextInputSplit, which requests the next InputSplit. It has two implementations: RpcInputSplitProvider and TaskInputSplitProvider.
RpcInputSplitProvider
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
public class RpcInputSplitProvider implements InputSplitProvider {
private final JobMasterGateway jobMasterGateway;
private final JobVertexID jobVertexID;
private final ExecutionAttemptID executionAttemptID;
private final Time timeout;

public RpcInputSplitProvider(
JobMasterGateway jobMasterGateway,
JobVertexID jobVertexID,
ExecutionAttemptID executionAttemptID,
Time timeout) {
this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
this.timeout = Preconditions.checkNotNull(timeout);
}

@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);

CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
jobVertexID,
executionAttemptID);

try {
SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

if (serializedInputSplit.isEmpty()) {
return null;
} else {
return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
}
} catch (Exception e) {
throw new InputSplitProviderException("Requesting the next input split failed.", e);
}
}
}
RpcInputSplitProvider calls jobMasterGateway.requestNextInputSplit to obtain a SerializedInputSplit, then deserializes it with the user-code class loader (in this example the split provider is an RpcInputSplitProvider).
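The request pattern above (block on a future with a timeout, treat an empty payload as "no more splits", otherwise deserialize) can be tried out with JDK classes alone. This is a rough sketch, not Flink code: DemoSplit and SplitRequestSketch are hypothetical names, a plain byte[] replaces SerializedInputSplit, and standard Java serialization with the default class loader replaces InstantiationUtil and the user-code class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical split type used only for this sketch.
class DemoSplit implements Serializable {
    private static final long serialVersionUID = 1L;
    final int number;

    DemoSplit(int number) {
        this.number = number;
    }
}

class SplitRequestSketch {

    // "Master" side: serialize the split before sending it.
    static byte[] serialize(Serializable split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(split);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // "Task" side: block on the reply with a timeout, then deserialize.
    // An empty payload means there are no more splits, so null is returned.
    static DemoSplit awaitNextSplit(CompletableFuture<byte[]> reply, long timeout, TimeUnit unit) {
        try {
            byte[] data = reply.get(timeout, unit);
            if (data.length == 0) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (DemoSplit) in.readObject();
            }
        } catch (Exception e) {
            // Wrap every failure (timeout, interruption, bad payload) in one exception type.
            throw new RuntimeException("Requesting the next input split failed.", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> reply =
                CompletableFuture.completedFuture(serialize(new DemoSplit(3)));
        System.out.println(awaitNextSplit(reply, 1, TimeUnit.SECONDS).number);
        System.out.println(
                awaitNextSplit(CompletableFuture.completedFuture(new byte[0]), 1, TimeUnit.SECONDS));
    }
}
```

The timeout bounds how long the task thread can block on the reply, which is why the provider is constructed with one.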
TaskInputSplitProvider
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
/**
* Implementation using {@link ActorGateway} to forward the messages.
*/
public class TaskInputSplitProvider implements InputSplitProvider {

private final ActorGateway jobManager;

private final JobID jobID;

private final JobVertexID vertexID;

private final ExecutionAttemptID executionID;

private final FiniteDuration timeout;

public TaskInputSplitProvider(
ActorGateway jobManager,
JobID jobID,
JobVertexID vertexID,
ExecutionAttemptID executionID,
FiniteDuration timeout) {

this.jobManager = Preconditions.checkNotNull(jobManager);
this.jobID = Preconditions.checkNotNull(jobID);
this.vertexID = Preconditions.checkNotNull(vertexID);
this.executionID = Preconditions.checkNotNull(executionID);
this.timeout = Preconditions.checkNotNull(timeout);
}

@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);

final Future<Object> response = jobManager.ask(
new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
timeout);

final Object result;

try {
result = Await.result(response, timeout);
} catch (Exception e) {
throw new InputSplitProviderException("Did not receive next input split from JobManager.", e);
}

if(result instanceof JobManagerMessages.NextInputSplit){
final JobManagerMessages.NextInputSplit nextInputSplit =
(JobManagerMessages.NextInputSplit) result;

byte[] serializedData = nextInputSplit.splitData();

if(serializedData == null) {
return null;
} else {
final Object deserialized;

try {
deserialized = InstantiationUtil.deserializeObject(serializedData,
userCodeClassLoader);
} catch (Exception e) {
throw new InputSplitProviderException("Could not deserialize the serialized input split.", e);
}

return (InputSplit) deserialized;
}
} else {
throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " +
"NextInputSplit. Instead response is of type " + result.getClass() + '.');
}

}
}
TaskInputSplitProvider calls jobManager.ask(new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID), timeout) and expects a JobManagerMessages.NextInputSplit reply, whose splitData() holds the serialized input split.
InputSplit
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/InputSplit.java
/**
* This interface must be implemented by all kind of input splits that can be assigned to input formats.
*
* <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
* as defined by {@link java.io.Serializable}.</p>
*/
@Public
public interface InputSplit extends Serializable {

/**
* Returns the number of this input split.
*
* @return the number of this input split
*/
int getSplitNumber();
}

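InputSplit is the interface that every kind of input split must implement. It extends Serializable so that splits can be transferred in serialized form; getSplitNumber returns the number of the split.
It has four implementations. Two implement the interface directly: GenericInputSplit and LocatableInputSplit.
The other two are FileInputSplit, which extends LocatableInputSplit, and TimestampedFileInputSplit, which extends FileInputSplit.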

GenericInputSplit
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java
/**
* A generic input split that has only a partition number.
*/
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {

private static final long serialVersionUID = 1L;

/** The number of this split. */
private final int partitionNumber;

/** The total number of partitions */
private final int totalNumberOfPartitions;

// --------------------------------------------------------------------------------------------

/**
* Creates a generic input split with the given split number.
*
* @param partitionNumber The number of the split's partition.
* @param totalNumberOfPartitions The total number of the splits (partitions).
*/
public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
this.partitionNumber = partitionNumber;
this.totalNumberOfPartitions = totalNumberOfPartitions;
}

//……

public String toString() {
return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
}
}
GenericInputSplit is simple: it has only two fields, partitionNumber and totalNumberOfPartitions (in this example the InputSplit is a GenericInputSplit).
LocatableInputSplit
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/LocatableInputSplit.java
/**
* A locatable input split is an input split referring to input data which is located on one or more hosts.
*/
@Public
public class LocatableInputSplit implements InputSplit, java.io.Serializable {

private static final long serialVersionUID = 1L;

private static final String[] EMPTY_ARR = new String[0];

/** The number of the split. */
private final int splitNumber;

/** The names of the hosts storing the data this input split refers to. */
private final String[] hostnames;

// --------------------------------------------------------------------------------------------

/**
* Creates a new locatable input split that refers to a multiple host as its data location.
*
* @param splitNumber The number of the split
* @param hostnames The names of the hosts storing the data this input split refers to.
*/
public LocatableInputSplit(int splitNumber, String[] hostnames) {
this.splitNumber = splitNumber;
this.hostnames = hostnames == null ? EMPTY_ARR : hostnames;
}

/**
* Creates a new locatable input split that refers to a single host as its data location.
*
* @param splitNumber The number of the split.
* @param hostname The names of the host storing the data this input split refers to.
*/
public LocatableInputSplit(int splitNumber, String hostname) {
this.splitNumber = splitNumber;
this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname};
}

//……

@Override
public String toString() {
return "Locatable Split (" + splitNumber + ") at " + Arrays.toString(this.hostnames);
}
}
LocatableInputSplit is an input split with a known location. It has two fields: splitNumber and hostnames, the hosts that store the data the split refers to.
IteratorInputFormat
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/IteratorInputFormat.java
/**
* An input format that returns objects from an iterator.
*/
@PublicEvolving
public class IteratorInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput {

private static final long serialVersionUID = 1L;

private Iterator<T> iterator; // input data as serializable iterator

public IteratorInputFormat(Iterator<T> iterator) {
if (!(iterator instanceof Serializable)) {
throw new IllegalArgumentException("The data source iterator must be serializable.");
}

this.iterator = iterator;
}

@Override
public boolean reachedEnd() {
return !this.iterator.hasNext();
}

@Override
public T nextRecord(T record) {
return this.iterator.next();
}
}
IteratorInputFormat is mainly a wrapper around an Iterator that implements reachedEnd and nextRecord; it extends GenericInputFormat.
GenericInputFormat
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java
/**
* Generic base class for all Rich inputs that are not based on files.
*/
@Public
public abstract class GenericInputFormat<OT> extends RichInputFormat<OT, GenericInputSplit> {

private static final long serialVersionUID = 1L;

/**
* The partition of this split.
*/
protected int partitionNumber;

// --------------------------------------------------------------------------------------------

@Override
public void configure(Configuration parameters) {
// nothing by default
}

@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
// no statistics available, by default.
return cachedStatistics;
}

@Override
public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
if (numSplits < 1) {
throw new IllegalArgumentException("Number of input splits has to be at least 1.");
}

numSplits = (this instanceof NonParallelInput) ? 1 : numSplits;
GenericInputSplit[] splits = new GenericInputSplit[numSplits];
for (int i = 0; i < splits.length; i++) {
splits[i] = new GenericInputSplit(i, numSplits);
}
return splits;
}

@Override
public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
return new DefaultInputSplitAssigner(splits);
}

// --------------------------------------------------------------------------------------------

@Override
public void open(GenericInputSplit split) throws IOException {
this.partitionNumber = split.getSplitNumber();
}

@Override
public void close() throws IOException {}
}

RpcInputSplitProvider calls JobMaster.requestNextInputSplit to obtain a SerializedInputSplit, and JobMaster in turn calls splitAssigner.getNextInputSplit(host, taskId). The splitAssigner here is a DefaultInputSplitAssigner, obtained from vertex.getSplitAssigner().
The splitAssigner returned by vertex.getSplitAssigner() is created in the ExecutionJobVertex constructor as splitSource.getInputSplitAssigner(splitSource.createInputSplits(numTaskVertices)).
The splitSource is the IteratorInputFormat from the example. Both createInputSplits (which splits according to numTaskVertices) and getInputSplitAssigner are inherited from its parent class GenericInputFormat. Because IteratorInputFormat implements NonParallelInput, createInputSplits produces a single split in this example.

DefaultInputSplitAssigner
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
/**
* This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
* simply returns all input splits of an input vertex in the order they were originally computed.
*/
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {

/** The logging object used to report information and errors. */
private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);

/** The list of all splits */
private final List<InputSplit> splits = new ArrayList<InputSplit>();

public DefaultInputSplitAssigner(InputSplit[] splits) {
Collections.addAll(this.splits, splits);
}

public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
this.splits.addAll(splits);
}

@Override
public InputSplit getNextInputSplit(String host, int taskId) {
InputSplit next = null;

// keep the synchronized part short
synchronized (this.splits) {
if (this.splits.size() > 0) {
next = this.splits.remove(this.splits.size() - 1);
}
}

if (LOG.isDebugEnabled()) {
if (next == null) {
LOG.debug("No more input splits available");
} else {
LOG.debug("Assigning split " + next + " to " + host);
}
}
return next;
}
}
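DefaultInputSplitAssigner simply hands out the InputSplits one at a time. In the code shown, getNextInputSplit removes the last element of the list, so splits are handed out starting from the end of the list; host is used only for debug logging and taskId is not used.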
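Split creation and assignment can be modelled together in a few lines of plain Java. Below is a minimal sketch, assuming splits are represented by their numbers only; SplitAssignmentSketch and its methods are hypothetical names that mirror the logic of GenericInputFormat.createInputSplits and DefaultInputSplitAssigner.getNextInputSplit as listed above, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of GenericInputFormat.createInputSplits plus
// DefaultInputSplitAssigner; splits are modelled as plain Integers.
class SplitAssignmentSketch {

    // One split per parallel subtask, or a single split for a non-parallel input.
    static List<Integer> createSplits(int numSplits, boolean nonParallel) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }
        int n = nonParallel ? 1 : numSplits;
        List<Integer> splits = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            splits.add(i);
        }
        return splits;
    }

    private final List<Integer> splits;

    SplitAssignmentSketch(List<Integer> splits) {
        this.splits = new ArrayList<>(splits);
    }

    // Hands out the last remaining split, or null when none are left.
    Integer getNextSplit() {
        synchronized (splits) {
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    public static void main(String[] args) {
        SplitAssignmentSketch assigner = new SplitAssignmentSketch(createSplits(3, false));
        Integer split;
        while ((split = assigner.getNextSplit()) != null) {
            System.out.println("assigned split " + split);
        }
    }
}
```

With three splits the sketch hands out 2, then 1, then 0. Removing from the end of an ArrayList avoids shifting the remaining elements, and the synchronized block keeps concurrent requests from several tasks from receiving the same split.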
Summary

InputFormatSourceFunction is a SourceFunction that reads data through an InputFormat. It extends RichParallelSourceFunction and adds a two-argument constructor that takes an InputFormat and a TypeInformation.
The IteratorInputFormat used in this example extends GenericInputFormat, which provides two important methods: createInputSplits (which splits according to numTaskVertices) and getInputSplitAssigner (which creates a DefaultInputSplitAssigner that hands out the prepared InputSplits one at a time).
The run method of InputFormatSourceFunction repeatedly calls splitIterator.next(), opens the returned InputSplit with the InputFormat, reads the split's records one at a time with format.nextRecord, and emits each record with SourceContext.collect.

Overall, GenericInputFormat provides the method that divides the input into InputSplits and supplies the InputSplitAssigner. InputFormatSourceFunction then iterates over the InputSplits assigned to its own task (obtained through the InputSplitAssigner), reads each split's elements through the InputFormat, and emits them with SourceContext.collect.
doc
InputFormatSourceFunction
