A look at how Storm builds a TridentTopology


This article examines how a Storm TridentTopology is built.
Example
@Test
public void testDebugTopologyBuild() {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
            new Values("nickt1", 4),
            new Values("nickt2", 7),
            new Values("nickt3", 8),
            new Values("nickt4", 9),
            new Values("nickt5", 7),
            new Values("nickt6", 11),
            new Values("nickt7", 5)
    );
    spout.setCycle(false);
    TridentTopology topology = new TridentTopology();
    Stream stream1 = topology.newStream("spout1", spout)
            .each(new Fields("user", "score"), new BaseFunction() {
                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                    System.out.println("tuple:" + tuple);
                }
            }, new Fields());

    topology.build();
}
To keep things simple, much of the analysis below is based on this example.
TridentTopology.newStream
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public Stream newStream(String txId, IRichSpout spout) {
return newStream(txId, new RichSpoutBatchExecutor(spout));
}

public Stream newStream(String txId, IPartitionedTridentSpout spout) {
return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, ITridentDataSource dataSource) {
if (dataSource instanceof IBatchSpout) {
return newStream(txId, (IBatchSpout) dataSource);
} else if (dataSource instanceof ITridentSpout) {
return newStream(txId, (ITridentSpout) dataSource);
} else if (dataSource instanceof IPartitionedTridentSpout) {
return newStream(txId, (IPartitionedTridentSpout) dataSource);
} else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
} else {
throw new UnsupportedOperationException("Unsupported stream");
}
}

public Stream newStream(String txId, IBatchSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}

public Stream newStream(String txId, ITridentSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}

protected Stream addNode(Node n) {
registerNode(n);
return new Stream(this, n.name, n);
}

protected void registerNode(Node n) {
_graph.addVertex(n);
if(n.stateInfo!=null) {
String id = n.stateInfo.id;
if(!_colocate.containsKey(id)) {
_colocate.put(id, new ArrayList());
}
_colocate.get(id).add(n);
}
}

The first parameter of newStream is txId, and the second is an ITridentDataSource.
ITridentDataSource has several subtypes: IBatchSpout, ITridentSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.
Every overload ends up creating a SpoutNode, which registerNode adds to _graph. If a node's stateInfo is not null, the node is also added to _colocate, but for a SpoutNode this value is null. Note that the SpoutNode's SpoutType is SpoutNode.SpoutType.BATCH.
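
The registration step can be modeled with plain collections. The sketch below is not Storm code: SketchNode and SketchRegistry are hypothetical stand-ins for Node and for the _graph and _colocate fields, and stateId plays the role of stateInfo.id. It only illustrates that every node becomes a vertex, while only nodes that carry a state id are also grouped in the colocate map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a Trident node; stateId plays the role of stateInfo.id.
class SketchNode {
    final String name;
    final String stateId; // null for spout nodes, as noted in the text

    SketchNode(String name, String stateId) {
        this.name = name;
        this.stateId = stateId;
    }
}

// Hypothetical stand-in for the bookkeeping done by registerNode.
class SketchRegistry {
    final Set<SketchNode> vertices = new LinkedHashSet<>();
    final Map<String, List<SketchNode>> colocate = new LinkedHashMap<>();

    void registerNode(SketchNode n) {
        // Every node is added to the graph.
        vertices.add(n);
        // Only nodes with state information are grouped by state id.
        if (n.stateId != null) {
            colocate.computeIfAbsent(n.stateId, k -> new ArrayList<>()).add(n);
        }
    }

    public static void main(String[] args) {
        SketchRegistry registry = new SketchRegistry();
        registry.registerNode(new SketchNode("spout1", null));
        registry.registerNode(new SketchNode("stateQuery", "state0"));
        System.out.println(registry.vertices.size() + " vertices, "
                + registry.colocate.size() + " colocate group(s)");
    }
}
```

In the article's example only a spout and an each function are registered, so the colocate map would stay empty.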

Node
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/Node.java
public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
private static final AtomicInteger INDEX = new AtomicInteger(0);
private String nodeId;

public String name = null;
public Fields allOutputFields;
public String streamId;
public Integer parallelismHint = null;
public NodeStateInfo stateInfo = null;
public int creationIndex;

public Node(String streamId, String name, Fields allOutputFields) {
this.nodeId = UUID.randomUUID().toString();
this.allOutputFields = allOutputFields;
this.streamId = streamId;
this.name = name;
this.creationIndex = INDEX.incrementAndGet();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return nodeId.equals(((Node) o).nodeId);
}

@Override
public int hashCode() {
return nodeId.hashCode();
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}

public String shortString() {
return "nodeId: " + nodeId + ", allOutputFields: " + allOutputFields;
}
}

Node extends DefaultResourceDeclarer, which implements the resource-related interfaces ResourceDeclarer and ITridentResource.
Node has several subclasses: SpoutNode, ProcessorNode, and PartitionNode.
SpoutNode describes a spout; ProcessorNode generally describes a Trident operation such as each, map, aggregate, reduce, or project; PartitionNode describes a partitioning step.
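
One consequence of the Node class shown above is that identity comes only from the random nodeId, never from the name or stream. The following is a minimal sketch of that behavior under stated assumptions: IdentityNode is a hypothetical class, not Storm's Node, and the instanceof guard in equals is an addition that the excerpt does not have.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of Node to its identity-related fields.
class IdentityNode {
    private static final AtomicInteger INDEX = new AtomicInteger(0);

    private final String nodeId = UUID.randomUUID().toString();
    final String name;
    final int creationIndex = INDEX.incrementAndGet();

    IdentityNode(String name) {
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // Defensive type check; an assumption of this sketch, absent from the excerpt.
        if (!(o instanceof IdentityNode)) {
            return false;
        }
        return nodeId.equals(((IdentityNode) o).nodeId);
    }

    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }

    public static void main(String[] args) {
        IdentityNode a = new IdentityNode("each");
        IdentityNode b = new IdentityNode("each");
        // Same name, but different random ids, so the two nodes are distinct vertices.
        System.out.println("equal: " + a.equals(b));
        System.out.println("created in order: " + (a.creationIndex < b.creationIndex));
    }
}
```

Because equality is id-based, two operations with identical names can coexist in the graph, and creationIndex records the order in which they were declared.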

TridentTopology.build
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public StormTopology build() {
DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();

//……

List<SpoutNode> spoutNodes = new ArrayList<>();

// can be regular nodes (static state) or processor nodes
Set<Node> boltNodes = new LinkedHashSet<>();
for(Node n: graph.vertexSet()) {
if(n instanceof SpoutNode) {
spoutNodes.add((SpoutNode) n);
} else if(!(n instanceof PartitionNode)) {
boltNodes.add(n);
}
}

Set<Group> initialGroups = new LinkedHashSet<>();

//……

for(Node n: boltNodes) {
initialGroups.add(new Group(graph, n));
}

GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
grouper.mergeFully();
Collection<Group> mergedGroups = grouper.getAllGroups();

// add identity partitions between groups
for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
Group g1 = grouper.nodeGroup(e.source);
Group g2 = grouper.nodeGroup(e.target);
// g1 being null means the source is a spout node
if(g1==null && !(e.source instanceof SpoutNode))
throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
if(g1==null || !g1.equals(g2)) {
graph.removeEdge(e);
PartitionNode pNode = makeIdentityPartition(e.source);
graph.addVertex(pNode);
graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
}
}
}

//……

// add in spouts as groups so we can get parallelisms
for(Node n: spoutNodes) {
grouper.addGroup(new Group(graph, n));
}

grouper.reindex();
mergedGroups = grouper.getAllGroups();

Map<Node, String> batchGroupMap = new HashMap<>();
List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
for(int i=0; i<connectedComponents.size(); i++) {
String groupId = "bg" + i;
for(Node n: connectedComponents.get(i)) {
batchGroupMap.put(n, groupId);
}
}

// System.out.println("GRAPH:");
// System.out.println(graph);

Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

TridentTopologyBuilder builder = new TridentTopologyBuilder();

Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
Map<Group, String> boltIds = genBoltIds(mergedGroups);

for(SpoutNode sn: spoutNodes) {
Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
spoutRes.putAll(sn.getResources());

Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

SpoutDeclarer spoutDeclarer = null;

if(sn.type == SpoutNode.SpoutType.DRPC) {

spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
(IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
} else {
ITridentSpout s;
if(sn.spout instanceof IBatchSpout) {
s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
} else if(sn.spout instanceof ITridentSpout) {
s = (ITridentSpout) sn.spout;
} else {
throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
// TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
}
spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
}

if(onHeap != null) {
if(offHeap != null) {
spoutDeclarer.setMemoryLoad(onHeap, offHeap);
}
else {
spoutDeclarer.setMemoryLoad(onHeap);
}
}

if(cpuLoad != null) {
spoutDeclarer.setCPULoad(cpuLoad);
}
}

for(Group g: mergedGroups) {
if(!isSpoutGroup(g)) {
Integer p = parallelisms.get(g);
Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
Map<String, Number> groupRes = g.getResources(_resourceDefaults);

Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
committerBatches(g, batchGroupMap), streamToGroup);

if(onHeap != null) {
if(offHeap != null) {
d.setMemoryLoad(onHeap, offHeap);
}
else {
d.setMemoryLoad(onHeap);
}
}

if(cpuLoad != null) {
d.setCPULoad(cpuLoad);
}

Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
for(PartitionNode n: inputs) {
Node parent = TridentUtils.getParent(graph, n);
String componentId = parent instanceof SpoutNode ?
spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
}
}
}
HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
combinedMasterCoordResources.putAll(_masterCoordResources);
return builder.buildTopology(combinedMasterCoordResources);
}

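build creates a TridentTopologyBuilder and, for each entry in spoutNodes, calls TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) to add the spout (a SpoutNode of type DRPC goes through setBatchPerTupleSpout instead).
A spout of type IBatchSpout is wrapped in a BatchSpoutExecutor so that it becomes an ITridentSpout.
The streamName here is the streamId generated by UniqueIdGen.getUniqueStreamId: the prefix s followed by the value of _streamCounter, for example s1. txStateId is the txId passed in by the user. batchGroup is the prefix bg followed by the index of the element in connectedComponents, for example bg0. parallelism is the value the user set when defining the topology.
After adding the spout, build applies its resource settings, such as memoryLoad and cpuLoad. It then adds the bolts, each of which is a SubtopologyBolt, and applies their resource settings as well.
Finally it calls TridentTopologyBuilder.buildTopology.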
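
The two naming schemes just described (s plus a counter, bg plus a connected-component index) can be reproduced with the standard library alone. This is a rough sketch, not Storm's implementation: IdSketch and its methods are hypothetical, node names are plain strings, and components are numbered in insertion order, whereas the real numbering depends on the order returned by JGraphT's ConnectivityInspector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IdSketch {
    private int streamCounter = 0;

    // Follows the "s" + counter scheme described in the text.
    String uniqueStreamId() {
        streamCounter++;
        return "s" + streamCounter;
    }

    // Adds an undirected edge between two node names.
    static void link(Map<String, List<String>> adjacency, String a, String b) {
        adjacency.computeIfAbsent(a, k -> new ArrayList<>()).add(b);
        adjacency.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
    }

    // Breadth-first search: every node in one connected component gets the same "bg" + i.
    static Map<String, String> assignBatchGroups(Map<String, List<String>> adjacency) {
        Map<String, String> groupOf = new LinkedHashMap<>();
        int next = 0;
        for (String start : adjacency.keySet()) {
            if (groupOf.containsKey(start)) {
                continue;
            }
            String groupId = "bg" + next++;
            Deque<String> queue = new ArrayDeque<>();
            queue.add(start);
            groupOf.put(start, groupId);
            while (!queue.isEmpty()) {
                String current = queue.poll();
                for (String neighbor : adjacency.getOrDefault(current, new ArrayList<>())) {
                    if (!groupOf.containsKey(neighbor)) {
                        groupOf.put(neighbor, groupId);
                        queue.add(neighbor);
                    }
                }
            }
        }
        return groupOf;
    }

    // A spout connected to one function, plus a second, unconnected spout.
    static Map<String, String> example() {
        Map<String, List<String>> adjacency = new LinkedHashMap<>();
        link(adjacency, "spout1", "each1");
        adjacency.computeIfAbsent("spout2", k -> new ArrayList<>());
        return assignBatchGroups(adjacency);
    }

    public static void main(String[] args) {
        IdSketch ids = new IdSketch();
        System.out.println(ids.uniqueStreamId());
        System.out.println(example());
    }
}
```

The article's example has a single connected component, which is why every component there ends up in bg0.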

TridentTopologyBuilder.setSpout
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, TransactionalSpoutComponent> _spouts = new HashMap();

public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
Map<String, String> batchGroups = new HashMap();
batchGroups.put(streamName, batchGroup);
markBatchGroups(id, batchGroups);

TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
_spouts.put(id, c);
return new SpoutDeclarerImpl(c);
}

private void markBatchGroups(String component, Map<String, String> batchGroups) {
for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
_batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
}
}
setSpout calls markBatchGroups to record the new component's stream in _batchIds, and also stores the component in _spouts.
TridentTopologyBuilder.setBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, Component> _bolts = new HashMap();

// map from stream name to batch id
public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
markBatchGroups(id, batchGroups);
Component c = new Component(bolt, parallelism, committerBatches);
_bolts.put(id, c);
return new BoltDeclarerImpl(c);

}

private void markBatchGroups(String component, Map<String, String> batchGroups) {
for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
_batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
}
}
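setBolt calls markBatchGroups to record the new component's streams in _batchIds, and also stores the component in _bolts. For Trident, the bolt wraps a series of ProcessorNode instances (and possibly PartitionNode instances).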
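
Both setSpout and setBolt therefore feed the same lookup table, keyed by component id and stream id. A minimal sketch of that table, assuming a hypothetical StreamKey class in place of the Thrift-generated GlobalStreamId:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical replacement for GlobalStreamId: a (component, stream) pair usable as a map key.
final class StreamKey {
    final String componentId;
    final String streamId;

    StreamKey(String componentId, String streamId) {
        this.componentId = componentId;
        this.streamId = streamId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StreamKey)) {
            return false;
        }
        StreamKey other = (StreamKey) o;
        return componentId.equals(other.componentId) && streamId.equals(other.streamId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(componentId, streamId);
    }
}

class BatchIdRegistrySketch {
    private final Map<StreamKey, String> batchIds = new HashMap<>();

    // Same loop shape as markBatchGroups: one entry per output stream of the component.
    void markBatchGroups(String component, Map<String, String> streamToBatchGroup) {
        for (Map.Entry<String, String> entry : streamToBatchGroup.entrySet()) {
            batchIds.put(new StreamKey(component, entry.getKey()), entry.getValue());
        }
    }

    // Returns the batch group of a stream, or null if the stream was never registered.
    String batchGroupOf(String component, String stream) {
        return batchIds.get(new StreamKey(component, stream));
    }

    public static void main(String[] args) {
        BatchIdRegistrySketch registry = new BatchIdRegistrySketch();
        Map<String, String> spoutStreams = new HashMap<>();
        spoutStreams.put("s1", "bg0");
        registry.markBatchGroups("spout-spout1", spoutStreams);
        System.out.println(registry.batchGroupOf("spout-spout1", "s1"));
    }
}
```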
TridentTopologyBuilder.buildTopology
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
TopologyBuilder builder = new TopologyBuilder();
Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

Map<String, List<String>> batchesToCommitIds = new HashMap<>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();

for(String id: _spouts.keySet()) {
TransactionalSpoutComponent c = _spouts.get(id);
if(c.spout instanceof IRichSpout) {

//TODO: wrap this to set the stream name
builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
} else {
String batchGroup = c.batchGroupId;
if(!batchesToCommitIds.containsKey(batchGroup)) {
batchesToCommitIds.put(batchGroup, new ArrayList<String>());
}
batchesToCommitIds.get(batchGroup).add(c.commitStateId);

if(!batchesToSpouts.containsKey(batchGroup)) {
batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
}
batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);

BoltDeclarer scd =
builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
.globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
.globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);

for(Map<String, Object> m: c.componentConfs) {
scd.addConfigurations(m);
}

Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
specs.put(c.batchGroupId, new CoordSpec());
BoltDeclarer bd = builder.setBolt(id,
new TridentBoltExecutor(
new TridentSpoutExecutor(
c.commitStateId,
c.streamName,
((ITridentSpout) c.spout)),
batchIdsForSpouts,
specs),
c.parallelism);
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
if(c.spout instanceof ICommitterTridentSpout) {
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
}
for(Map<String, Object> m: c.componentConfs) {
bd.addConfigurations(m);
}
}
}

//……

Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

for(String batch: batchesToCommitIds.keySet()) {
List<String> commitIds = batchesToCommitIds.get(batch);
SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));

if(onHeap != null) {
if(offHeap != null) {
masterCoord.setMemoryLoad(onHeap, offHeap);
}
else {
masterCoord.setMemoryLoad(onHeap);
}
}

if(cpuLoad != null) {
masterCoord.setCPULoad(cpuLoad);
}
}

for(String id: _bolts.keySet()) {
Component c = _bolts.get(id);

Map<String, CoordSpec> specs = new HashMap<>();

for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
String batch = batchIdsForBolts.get(s);
if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
CoordSpec spec = specs.get(batch);
CoordType ct;
if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
ct = CoordType.single();
} else {
ct = CoordType.all();
}
spec.coords.put(s.get_componentId(), ct);
}

for(String b: c.committerBatches) {
specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
}

BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
for(Map<String, Object> conf: c.componentConfs) {
d.addConfigurations(conf);
}

for(InputDeclaration inputDecl: c.declarations) {
inputDecl.declare(d);
}

Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
for(String comp: entry.getValue()) {
d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
}
}

for(String b: c.committerBatches) {
d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
}
}

return builder.createTopology();
}

For every spout that is not an IRichSpout, buildTopology adds a TridentSpoutCoordinator bolt to the topology. That bolt uses globalGrouping to subscribe to two streams of the MasterBatchCoordinator: MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and MasterBatchCoordinator.SUCCESS_STREAM_ID ($success). buildTopology also adds a TridentBoltExecutor bolt that wraps the spout in a TridentSpoutExecutor. This bolt uses allGrouping to subscribe to $batch from the TridentSpoutCoordinator and to $success from the MasterBatchCoordinator; if the spout is an ICommitterTridentSpout, it also subscribes to MasterBatchCoordinator.COMMIT_STREAM_ID ($commit). Note that the non-IRichSpout spout is thereby turned into a bolt.
Next, for each batch in batchesToCommitIds, buildTopology creates a MasterBatchCoordinator spout, which is the component that the TridentSpoutCoordinator and TridentBoltExecutor above connect to.
For each bolt (a SubtopologyBolt wrapping ProcessorNode instances), it adds a TridentBoltExecutor bolt that uses directGrouping on TridentBoltExecutor.COORD_STREAM ($coord- followed by the batch group) and, for each of its committer batches, allGrouping on MasterBatchCoordinator.COMMIT_STREAM_ID ($commit).
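
The spout-side wiring can be written out as a list of subscriptions. The sketch below is an illustration only: WiringSketch is a hypothetical class, the helpers masterCoordinator and spoutCoordinator are re-created from the component ids this article reports ($mastercoord-bg0, $spoutcoord-spout-spout1) rather than copied from Storm, and the subscriptions are rendered as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

class WiringSketch {
    // Naming helpers, inferred from the component ids quoted in this article.
    static String masterCoordinator(String batchGroup) {
        return "$mastercoord-" + batchGroup;
    }

    static String spoutCoordinator(String spoutId) {
        return "$spoutcoord-" + spoutId;
    }

    // Each entry reads: subscriber <- source:stream (grouping).
    static List<String> spoutSideSubscriptions(String spoutId, String batchGroup, boolean committer) {
        String master = masterCoordinator(batchGroup);
        String coordinator = spoutCoordinator(spoutId);
        List<String> subscriptions = new ArrayList<>();
        // The spout coordinator listens to the master coordinator.
        subscriptions.add(coordinator + " <- " + master + ":$batch (global)");
        subscriptions.add(coordinator + " <- " + master + ":$success (global)");
        // The bolt that wraps the user's spout listens to both coordinators.
        subscriptions.add(spoutId + " <- " + coordinator + ":$batch (all)");
        subscriptions.add(spoutId + " <- " + master + ":$success (all)");
        if (committer) {
            // Only committer spouts also receive the commit command.
            subscriptions.add(spoutId + " <- " + master + ":$commit (all)");
        }
        return subscriptions;
    }

    public static void main(String[] args) {
        for (String s : spoutSideSubscriptions("spout-spout1", "bg0", false)) {
            System.out.println(s);
        }
    }
}
```

Running main with the ids from the example lists four subscriptions, since FixedBatchSpout is wrapped in a BatchSpoutExecutor and, as far as this walkthrough shows, takes the non-committer path.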

TopologyBuilder.createTopology
storm-core-1.2.2-sources.jar!/org/apache/storm/topology/TopologyBuilder.java
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
maybeAddCheckpointSpout();
for(String boltId: _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
bolt = maybeAddCheckpointTupleForwarder(bolt);
ComponentCommon common = getComponentCommon(boltId, bolt);
try{
maybeAddCheckpointInputs(common);
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
}catch(RuntimeException wrapperCause){
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
throw new IllegalStateException(
"Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
"which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
"should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
}
throw wrapperCause;
}
}
for(String spoutId: _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
try{
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
}catch(RuntimeException wrapperCause){
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
throw new IllegalStateException(
"Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
"which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
"should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
}
throw wrapperCause;
}
}

StormTopology stormTopology = new StormTopology(spoutSpecs,
boltSpecs,
new HashMap<String, StateSpoutSpec>());

stormTopology.set_worker_hooks(_workerHooks);

return Utils.addVersions(stormTopology);
}

/**
* If the topology has at least one stateful bolt
* add a {@link CheckpointSpout} component to the topology.
*/
private void maybeAddCheckpointSpout() {
if (hasStatefulBolt) {
setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
}
}

When createTopology runs, it adds a CheckpointSpout if the topology has a stateful bolt; in that case, each bolt that is not a StatefulBoltExecutor is also wrapped in a CheckpointTupleForwarder.
After the setup done in buildTopology, by the time createTopology runs there are already three bolts: a TridentBoltExecutor wrapping the ProcessorNode, a TridentSpoutCoordinator, and a TridentBoltExecutor wrapping the original spout.
There is only one spout, the MasterBatchCoordinator. In buildTopology, a spout that is not an IRichSpout is replaced by a TridentSpoutCoordinator bolt together with a TridentBoltExecutor bolt.

Topology structure

For the example above, once TridentTopologyBuilder.buildTopology has called createTopology, the final topology has one spout, MasterBatchCoordinator ($mastercoord-bg0), and three bolts: TridentSpoutCoordinator ($spoutcoord-spout-spout1), the TridentBoltExecutor wrapping the non-IRichSpout spout (spout-spout1), and the TridentBoltExecutor wrapping the ProcessorNode (b-0). Several streams are involved: MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), MasterBatchCoordinator.COMMIT_STREAM_ID ($commit), MasterBatchCoordinator.BATCH_STREAM_ID ($batch), TridentBoltExecutor.COORD_STREAM ($coord-bg0), s1, and s2.

$mastercoord-bg0 declares the three streams $success, $commit, and $batch; the outputFields of each is the single field tx.

$spoutcoord-spout-spout1 receives the $success and $batch streams from $mastercoord-bg0 and declares its own $batch stream, whose outputFields are [tx, metadata].

spout-spout1 receives, via allGrouping, the $success stream from $mastercoord-bg0 and the $batch stream from $spoutcoord-spout-spout1. It emits [id, count] on $coord-bg0 and emits data tuples on stream s1.

b-0 receives the $coord-bg0 and s1 streams from spout-spout1. It then emits data on stream s2 (output_fields: [$batchId, user, score]) and also emits [id, count] on $coord-bg0.
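
The structure just described can be captured as two small tables and checked for internal consistency. This is only a bookkeeping sketch: ExampleTopologySketch is a hypothetical class, the component and stream names are the ones listed in this section, and nothing here calls Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExampleTopologySketch {
    // Component id -> streams it emits, as listed in this section.
    static Map<String, List<String>> emittedStreams() {
        Map<String, List<String>> emitted = new LinkedHashMap<>();
        emitted.put("$mastercoord-bg0", Arrays.asList("$success", "$commit", "$batch"));
        emitted.put("$spoutcoord-spout-spout1", Arrays.asList("$batch"));
        emitted.put("spout-spout1", Arrays.asList("$coord-bg0", "s1"));
        emitted.put("b-0", Arrays.asList("$coord-bg0", "s2"));
        return emitted;
    }

    // Component id -> inputs written as "source:stream".
    static Map<String, List<String>> inputs() {
        Map<String, List<String>> in = new LinkedHashMap<>();
        in.put("$mastercoord-bg0", new ArrayList<String>()); // the only real spout
        in.put("$spoutcoord-spout-spout1",
                Arrays.asList("$mastercoord-bg0:$success", "$mastercoord-bg0:$batch"));
        in.put("spout-spout1",
                Arrays.asList("$mastercoord-bg0:$success", "$spoutcoord-spout-spout1:$batch"));
        in.put("b-0", Arrays.asList("spout-spout1:$coord-bg0", "spout-spout1:s1"));
        return in;
    }

    // True if every subscribed stream is actually emitted by its source component.
    static boolean isConsistent() {
        Map<String, List<String>> emitted = emittedStreams();
        for (List<String> subscriptions : inputs().values()) {
            for (String subscription : subscriptions) {
                int separator = subscription.indexOf(':');
                String source = subscription.substring(0, separator);
                String stream = subscription.substring(separator + 1);
                List<String> streams = emitted.get(source);
                if (streams == null || !streams.contains(stream)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("components: " + emittedStreams().keySet());
        System.out.println("consistent: " + isConsistent());
    }
}
```

Only $mastercoord-bg0 has no inputs, which matches the observation that it is the one true spout in the generated topology.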

Summary

When TridentTopologyBuilder runs buildTopology, a spout that is not an IRichSpout is turned into a TridentBoltExecutor bolt, and a TridentSpoutCoordinator bolt is added alongside it; each ProcessorNode is wrapped in a TridentBoltExecutor bolt as well. To make coordination easier, TridentTopology wraps the user-defined spout in a bolt and creates a MasterBatchCoordinator as the real spout.
The TridentBoltExecutor.COORD_STREAM ($coord-) stream carries [id, count] data between components so that each component can verify it has received every tuple of a batch; both the wrapped spout and the bolts emit [id, count] on this stream.
MasterBatchCoordinator, TridentSpoutCoordinator, and the TridentBoltExecutor that wraps the original spout (spout-spout1) relate as follows: the master sends success tuples (commands) to spout-spout1 and sends success and batch tuples (commands) to the coordinator; the coordinator sends batch tuples (commands) to spout-spout1.
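
To make the [id, count] idea concrete, here is a deliberately simplified model of what a receiving task could do with those messages. It is an assumption-laden sketch, not the logic of TridentBoltExecutor: BatchTrackerSketch is a hypothetical class that tracks a single batch, and it assumes each upstream task reports exactly once how many tuples it sent to this task.

```java
// Tracks one batch on one receiving task (simplified model).
class BatchTrackerSketch {
    private final int expectedReports; // number of upstream tasks that must report
    private int reports = 0;           // coordination messages seen so far
    private int expectedTuples = 0;    // sum of the counts reported so far
    private int receivedTuples = 0;    // data tuples actually received

    BatchTrackerSketch(int upstreamTasks) {
        this.expectedReports = upstreamTasks;
    }

    // Called for every data tuple of the batch.
    void onTuple() {
        receivedTuples++;
    }

    // Called for every [id, count] message of the batch.
    void onCoord(int count) {
        reports++;
        expectedTuples += count;
    }

    // The batch is complete once every upstream task has reported
    // and the number of received tuples matches the reported total.
    boolean isComplete() {
        return reports == expectedReports && receivedTuples == expectedTuples;
    }

    public static void main(String[] args) {
        BatchTrackerSketch tracker = new BatchTrackerSketch(2);
        tracker.onTuple();
        tracker.onTuple();
        tracker.onTuple();
        tracker.onCoord(2);
        System.out.println("after first report: " + tracker.isComplete());
        tracker.onCoord(1);
        System.out.println("after second report: " + tracker.isComplete());
    }
}
```

The check is order-independent: coordination messages may arrive before or after the data tuples, and the batch counts as complete only when both conditions hold.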

