序
本文主要研究一下 flink 的 BoltWrapper
BoltWrapper
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper.java
/**
* A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
* It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
* process. Furthermore, it takes the bolt’s output tuples and transforms them into Flink tuples of type {@code OUT}
* (see {@link AbstractStormCollector} for supported types).<br/>
* <br/>
* <strong>Works for single input streams only! See {@link MergedInputsBoltWrapper} for multi-input stream
* Bolts.</strong>
*/
public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
@Override
public void open() throws Exception {
super.open();
this.flinkCollector = new TimestampedCollector<>(this.output);
GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();
if (config != null) {
if (config instanceof StormConfig) {
stormConfig = (StormConfig) config;
} else {
stormConfig.putAll(config.toMap());
}
}
this.topologyContext = WrapperSetupHelper.createTopologyContext(
getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));
if (this.stormTopology != null) {
Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();
for (GlobalStreamId inputStream : inputs.keySet()) {
for (Integer tid : this.topologyContext.getComponentTasks(inputStream
.get_componentId())) {
this.inputComponentIds.put(tid, inputStream.get_componentId());
this.inputStreamIds.put(tid, inputStream.get_streamId());
this.inputSchemas.put(tid,
this.topologyContext.getComponentOutputFields(inputStream));
}
}
}
this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
}
@Override
public void dispose() throws Exception {
super.dispose();
this.bolt.cleanup();
}
@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
this.flinkCollector.setTimestamp(element);
IN value = element.getValue();
if (this.stormTopology != null) {
Tuple tuple = (Tuple) value;
Integer producerTaskId = tuple.getField(tuple.getArity() – 1);
this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds
.get(producerTaskId), MessageId.makeUnanchored()));
} else {
this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
MessageId.makeUnanchored()));
}
}
}
flink 用 BoltWrapper 来包装 storm 的 IRichBolt,它实现了 OneInputStreamOperator 接口,继承 AbstractStreamOperator 类
OneInputStreamOperator 接口继承了 StreamOperator 接口,额外定义了 processElement、processWatermark、processLatencyMarker 三个接口
AbstractStreamOperator 类实现的是 StreamOperator 接口,但是里头帮忙实现了 processWatermark、processLatencyMarker 这两个接口
BoltWrapper 里头主要是实现 OneInputStreamOperator 接口的 processElement 方法,然后是覆盖 StreamOperator 接口定义的 open 及 dispose 方法
open 方法有个要点就是调用 bolt 的 prepare 方法,传入包装 BoltCollector 的 OutputCollector,通过 BoltCollector 来收集 bolt 发射的数据到 flink,它使用的是 flink 的 TimestampedCollector
BoltCollector
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltCollector.java
/**
* A {@link BoltCollector} is used by {@link BoltWrapper} to provided an Storm compatible
* output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
* and emits them via the provide {@link Output} object.
*/
class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
/** The Flink output Collector. */
private final Collector<OUT> flinkOutput;
/**
* Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the
* number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
* between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
*
* @param numberOfAttributes
* The number of attributes of the emitted tuples per output stream.
* @param taskId
* The ID of the producer task (negative value for unknown).
* @param flinkOutput
* The Flink output object to be used.
* @throws UnsupportedOperationException
* if the specified number of attributes is greater than 25
*/
BoltCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
super(numberOfAttributes, taskId);
assert (flinkOutput != null);
this.flinkOutput = flinkOutput;
}
@Override
protected List<Integer> doEmit(final OUT flinkTuple) {
this.flinkOutput.collect(flinkTuple);
// TODO
return null;
}
@Override
public void reportError(final Throwable error) {
// not sure, if Flink can support this
}
@Override
public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
return this.tansformAndEmit(streamId, tuple);
}
@Override
public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
throw new UnsupportedOperationException(“Direct emit is not supported by Flink”);
}
@Override
public void ack(final Tuple input) {}
@Override
public void fail(final Tuple input) {}
@Override
public void resetTimeout(Tuple var1) {}
}
BoltCollector 实现了 storm 的 IOutputCollector 接口,只是 ack、fail、resetTimeout、reportError 操作都为空,不支持 emitDirect 操作
doEmit 方法调用的是 flinkOutput.collect(flinkTuple)
emit 方法调用的是 tansformAndEmit(streamId, tuple),它由继承的父类 AbstractStormCollector 实现
TimestampedCollector.collect
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/TimestampedCollector.java
/**
* Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
* Before giving the {@link TimestampedCollector} to a user function you must set
* the timestamp that should be attached to emitted elements. Most operators
* would set the timestamp of the incoming
* {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
*
* @param <T> The type of the elements that can be emitted.
*/
@Internal
public class TimestampedCollector<T> implements Collector<T> {
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
/**
* Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
*/
public TimestampedCollector(Output<StreamRecord<T>> output) {
this.output = output;
this.reuse = new StreamRecord<T>(null);
}
@Override
public void collect(T record) {
output.collect(reuse.replace(record));
}
public void setTimestamp(StreamRecord<?> timestampBase) {
if (timestampBase.hasTimestamp()) {
reuse.setTimestamp(timestampBase.getTimestamp());
} else {
reuse.eraseTimestamp();
}
}
public void setAbsoluteTimestamp(long timestamp) {
reuse.setTimestamp(timestamp);
}
public void eraseTimestamp() {
reuse.eraseTimestamp();
}
@Override
public void close() {
output.close();
}
}
TimestampedCollector 实现了 flink 的 Collector 接口,这里头额外新增了 setTimestamp、setAbsoluteTimestamp、eraseTimestamp 方法
它使用了 StreamRecord 对象,它里头有 value、timestamp、hasTimestamp 三个属性,可以将 value 与时间戳关联起来
这里的 collect 方法调用了 StreamRecord 的 replace 返回的对象,replace 方法只是更新了 value 引用,但是里头的时间戳没有更新
AbstractStormCollector.tansformAndEmit
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java
/**
* Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
* to the specified output stream.
*
* @param The
* The output stream id.
* @param tuple
* The Storm tuple to be emitted.
* @return the return value of {@link #doEmit(Object)}
*/
@SuppressWarnings(“unchecked”)
protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
List<Integer> taskIds;
int numAtt = this.numberOfAttributes.get(streamId);
int taskIdIdx = numAtt;
if (this.taskId >= 0 && numAtt < 0) {
numAtt = 1;
taskIdIdx = 0;
}
if (numAtt >= 0) {
assert (tuple.size() == numAtt);
Tuple out = this.outputTuple.get(streamId);
for (int i = 0; i < numAtt; ++i) {
out.setField(tuple.get(i), i);
}
if (this.taskId >= 0) {
out.setField(this.taskId, taskIdIdx);
}
if (this.split) {
this.splitTuple.streamId = streamId;
this.splitTuple.value = out;
taskIds = doEmit((OUT) this.splitTuple);
} else {
taskIds = doEmit((OUT) out);
}
} else {
assert (tuple.size() == 1);
if (this.split) {
this.splitTuple.streamId = streamId;
this.splitTuple.value = tuple.get(0);
taskIds = doEmit((OUT) this.splitTuple);
} else {
taskIds = doEmit((OUT) tuple.get(0));
}
}
this.tupleEmitted = true;
return taskIds;
}
AbstractStormCollector.tansformAndEmit,这里主要处理了 split 的场景,即一个 bolt declare 了多个 stream,最后都通过子类 BoltCollector.doEmit 来发射数据
如果 split 为 true,则传给 doEmit 方法的是 splitTuple,即 SplitStreamType,它记录了 streamId 及其 value
如果 split 为 false,则传给 doEmit 方法的是 Tuple 类型,即相当于 SplitStreamType 中的 value,相比于 SplitStreamType 少了 streamId 信息
Task.run
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task’s configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//……
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//……
// now load and instantiate the task’s invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// —————————————————————-
// actual task core work
// —————————————————————-
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//……
}
}
Task 的 run 方法会调用 invokable.invoke(),这里的 invokable 为 StreamTask
StreamTask.invoke
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
* and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
* the Task’s operator chain. Operators that are chained together execute synchronously in the
* same thread and hence on the same stream partition. A common case for these chains
* are successive map/flatmap/filter tasks.
*
* <p>The task chain contains one “head” operator and multiple chained operators.
* The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
* as well as for sources, iteration heads and iteration tails.
*
* <p>The Task class deals with the setup of the streams read by the head operator, and the streams
* produced by the operators at the ends of the operator chain. Note that the chain may fork and
* thus have multiple ends.
*
* <p>The life cycle of the task is set up as follows:
* <pre>{@code
* — setInitialState -> provides state of all operators in the chain
*
* — invoke()
* |
* +—-> Create basic utils (config, etc) and load the chain of operators
* +—-> operators.setup()
* +—-> task specific init()
* +—-> initialize-operator-states()
* +—-> open-operators()
* +—-> run()
* +—-> close-operators()
* +—-> dispose-operators()
* +—-> common cleanup
* +—-> task specific cleanup()
* }</pre>
*
* <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
* {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
* are called concurrently.
*
* @param <OUT>
* @param <OP>
*/
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
//……
@Override
public final void invoke() throws Exception {
boolean disposed = false;
try {
//……
// let the task do its work
isRunning = true;
run();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the “clean shutdown” is not attempted
if (canceled) {
throw new CancelTaskException();
}
LOG.debug(“Finished task {}”, getName());
//……
}
finally {
// clean up everything we initialized
isRunning = false;
//……
}
}
}
StreamTask 的 invoke 方法里头调用了子类的 run 方法,这里子类为 OneInputStreamTask
OneInputStreamTask.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the “processInput” method
}
}
该 run 方法主要是调用 inputProcessor.processInput(),这里的 inputProcessor 为 StreamInputProcessor
StreamInputProcessor.processInput
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn(“An exception occurred during the metrics setup.”, e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
StreamElement recordOrMark = deserializationDelegate.getInstance();
if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
continue;
} else if (recordOrMark.isStreamStatus()) {
// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
continue;
} else if (recordOrMark.isLatencyMarker()) {
// handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
continue;
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
return true;
}
}
}
//……
}
}
该 processInput 方法,先是通过 currentRecordDeserializer.getNextRecord(deserializationDelegate)获取 nextRecord,之后有调用到 streamOperator.processElement(record)来处理,这里的 streamOperator 为 BoltWrapper
小结
flink 用 BoltWrapper 来包装 storm 的 IRichBolt,它实现 OneInputStreamOperator 接口的 processElement 方法,在该方法中执行 bolt.execute 方法;另外在实现 StreamOperator 的 open 方法中调用 bolt 的 prepare 方法,传入包装 BoltCollector 的 OutputCollector,通过 BoltCollector 来收集 bolt.execute 时发射的数据到 flink,它使用的是 flink 的 TimestampedCollector
BoltCollector 的 emit 方法内部调用了 AbstractStormCollector.tansformAndEmit(它最后调用 BoltCollector.doEmit 方法来发射),针对多个 stream 的场景,封装了 SplitStreamType 的 tuple 给到 doEmit 方法;如果只有一个 stream,则仅仅将普通的 tuple 传给 doEmit 方法
flink 的 Task 的 run 方法会调用 StreamTask 的 invoke 方法,而 StreamTask 的 invoke 方法会调用子类 (这里子类为 OneInputStreamTask) 的 run 方法,OneInputStreamTask 的 run 方法是不断循环调用 inputProcessor.processInput(),这里的 inputProcessor 为 StreamInputProcessor,它的 processInput()会调用 currentRecordDeserializer.getNextRecord(deserializationDelegate)获取 nextRecord,之后根据条件选择调用 streamOperator.processElement(record)方法,这里的 streamOperator 为 BoltWrapper,而 BoltWrapper 的 processElement 正好调用 storm bolt 的 execute 方法来执行 bolt 逻辑并使用 flink 的 BoltCollector 进行发射
doc
Storm Compatibility Beta