序
本文主要研究一下 storm 的 direct grouping
direct grouping
direct grouping 是一种特殊的 grouping,它是由上游的 producer 直接指定下游哪个 task 去接收它发射出来的 tuple。direct grouping 的使用有如下几个步骤:
1、上游在 prepare 方法保存下游 bolt 的 taskId 列表
public class SentenceDirectBolt extends BaseRichBolt {
private static final Logger LOGGER = LoggerFactory.getLogger(SentenceDirectBolt.class);
private OutputCollector collector;
private List<Integer> taskIds;
private int numCounterTasks;
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
//NOTE 1 这里要取到下游的 bolt 的 taskId,用于 emitDirect 时指定 taskId
this.taskIds = context.getComponentTasks(“count-bolt”);
this.numCounterTasks = taskIds.size();
}
//……
}
这里保存了下游的 bolt 的 taskId 列表,用于 emitDirect 时选择 taskId
2、上游在 declareOutputFields 使用 declareStream 声明 streamId
public class SentenceDirectBolt extends BaseRichBolt {
//……
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“word”));
//NOTE 2 这里要通过 declareStream 声明 direct stream,并指定 streamId
declarer.declareStream(“directStreamDemo1”,true,new Fields(“word”));
declarer.declareStream(“directStreamDemo2”,true,new Fields(“word”));
}
}
这里声明了两个 streamId,一个是 directStreamDemo1,一个是 directStreamDemo2
3、上游采用 emitDirect 指定下游 taskId 及 streamId
public class SentenceDirectBolt extends BaseRichBolt {
//……
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField(“sentence”);
String[] words = sentence.split(” “);
for(String word : words){
int targetTaskId = getWordCountTaskId(word);
LOGGER.info(“word:{} choose taskId:{}”,word,targetTaskId);
// NOTE 3 这里指定发送给下游 bolt 的哪个 taskId,同时指定 streamId
if(targetTaskId % 2 == 0){
this.collector.emitDirect(targetTaskId,”directStreamDemo1″,new Values(word));
}else{
this.collector.emitDirect(targetTaskId,”directStreamDemo2″,new Values(word));
}
}
this.collector.ack(tuple);
}
}
这里使用 emitDirect(int taskId, String streamId, List<Object> tuple) 方法指定了下游的 taskId 以及要发送到的 streamId
4、下游使用 directGrouping 连接上游 bolt 及 streamId
@Test
public void testDirectGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“sentence-spout”, new SentenceSpout());
// SentenceSpout –> SplitSentenceBolt
builder.setBolt(“split-bolt”, new SentenceDirectBolt()).shuffleGrouping(“sentence-spout”);
// SplitSentenceBolt –> WordCountBolt
//NOTE 4 这里要指定上游的 bolt 以及要处理的 streamId
builder.setBolt(“count-bolt”, new WordCountBolt(),5).directGrouping(“split-bolt”,”directStreamDemo1″);
// WordCountBolt –> ReportBolt
builder.setBolt(“report-bolt”, new ReportBolt()).globalGrouping(“count-bolt”);
submitRemote(builder);
}
这里 count-bolt 作为 split-bolt 的下游,使用了 directGrouping,同时指定了要接收的 streamId 为 directStreamDemo1
小结
direct grouping 是一种特殊的 grouping,它是由上游的 producer 直接指定下游哪个 task 去接收它发射出来的 tuple。
下游使用 directGrouping 连接上游同时指定要消费的 streamId,上游在 prepare 的时候保存下游的 taskId 列表,然后在 declareOutputFields 的时候使用 declareStream 来声明 streamId,最后在 execute 方法里头使用 emitDirect(int taskId, String streamId, List<Object> tuple) 方法指定了下游的 taskId 以及要发送到的 streamId
doc
Concepts
Common Topology Patterns
关于 Storm Stream grouping