乐趣区

聊聊flink如何兼容StormTopology


本文主要研究一下 flink 如何兼容 StormTopology
实例
@Test
public void testStormWordCount() throws Exception {
//NOTE 1 build Topology the Storm way
final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“spout”, new RandomWordSpout(), 1);
builder.setBolt(“count”, new WordCountBolt(), 5)
.fieldsGrouping(“spout”, new Fields(“word”));
builder.setBolt(“print”, new PrintBolt(), 1)
.shuffleGrouping(“count”);

//NOTE 2 convert StormTopology to FlinkTopology
FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

//NOTE 3 execute program locally using FlinkLocalCluster
Config conf = new Config();
// only required to stabilize integration test
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(“stormWordCount”, conf, flinkTopology);
cluster.shutdown();
}

这里使用 FlinkLocalCluster.getLocalCluster() 来创建或获取 FlinkLocalCluster,之后调用 FlinkLocalCluster.submitTopology 来提交 topology,结束时通过 FlinkLocalCluster.shutdown 来关闭 cluster
这里构建的 RandomWordSpout 继承自 storm 的 BaseRichSpout,WordCountBolt 继承自 storm 的 BaseBasicBolt;PrintBolt 继承自 storm 的 BaseRichBolt(由于 flink 是使用的 Checkpoint 机制,不会转换 storm 的 ack 操作,因而这里用 BaseBasicBolt 还是 BaseRichBolt 都无特别要求)
FlinkLocalCluster.submitTopology 这里使用的 topology 是 StormTopoloy 转换后的 FlinkTopology

LocalClusterFactory
flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
// ————————————————————————
// Access to default local cluster
// ————————————————————————

// A different {@link FlinkLocalCluster} to be used for execution of ITCases
private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

/**
* Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
* {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
*
* @return a {@link FlinkLocalCluster} to be used for execution
*/
public static FlinkLocalCluster getLocalCluster() {
return currentFactory.createLocalCluster();
}

/**
* Sets a different factory for FlinkLocalClusters to be used for execution.
*
* @param clusterFactory
* The LocalClusterFactory to create the local clusters for execution.
*/
public static void initialize(LocalClusterFactory clusterFactory) {
currentFactory = Objects.requireNonNull(clusterFactory);
}

// ————————————————————————
// Cluster factory
// ————————————————————————

/**
* A factory that creates local clusters.
*/
public interface LocalClusterFactory {

/**
* Creates a local Flink cluster.
* @return A local Flink cluster.
*/
FlinkLocalCluster createLocalCluster();
}

/**
* A factory that instantiates a FlinkLocalCluster.
*/
public static class DefaultLocalClusterFactory implements LocalClusterFactory {

@Override
public FlinkLocalCluster createLocalCluster() {
return new FlinkLocalCluster();
}
}

flink 在 FlinkLocalCluster 里头提供了一个静态方法 getLocalCluster,用来获取 FlinkLocalCluster,它是通过 LocalClusterFactory 来创建一个 FlinkLocalCluster
LocalClusterFactory 这里使用的是 DefaultLocalClusterFactory 实现类,它的 createLocalCluster 方法,直接 new 了一个 FlinkLocalCluster
目前的实现来看,每次调用 FlinkLocalCluster.getLocalCluster,都会创建一个新的 FlinkLocalCluster,这个在调用的时候是需要注意一下的

FlinkTopology
flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
/**
* Creates a Flink program that uses the specified spouts and bolts.
* @param stormBuilder The Storm topology builder to use for creating the Flink topology.
* @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
*/
public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
return new FlinkTopology(stormBuilder);
}

private FlinkTopology(TopologyBuilder builder) {
this.builder = builder;
this.stormTopology = builder.createTopology();
// extract the spouts and bolts
this.spouts = getPrivateField(“_spouts”);
this.bolts = getPrivateField(“_bolts”);

this.env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kick off the translation immediately
translateTopology();
}

FlinkTopology 提供了一个静态工厂方法 createTopology 用来创建 FlinkTopology
FlinkTopology 先保存一下 TopologyBuilder,然后通过 getPrivateField 反射调用 getDeclaredField 获取_spouts、_bolts 私有属性然后保存起来,方便后面转换 topology 使用
之后先获取到 ExecutionEnvironment,最后就是调用 translateTopology 进行整个 StormTopology 的转换

translateTopology
flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
/**
* Creates a Flink program that uses the specified spouts and bolts.
*/
private void translateTopology() {

unprocessdInputsPerBolt.clear();
outputStreams.clear();
declarers.clear();
availableInputs.clear();

// Storm defaults to parallelism 1
env.setParallelism(1);

/* Translation of topology */

for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
final String spoutId = spout.getKey();
final IRichSpout userSpout = spout.getValue();

final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userSpout.declareOutputFields(declarer);
final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
this.outputStreams.put(spoutId, sourceStreams);
declarers.put(spoutId, declarer);

final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
final DataStreamSource<?> source;

if (sourceStreams.size() == 1) {
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
spoutWrapperSingleOutput.setStormTopology(stormTopology);

final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
declarer.getOutputType(outputStreamId));

outputStreams.put(outputStreamId, src);
source = src;
} else {
final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
userSpout, spoutId, null, null);
spoutWrapperMultipleOutputs.setStormTopology(stormTopology);

@SuppressWarnings({“unchecked”, “rawtypes”})
DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
spoutWrapperMultipleOutputs, spoutId,
(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));

SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
.split(new StormStreamSelector<Tuple>());
for (String streamId : sourceStreams.keySet()) {
SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
.map(new SplitStreamMapper<Tuple>());
outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
outputStreams.put(streamId, outStream);
}
source = multiSource;
}
availableInputs.put(spoutId, outputStreams);

final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
if (common.is_set_parallelism_hint()) {
int dop = common.get_parallelism_hint();
source.setParallelism(dop);
} else {
common.set_parallelism_hint(1);
}
}

/**
* 1. Connect all spout streams with bolts streams
* 2. Then proceed with the bolts stream already connected
*
* <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before
* its producer
* ->thus, we might need to repeat multiple times
*/
boolean makeProgress = true;
while (bolts.size() > 0) {
if (!makeProgress) {
StringBuilder strBld = new StringBuilder();
strBld.append(“Unable to build Topology. Could not connect the following bolts:”);
for (String boltId : bolts.keySet()) {
strBld.append(“\n “);
strBld.append(boltId);
strBld.append(“: missing input streams [“);
for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
.get(boltId)) {
strBld.append(“‘”);
strBld.append(streams.getKey().get_streamId());
strBld.append(“‘ from ‘”);
strBld.append(streams.getKey().get_componentId());
strBld.append(“‘; “);
}
strBld.append(“]”);
}

throw new RuntimeException(strBld.toString());
}
makeProgress = false;

final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
while (boltsIterator.hasNext()) {

final Entry<String, IRichBolt> bolt = boltsIterator.next();
final String boltId = bolt.getKey();
final IRichBolt userBolt = copyObject(bolt.getValue());

final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();

Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
if (unprocessedBoltInputs == null) {
unprocessedBoltInputs = new HashSet<>();
unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
}

// check if all inputs are available
final int numberOfInputs = unprocessedBoltInputs.size();
int inputsAvailable = 0;
for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
final String producerId = entry.getKey().get_componentId();
final String streamId = entry.getKey().get_streamId();
final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
if (streams != null && streams.get(streamId) != null) {
inputsAvailable++;
}
}

if (inputsAvailable != numberOfInputs) {
// traverse other bolts first until inputs are available
continue;
} else {
makeProgress = true;
boltsIterator.remove();
}

final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);

for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
final GlobalStreamId streamId = input.getKey();
final Grouping grouping = input.getValue();

final String producerId = streamId.get_componentId();

final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
}

final SingleOutputStreamOperator<?> outputStream = createOutput(boltId,
userBolt, inputStreams);

if (common.is_set_parallelism_hint()) {
int dop = common.get_parallelism_hint();
outputStream.setParallelism(dop);
} else {
common.set_parallelism_hint(1);
}

}
}
}

整个转换是先转换 spout,再转换 bolt,他们根据的 spouts 及 bolts 信息是在构造器里头使用反射从 storm 的 TopologyBuilder 对象获取到的
flink 使用 FlinkOutputFieldsDeclarer(它实现了 storm 的 OutputFieldsDeclarer 接口) 来承载 storm 的 IRichSpout 及 IRichBolt 里头配置的 declareOutputFields 信息,不过要注意的是 flink 不支持 dirct emit;这里通过 userSpout.declareOutputFields 方法,将原始 spout 的 declare 信息设置到 FlinkOutputFieldsDeclarer
flink 使用 SpoutWrapper 来包装 spout,将其转换为 RichParallelSourceFunction 类型,这里对 spout 的 outputStreams 的个数是否大于 1 进行不同处理;之后就是将 RichParallelSourceFunction 作为 StreamExecutionEnvironment.addSource 方法的参数创建 flink 的 DataStreamSource,并添加到 availableInputs 中,然后根据 spout 的 parallelismHit 来设置 DataStreamSource 的 parallelism
对于 bolt 的转换,这里维护了 unprocessdInputsPerBolt,key 为 boltId,value 为该 bolt 要连接的 GlobalStreamId 及 Grouping 方式,由于是使用 map 来进行遍历的,因此转换的 bolt 可能乱序,如果连接的 GlobalStreamId 存在则进行转换,然后从 bolts 中移除,bolt 连接的 GlobalStreamId 不在 availableInputs 中的时候,需要跳过处理下一个,不会从 bolts 中移除,因为外层的循环条件是 bolts 的 size 大于 0,就是依靠这个机制来处理乱序
对于 bolt 的转换有一个重要的方法就是 processInput,它把 bolt 的 grouping 转换为对 spout 的 DataStream 的对应操作 (比如 shuffleGrouping 转换为对 DataStream 的 rebalance 操作,fieldsGrouping 转换为对 DataStream 的 keyBy 操作,globalGrouping 转换为 global 操作,allGrouping 转换为 broadcast 操作),之后调用 createOutput 方法转换 bolt 的执行逻辑,它使用 BoltWrapper 或者 MergedInputsBoltWrapper 将 bolt 转换为 flink 的 OneInputStreamOperator,然后作为参数对 stream 进行 transform 操作返回 flink 的 SingleOutputStreamOperator,同时将转换后的 SingleOutputStreamOperator 添加到 availableInputs 中,之后根据 bolt 的 parallelismHint 对这个 SingleOutputStreamOperator 设置 parallelism

FlinkLocalCluster
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/api/FlinkLocalCluster.java
/**
* {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
*/
public class FlinkLocalCluster {

/** The log used by this mini cluster. */
private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

/** The Flink mini cluster on which to execute the programs. */
private FlinkMiniCluster flink;

/** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */
public static final String SUBMIT_BLOCKING = “SUBMIT_STORM_TOPOLOGY_BLOCKING”;

public FlinkLocalCluster() {
}

public FlinkLocalCluster(FlinkMiniCluster flink) {
this.flink = Objects.requireNonNull(flink);
}

@SuppressWarnings(“rawtypes”)
public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
throws Exception {
this.submitTopologyWithOpts(topologyName, conf, topology, null);
}

@SuppressWarnings(“rawtypes”)
public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
LOG.info(“Running Storm topology on FlinkLocalCluster”);

boolean submitBlocking = false;
if (conf != null) {
Object blockingFlag = conf.get(SUBMIT_BLOCKING);
if (blockingFlag instanceof Boolean) {
submitBlocking = ((Boolean) blockingFlag).booleanValue();
}
}

FlinkClient.addStormConfigToTopology(topology, conf);

StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
streamGraph.setJobName(topologyName);

JobGraph jobGraph = streamGraph.getJobGraph();

if (this.flink == null) {
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, “0”);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

this.flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
}

if (submitBlocking) {
this.flink.submitJobAndWait(jobGraph, false);
} else {
this.flink.submitJobDetached(jobGraph);
}
}

public void killTopology(final String topologyName) {
this.killTopologyWithOpts(topologyName, null);
}

public void killTopologyWithOpts(final String name, final KillOptions options) {
}

public void activate(final String topologyName) {
}

public void deactivate(final String topologyName) {
}

public void rebalance(final String name, final RebalanceOptions options) {
}

public void shutdown() {
if (this.flink != null) {
this.flink.stop();
this.flink = null;
}
}

//……
}
FlinkLocalCluster 的 submitTopology 方法调用了 submitTopologyWithOpts,而后者主要是设置一些参数,调用 topology.getExecutionEnvironment().getStreamGraph() 根据 transformations 生成 StreamGraph,再获取 JobGraph,然后创建 LocalFlinkMiniCluster 并 start,最后使用 LocalFlinkMiniCluster 的 submitJobAndWait 或 submitJobDetached 来提交整个 JobGraph
小结

flink 通过 FlinkTopology 对 storm 提供了一定的兼容性,这对于迁移 storm 到 flink 非常有帮助
要在 flink 上运行 storm 的 topology,主要有几个步骤,分别是构建 storm 原生的 TopologyBuilder,之后通过 FlinkTopology.createTopology(builder) 来将 StormTopology 转换为 FlinkTopology,最后是通过 FlinkLocalCluster(本地模式) 或者 FlinkSubmitter(远程提交) 的 submitTopology 方法提交 FlinkTopology
FlinkTopology 是 flink 兼容 storm 的核心,它负责将 StormTopology 转换为 flink 对应的结构,比如使用 SpoutWrapper 将 spout 转换为 RichParallelSourceFunction,然后添加到 StreamExecutionEnvironment 创建 DataStream,把 bolt 的 grouping 转换为对 spout 的 DataStream 的对应操作 (比如 shuffleGrouping 转换为对 DataStream 的 rebalance 操作,fieldsGrouping 转换为对 DataStream 的 keyBy 操作,globalGrouping 转换为 global 操作,allGrouping 转换为 broadcast 操作),然后使用 BoltWrapper 或者 MergedInputsBoltWrapper 将 bolt 转换为 flink 的 OneInputStreamOperator,然后作为参数对 stream 进行 transform 操作
构建完 FlinkTopology 之后,就使用 FlinkLocalCluster 提交到本地执行,或者使用 FlinkSubmitter 提交到远程执行
FlinkLocalCluster 的 submitTopology 方法主要是通过 FlinkTopology 作用的 StreamExecutionEnvironment 生成 StreamGraph,通过它获取 JobGraph,然后创建 LocalFlinkMiniCluster 并 start,最后通过 LocalFlinkMiniCluster 提交 JobGraph

doc
Storm Compatibility Beta

退出移动版