A Look at Flink's SpoutWrapper



This article takes a look at Flink's SpoutWrapper.
SpoutWrapper
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java
/**
* A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
* takes the spout’s output tuples and transforms them into Flink tuples of type {@code OUT} (see
* {@link SpoutCollector} for supported types).<br>
* <br>
* Per default, {@link SpoutWrapper} calls the wrapped spout’s {@link IRichSpout#nextTuple() nextTuple()} method in
* an infinite loop.<br>
* Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
* times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
* be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
* terminates if no record was emitted to the output collector for the first time during a call to
* {@link IRichSpout#nextTuple() nextTuple()}.<br>
* If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
* is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
* {@link FiniteSpout#reachedEnd()} returns true.
*/
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
//……

/** The number of {@link IRichSpout#nextTuple()} calls. */
private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop

/**
* Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
* the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
* {@link Tuple25} depending on the spout’s declared number of attributes.
*
* @param spout
* The {@link IRichSpout spout} to be used.
* @param numberOfInvocations
* The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
* terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
* disabled.
* @throws IllegalArgumentException
* If the number of declared output attributes is not with range [0;25].
*/
public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
throws IllegalArgumentException {
this(spout, (Collection<String>) null, numberOfInvocations);
}

/**
* Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
* the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
* {@link Tuple25} depending on the spout’s declared number of attributes.
*
* @param spout
* The {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
* If the number of declared output attributes is not with range [0;25].
*/
public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
this(spout, (Collection<String>) null, null);
}

@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();

if (config != null) {
if (config instanceof StormConfig) {
stormConfig = (StormConfig) config;
} else {
stormConfig.putAll(config.toMap());
}
}

final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
this.stormTopology, stormConfig);

SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
stormTopologyContext.getThisTaskId(), ctx);

this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
this.spout.activate();

if (numberOfInvocations == null) {
if (this.spout instanceof FiniteSpout) {
final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

while (this.isRunning && !finiteSpout.reachedEnd()) {
finiteSpout.nextTuple();
}
} else {
while (this.isRunning) {
this.spout.nextTuple();
}
}
} else {
int counter = this.numberOfInvocations;
if (counter >= 0) {
while ((--counter >= 0) && this.isRunning) {
this.spout.nextTuple();
}
} else {
do {
collector.tupleEmitted = false;
this.spout.nextTuple();
} while (collector.tupleEmitted && this.isRunning);
}
}
}

/**
* {@inheritDoc}
*
* <p>Sets the {@link #isRunning} flag to {@code false}.
*/
@Override
public void cancel() {
this.isRunning = false;
}

/**
* {@inheritDoc}
*
* <p>Sets the {@link #isRunning} flag to {@code false}.
*/
@Override
public void stop() {
this.isRunning = false;
}

@Override
public void close() throws Exception {
this.spout.close();
}
}

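SpoutWrapper extends RichParallelSourceFunction and implements the stop method of the StoppableFunction interface.
Its run method creates a Flink SpoutCollector and passes it to the constructor of Storm's SpoutOutputCollector. It then calls the spout's open method, handing over that SpoutOutputCollector (which wraps the Flink SpoutCollector) so that it collects the data the spout emits.
After that, run calls spout.nextTuple() according to numberOfInvocations, which controls how many times nextTuple is called. The value can be set through the SpoutWrapper constructor; the constructor without a numberOfInvocations parameter leaves it null, which means an infinite loop.
Flink also provides the FiniteSpout interface on top of Storm's spout. Its reachedEnd method reports whether all data has been sent, which turns a Storm spout into a finite source. When numberOfInvocations is null and the spout is a FiniteSpout, nextTuple is called until reachedEnd returns true; with a plain Storm spout, nextTuple is called in a loop until the wrapper is cancelled or stopped.
If numberOfInvocations is set and is greater than or equal to 0, nextTuple is called that many times. If it is negative, the loop ends after the first nextTuple call that emits nothing, which is tracked through collector.tupleEmitted.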
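The four loop modes can be tried out without Flink or Storm on the classpath. The sketch below copies only the branching of SpoutWrapper.run(); `MiniSpout`, `MiniFiniteSpout`, `MiniCollector`, `InvocationModes` and `wordSpout` are hypothetical stand-ins invented for this illustration, not Flink or Storm classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Storm's IRichSpout: only nextTuple() is modelled. */
interface MiniSpout {
    void nextTuple();
}

/** Hypothetical stand-in for Flink's FiniteSpout. */
interface MiniFiniteSpout extends MiniSpout {
    boolean reachedEnd();
}

/** Records emitted values and whether the latest nextTuple() call emitted anything. */
class MiniCollector {
    final List<String> out = new ArrayList<>();
    boolean tupleEmitted;

    void emit(String value) {
        out.add(value);
        tupleEmitted = true;
    }
}

class InvocationModes {
    // Plays the role of SpoutWrapper.isRunning; cancel()/stop() would set it to false.
    static volatile boolean isRunning = true;

    /** Mirrors the branching in SpoutWrapper.run(); returns the number of nextTuple() calls. */
    static int drive(MiniSpout spout, MiniCollector collector, Integer numberOfInvocations) {
        int calls = 0;
        if (numberOfInvocations == null) {
            if (spout instanceof MiniFiniteSpout) {
                MiniFiniteSpout finite = (MiniFiniteSpout) spout;
                while (isRunning && !finite.reachedEnd()) {
                    finite.nextTuple();
                    calls++;
                }
            } else {
                // Infinite mode: only ends once isRunning is flipped to false.
                while (isRunning) {
                    spout.nextTuple();
                    calls++;
                }
            }
        } else {
            int counter = numberOfInvocations;
            if (counter >= 0) {
                while ((--counter >= 0) && isRunning) {
                    spout.nextTuple();
                    calls++;
                }
            } else {
                do {
                    collector.tupleEmitted = false;
                    spout.nextTuple();
                    calls++;
                } while (collector.tupleEmitted && isRunning);
            }
        }
        return calls;
    }

    /** A spout that emits the given words one per call and nothing afterwards. */
    static MiniFiniteSpout wordSpout(MiniCollector collector, String... words) {
        return new MiniFiniteSpout() {
            private int next = 0;

            @Override
            public void nextTuple() {
                if (next < words.length) {
                    collector.emit(words[next++]);
                }
            }

            @Override
            public boolean reachedEnd() {
                return next >= words.length;
            }
        };
    }
}
```

With a three-word spout, `drive(spout, collector, null)` makes 3 calls, `drive(spout, collector, 2)` makes 2, and `drive(spout, collector, -1)` makes 4: three calls that emit plus the one empty call that ends the loop.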

SpoutCollector
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java
/**
* A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
* compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
* Flink tuples and emits them via the provide {@link SourceContext} object.
*/
class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {

/** The Flink source context object. */
private final SourceContext<OUT> flinkContext;

/**
* Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
* number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
* to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
*
* @param numberOfAttributes
* The number of attributes of the emitted tuples.
* @param taskId
* The ID of the producer task (negative value for unknown).
* @param flinkContext
* The Flink source context to be used.
* @throws UnsupportedOperationException
* if the specified number of attributes is greater than 25
*/
SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
super(numberOfAttributes, taskId);
assert (flinkContext != null);
this.flinkContext = flinkContext;
}

@Override
protected List<Integer> doEmit(final OUT flinkTuple) {
this.flinkContext.collect(flinkTuple);
// TODO
return null;
}

@Override
public void reportError(final Throwable error) {
// not sure, if Flink can support this
}

@Override
public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
return this.tansformAndEmit(streamId, tuple);
}

@Override
public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}

public long getPendingCount() {
return 0;
}

}

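SpoutCollector implements Storm's ISpoutOutputCollector interface and provides its emit, emitDirect, getPendingCount and reportError methods. Flink does not currently support emitDirect, which throws UnsupportedOperationException; getPendingCount always returns 0, and reportError is a no-op.
doEmit calls flinkContext.collect(flinkTuple) to emit the data. The method is protected and is mainly there to be called by tansformAndEmit (the name is spelled this way in the Flink source).
tansformAndEmit is provided by the parent class AbstractStormCollector.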
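The adapter role of SpoutCollector can be shown in isolation. This is a minimal sketch under stated assumptions: `MiniSpoutOutputCollector`, `MiniSourceContext` and `MiniSpoutCollector` are hypothetical, trimmed-down stand-ins for ISpoutOutputCollector, SourceContext and SpoutCollector, and the tuple transformation done by the parent class is left out.

```java
import java.util.List;

/** Hypothetical stand-in for Storm's ISpoutOutputCollector. */
interface MiniSpoutOutputCollector {
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    long getPendingCount();
    void reportError(Throwable error);
}

/** Hypothetical stand-in for Flink's SourceContext: only collect() is modelled. */
interface MiniSourceContext<T> {
    void collect(T element);
}

/** Adapter in the spirit of SpoutCollector: Storm-facing API in, Flink-facing context out. */
class MiniSpoutCollector implements MiniSpoutOutputCollector {
    private final MiniSourceContext<List<Object>> flinkContext;

    MiniSpoutCollector(MiniSourceContext<List<Object>> flinkContext) {
        this.flinkContext = flinkContext;
    }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        // The messageId is dropped, just as SpoutCollector.emit does not use it.
        flinkContext.collect(tuple);
        return null; // the original doEmit also returns null (marked TODO)
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        throw new UnsupportedOperationException("Direct emit is not supported by Flink");
    }

    @Override
    public long getPendingCount() {
        return 0;
    }

    @Override
    public void reportError(Throwable error) {
        // no-op, as in SpoutCollector
    }
}
```

Because `MiniSourceContext` has a single method, a test can pass `sink::add` for a `List<List<Object>>` sink and inspect what the spout side emitted.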

AbstractStormCollector.tansformAndEmit
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java
/**
* Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
* to the specified output stream.
*
* @param The
* The output stream id.
* @param tuple
* The Storm tuple to be emitted.
* @return the return value of {@link #doEmit(Object)}
*/
@SuppressWarnings("unchecked")
protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
List<Integer> taskIds;

int numAtt = this.numberOfAttributes.get(streamId);
int taskIdIdx = numAtt;
if (this.taskId >= 0 && numAtt < 0) {
numAtt = 1;
taskIdIdx = 0;
}
if (numAtt >= 0) {
assert (tuple.size() == numAtt);
Tuple out = this.outputTuple.get(streamId);
for (int i = 0; i < numAtt; ++i) {
out.setField(tuple.get(i), i);
}
if (this.taskId >= 0) {
out.setField(this.taskId, taskIdIdx);
}
if (this.split) {
this.splitTuple.streamId = streamId;
this.splitTuple.value = out;

taskIds = doEmit((OUT) this.splitTuple);
} else {
taskIds = doEmit((OUT) out);
}

} else {
assert (tuple.size() == 1);
if (this.split) {
this.splitTuple.streamId = streamId;
this.splitTuple.value = tuple.get(0);

taskIds = doEmit((OUT) this.splitTuple);
} else {
taskIds = doEmit((OUT) tuple.get(0));
}
}
this.tupleEmitted = true;

return taskIds;
}

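AbstractStormCollector.tansformAndEmit mainly handles the split case, where one spout declares several streams. In every branch the data is finally emitted through the subclass method SpoutCollector.doEmit.
If split is true, doEmit receives splitTuple, a SplitStreamType that records the streamId together with its value.
If split is false, doEmit receives the plain tuple, which corresponds to the value field of SplitStreamType; compared with SplitStreamType, only the streamId information is missing.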
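The difference between the split and non-split paths can be sketched as follows. `TaggedValue` and `MiniStormCollector` are hypothetical names standing in for SplitStreamType and AbstractStormCollector. The sketch leaves out the attribute-count handling and the task id field, and it allocates a new wrapper per emit so the results can be inspected, whereas the original reuses a single splitTuple instance.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's SplitStreamType: a value tagged with its stream id. */
class TaggedValue {
    String streamId;
    Object value;
}

/** Hypothetical, simplified stand-in for AbstractStormCollector. */
class MiniStormCollector {
    private final boolean split;
    final List<Object> emitted = new ArrayList<>();
    boolean tupleEmitted;

    MiniStormCollector(boolean split) {
        this.split = split;
    }

    void transformAndEmit(String streamId, List<Object> tuple) {
        // The real code first copies the fields into a Flink Tuple; here the list is the payload.
        Object payload = tuple;
        if (split) {
            TaggedValue tagged = new TaggedValue();
            tagged.streamId = streamId;
            tagged.value = payload;
            doEmit(tagged);
        } else {
            doEmit(payload);
        }
        // Read by the wrapper's "until nothing is emitted" loop.
        tupleEmitted = true;
    }

    private void doEmit(Object element) {
        emitted.add(element);
    }
}
```

With `new MiniStormCollector(true)` every emitted element is a `TaggedValue` that still knows its stream; with `false` the downstream side sees only the payload.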

Task.run
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task’s configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//……

/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//……
// now load and instantiate the task’s invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------

// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;

// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}

// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);

// run the invokable
invokable.invoke();

//……
}
}
Task's run method calls invokable.invoke(); here the invokable is a StreamTask.
StreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
* and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
* the Task’s operator chain. Operators that are chained together execute synchronously in the
* same thread and hence on the same stream partition. A common case for these chains
* are successive map/flatmap/filter tasks.
*
* <p>The task chain contains one “head” operator and multiple chained operators.
* The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
* as well as for sources, iteration heads and iteration tails.
*
* <p>The Task class deals with the setup of the streams read by the head operator, and the streams
* produced by the operators at the ends of the operator chain. Note that the chain may fork and
* thus have multiple ends.
*
* <p>The life cycle of the task is set up as follows:
* <pre>{@code
*  -- setInitialState -> provides state of all operators in the chain
*
*  -- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> initialize-operator-states()
*        +----> open-operators()
*        +----> run()
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()
* }</pre>
*
* <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
* {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
* are called concurrently.
*
* @param <OUT>
* @param <OP>
*/
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {

//……

@Override
public final void invoke() throws Exception {

boolean disposed = false;
try {
//……

// let the task do its work
isRunning = true;
run();

// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}

LOG.debug("Finished task {}", getName());

//……
}
finally {
// clean up everything we initialized
isRunning = false;

//……
}
}
}
StreamTask's invoke method calls the run method of its subclass; here the subclass is StoppableSourceStreamTask.
StoppableSourceStreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
/**
* Stoppable task for executing stoppable streaming sources.
*
* @param <OUT> Type of the produced elements
* @param <SRC> Stoppable source function
*/
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {

private volatile boolean stopped;

public StoppableSourceStreamTask(Environment environment) {
super(environment);
}

@Override
protected void run() throws Exception {
if (!stopped) {
super.run();
}
}

@Override
public void stop() {
stopped = true;
if (this.headOperator != null) {
this.headOperator.stop();
}
}
}
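StoppableSourceStreamTask extends SourceStreamTask and mainly adds the stop method of StoppableTask. Its own run method only checks the stopped flag and then delegates to super.run(), so the actual work is done by its direct parent, SourceStreamTask.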
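The stop path relies on two volatile flags: `stopped` in the task and `isRunning` in the source function. The following sketch shows that cooperative pattern with plain threads. `FlagGuardedSource`, `MiniStoppableTask` and `StopDemo` are hypothetical names for this illustration; in Flink the call goes from the task through the head operator to SpoutWrapper.stop().

```java
/** Hypothetical source whose loop is guarded by a volatile flag, like SpoutWrapper.isRunning. */
class FlagGuardedSource {
    private volatile boolean isRunning = true;
    private volatile long calls = 0;

    void run() {
        while (isRunning) {
            calls++;        // stands in for spout.nextTuple(); only this thread writes it
            Thread.yield();
        }
    }

    void stop() {
        isRunning = false;
    }

    long calls() {
        return calls;
    }
}

/** Hypothetical task in the spirit of StoppableSourceStreamTask. */
class MiniStoppableTask {
    private volatile boolean stopped;
    private final FlagGuardedSource headOperator = new FlagGuardedSource();

    void run() {
        if (!stopped) {     // a stop() that arrives before run() skips the loop entirely
            headOperator.run();
        }
    }

    void stop() {
        stopped = true;
        headOperator.stop();
    }

    long calls() {
        return headOperator.calls();
    }
}

class StopDemo {
    /** Starts the task on its own thread, stops it, and reports whether the thread finished. */
    static boolean stopsRunningLoop() {
        MiniStoppableTask task = new MiniStoppableTask();
        Thread worker = new Thread(task::run);
        worker.start();
        try {
            while (task.calls() == 0) {
                Thread.sleep(1);    // wait until the loop has really started
            }
            task.stop();
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }
}
```

The flags are volatile so that a write from the stopping thread becomes visible to the thread that is spinning in the loop.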
SourceStreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
/**
* {@link StreamTask} for executing a {@link StreamSource}.
*
* <p>One important aspect of this is that the checkpointing and the emission of elements must never
* occur at the same time. The execution must be serial. This is achieved by having the contract
* with the StreamFunction that it must only modify its state or emit elements in
* a synchronized block that locks on the lock Object. Also, the modification of the state
* and the emission of elements must happen in the same block of code that is protected by the
* synchronized block.
*
* @param <OUT> Type of the output elements of this source.
* @param <SRC> Type of the source function for the stream source operator
* @param <OP> Type of the stream source operator
*/
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
extends StreamTask<OUT, OP> {

//……

@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
}
SourceStreamTask's run method mainly calls the run method of its head operator, a StreamSource.
StreamSource
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
/**
* {@link StreamOperator} for streaming sources.
*
* @param <OUT> Type of the output elements
* @param <SRC> Type of the source function of this stream source operator
*/
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

//……

public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}

public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {

final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}

final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);

try {
userFunction.run(ctx);

// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
StreamSource.run calls userFunction.run(ctx); here userFunction is the SpoutWrapper, and this call is what triggers the spout's nextTuple invocations.
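The delegation chain from Task down to the spout can be traced with a small model. All `Mini*` types and `CallChainSketch` below are hypothetical stand-ins that keep only the one call each real class contributes to the chain; StoppableSourceStreamTask is omitted because it only adds the stopped check in front of SourceStreamTask.run().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for SourceFunction: the role SpoutWrapper plays. */
interface MiniSourceFunction {
    void run(List<String> trace);
}

/** Stand-in for StreamSource: the head operator that owns the user function. */
class MiniStreamSource {
    private final MiniSourceFunction userFunction;

    MiniStreamSource(MiniSourceFunction userFunction) {
        this.userFunction = userFunction;
    }

    void run(List<String> trace) {
        trace.add("StreamSource.run");
        userFunction.run(trace);
    }
}

/** Stand-in for StreamTask: invoke() is a template method that calls the subclass's run(). */
abstract class MiniStreamTask {
    final void invoke(List<String> trace) {
        trace.add("StreamTask.invoke");
        run(trace);
    }

    protected abstract void run(List<String> trace);
}

/** Stand-in for SourceStreamTask: run() forwards to the head operator. */
class MiniSourceStreamTask extends MiniStreamTask {
    private final MiniStreamSource headOperator;

    MiniSourceStreamTask(MiniStreamSource headOperator) {
        this.headOperator = headOperator;
    }

    @Override
    protected void run(List<String> trace) {
        trace.add("SourceStreamTask.run");
        headOperator.run(trace);
    }
}

/** Stand-in for Task: a Runnable that invokes the loaded invokable. */
class MiniTask implements Runnable {
    final List<String> trace = new ArrayList<>();
    private final MiniStreamTask invokable;

    MiniTask(MiniStreamTask invokable) {
        this.invokable = invokable;
    }

    @Override
    public void run() {
        trace.add("Task.run");
        invokable.invoke(trace);
    }
}

class CallChainSketch {
    /** Wires the chain together, runs it once, and returns the recorded call order. */
    static List<String> demo() {
        MiniSourceFunction spoutWrapper = trace -> trace.add("SpoutWrapper.run");
        MiniTask task = new MiniTask(new MiniSourceStreamTask(new MiniStreamSource(spoutWrapper)));
        task.run();
        return task.trace;
    }
}
```

`CallChainSketch.demo()` returns the entries Task.run, StreamTask.invoke, SourceStreamTask.run, StreamSource.run and SpoutWrapper.run, in that order.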
Summary

Flink uses SpoutWrapper to wrap an original Storm spout. In its run method it creates a Flink SpoutCollector, passes it to the constructor of Storm's SpoutOutputCollector, and hands that collector to the spout's open method so that it collects what the spout emits. It then calls spout.nextTuple() according to numberOfInvocations, which controls how many times nextTuple is called. The value can be set through the SpoutWrapper constructor; the constructor without that parameter leaves it null, which means an infinite loop.
SpoutCollector's emit method calls AbstractStormCollector.tansformAndEmit, which in turn calls SpoutCollector.doEmit to emit the data. When the spout declares several streams, doEmit receives a SplitStreamType that wraps the tuple; when there is only one stream, it receives the plain tuple.
Flink's Task.run calls StreamTask.invoke, which calls the run method of the subclass, here StoppableSourceStreamTask. That run method checks the stopped flag and delegates to its direct parent SourceStreamTask, which calls StreamSource.run, which in turn calls userFunction.run(ctx). Here userFunction is the SpoutWrapper, so this is where the spout's nextTuple logic runs, with the tuples emitted through Flink's SpoutCollector.

doc
Storm Compatibility Beta
