Preface
This article looks at the execute and finishBatch methods of Storm's AggregateProcessor.
Example
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.groupBy(new Fields("user"))
.aggregate(new Fields("user", "score"), new UserCountAggregator(), new Fields("val"))
.toStream()
.parallelismHint(1)
.each(new Fields("val"), new PrintEachFunc(), new Fields());
TridentBoltExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
if(tracked.failed) {
failBatch(tracked);
_collector.fail(tuple);
return;
}
CoordCondition cond = tracked.condition;
boolean delayed = tracked.delayedAck==null &&
(cond.commitStream!=null && type==TupleType.COMMIT
|| cond.commitStream==null);
if(delayed) {
tracked.delayedAck = tuple;
}
boolean failed = false;
if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
if(tracked.receivedTuples == tracked.expectedTupleCount) {
finishBatch(tracked, tuple);
} else {
//TODO: add logging that not all tuples were received
failBatch(tracked);
_collector.fail(tuple);
failed = true;
}
}
if(!delayed && !failed) {
_collector.ack(tuple);
}
}
private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
boolean success = true;
try {
_bolt.finishBatch(tracked.info);
String stream = COORD_STREAM(tracked.info.batchGroup);
for(Integer task: tracked.condition.targetTasks) {
_collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
}
if(tracked.delayedAck!=null) {
_collector.ack(tracked.delayedAck);
tracked.delayedAck = null;
}
} catch(FailedException e) {
failBatch(tracked, e);
success = false;
}
_batches.remove(tracked.info.batchId.getId());
return success;
}
public static class TrackedBatch {
int attemptId;
BatchInfo info;
CoordCondition condition;
int reportedTasks = 0;
int expectedTupleCount = 0;
int receivedTuples = 0;
Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
//……
}
Both the user's spout and the groupBy operation end up wrapped in a TridentBoltExecutor; the TridentBoltExecutor for groupBy wraps a SubtopologyBolt.
TridentBoltExecutor calls finishBatch from checkFinish. (It also calls finishBatch when it receives a REGULAR tuple and tracked.condition.expectedTaskReports == 0. For the spout, tracked.condition.expectedTaskReports is 0 because it is the data source, so it does not need COORD_STREAM tuples to update reportedTasks and expectedTupleCount.) finishBatch emits new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)) on COORD_STREAM, matching new Fields("id", "count"): the batchId and the number of tuples sent to the target task. This tells each downstream task how many tuples it was sent. (taskEmittedTuples is maintained in the emit and emitDirect methods of CoordinatedOutputCollector.)
The downstream component is also a TridentBoltExecutor. When it receives data on COORD_STREAM it updates expectedTupleCount. In checkFinish, each TridentBoltExecutor checks whether receivedTuples equals expectedTupleCount; if so, every tuple sent by upstream has arrived, and it triggers finishBatch.
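The completion check described above can be sketched without any Storm dependency. The class below is a simplified model, not the Storm implementation: BatchTrackingSketch, Tracked, and Outcome are hypothetical names, and it assumes there is no commit stream, so the receivedCommit condition is treated as always satisfied.

```java
public class BatchTrackingSketch {

    /** Possible results of a completion check (hypothetical, for illustration). */
    public enum Outcome { PENDING, FINISH, FAIL }

    /** Hypothetical stand-in for the counters kept in TrackedBatch. */
    public static final class Tracked {
        private final int expectedTaskReports; // upstream tasks that must report
        private int reportedTasks = 0;         // [id, count] reports seen so far
        private int expectedTupleCount = 0;    // sum of the reported counts
        private int receivedTuples = 0;        // regular tuples actually received

        public Tracked(int expectedTaskReports) {
            this.expectedTaskReports = expectedTaskReports;
        }

        /** A regular data tuple of this batch arrived. */
        public void onRegularTuple() {
            receivedTuples++;
        }

        /** An upstream task reported how many tuples it sent to this task. */
        public void onCoordTuple(int count) {
            reportedTasks++;
            expectedTupleCount += count;
        }

        /** Same decision as checkFinish, with the commit condition assumed true. */
        public Outcome check() {
            if (reportedTasks != expectedTaskReports) {
                return Outcome.PENDING; // still waiting for upstream reports
            }
            return receivedTuples == expectedTupleCount ? Outcome.FINISH : Outcome.FAIL;
        }
    }

    public static void main(String[] args) {
        Tracked t = new Tracked(2);
        t.onRegularTuple();
        t.onRegularTuple();
        t.onRegularTuple();
        t.onCoordTuple(2);
        System.out.println(t.check());
        t.onCoordTuple(1);
        System.out.println(t.check());
    }
}
```

In this model a batch stays pending until every upstream task has reported, and a mismatch between the reported and received counts fails the batch, which corresponds to the failBatch branch in checkFinish.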
SubtopologyBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
public class SubtopologyBolt implements ITridentBatchBolt {
//……
@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
String sourceStream = tuple.getSourceStreamId();
InitialReceiver ir = _roots.get(sourceStream);
if(ir==null) {
throw new RuntimeException("Received unexpected tuple " + tuple.toString());
}
ir.receive((ProcessorContext) batchInfo.state, tuple);
}
@Override
public void finishBatch(BatchInfo batchInfo) {
for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
p.finishBatch((ProcessorContext) batchInfo.state);
}
}
@Override
public Object initBatchState(String batchGroup, Object batchId) {
ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
p.startBatch(ret);
}
return ret;
}
@Override
public void cleanup() {
for(String bg: _myTopologicallyOrdered.keySet()) {
for(TridentProcessor p: _myTopologicallyOrdered.get(bg)) {
p.cleanup();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for(Node n: _nodes) {
declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
protected static class InitialReceiver {
List<TridentProcessor> _receivers = new ArrayList<>();
RootFactory _factory;
ProjectionFactory _project;
String _stream;
public InitialReceiver(String stream, Fields allFields) {
// TODO: don't want to project for non-batch bolts...???
// how to distinguish "batch" streams from non-batch streams?
_stream = stream;
_factory = new RootFactory(allFields);
List<String> projected = new ArrayList<>(allFields.toList());
projected.remove(0);
_project = new ProjectionFactory(_factory, new Fields(projected));
}
public void receive(ProcessorContext context, Tuple tuple) {
TridentTuple t = _project.create(_factory.create(tuple));
for(TridentProcessor r: _receivers) {
r.execute(context, _stream, t);
}
}
public void addReceiver(TridentProcessor p) {
_receivers.add(p);
}
public Factory getOutputFactory() {
return _project;
}
}
}
The groupBy operation is wrapped in a SubtopologyBolt; the first of its output fields is $batchId.
execute looks up the matching InitialReceiver and calls its receive method; receive calls execute on each entry of _receivers, and the receiver here is AggregateProcessor.
finishBatch calls finishBatch on each TridentProcessor returned by _myTopologicallyOrdered.get(batchInfo.batchGroup), here AggregateProcessor and EachProcessor. BatchInfo holds the batchId, the processorContext, and the batchGroup. The processorContext (a TransactionAttempt-typed batchId plus the Object array state, which holds the GroupCollector, the aggregate's accumulated results, and so on) is what gets passed to finishBatch.
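To make the batch lifecycle concrete, here is a minimal Storm-free sketch of processors that share one per-batch context and each own a slot in its state array. All names (ProcessorChainSketch, Context, Processor, CountingProcessor) are hypothetical, and as a simplification every processor is registered as a direct receiver of the root stream, whereas in Trident downstream processors are fed through collectors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProcessorChainSketch {

    /** Per-batch context: one state slot per processor, like ProcessorContext.state. */
    public static final class Context {
        final Object batchId;
        final Object[] state;

        Context(Object batchId, int slots) {
            this.batchId = batchId;
            this.state = new Object[slots];
        }
    }

    /** Hypothetical stand-in for TridentProcessor, reduced to three lifecycle calls. */
    public interface Processor {
        void startBatch(Context ctx);
        void execute(Context ctx, String tuple);
        void finishBatch(Context ctx);
    }

    /** Counts tuples in its own state slot and records the total on finishBatch. */
    public static final class CountingProcessor implements Processor {
        private final int stateIndex;
        private final List<String> log;

        public CountingProcessor(int stateIndex, List<String> log) {
            this.stateIndex = stateIndex;
            this.log = log;
        }

        public void startBatch(Context ctx) {
            ctx.state[stateIndex] = 0;
        }

        public void execute(Context ctx, String tuple) {
            ctx.state[stateIndex] = (Integer) ctx.state[stateIndex] + 1;
        }

        public void finishBatch(Context ctx) {
            log.add("p" + stateIndex + "=" + ctx.state[stateIndex]);
        }
    }

    /** Runs one batch: start every processor, feed the tuples, finish in order. */
    public static List<String> runBatch(List<String> tuples) {
        List<String> log = new ArrayList<>();
        List<Processor> ordered = Arrays.<Processor>asList(
                new CountingProcessor(0, log), new CountingProcessor(1, log));
        Context ctx = new Context("batch-1", ordered.size());
        for (Processor p : ordered) {
            p.startBatch(ctx);              // corresponds to initBatchState
        }
        for (String t : tuples) {
            for (Processor p : ordered) {
                p.execute(ctx, t);          // corresponds to InitialReceiver.receive
            }
        }
        for (Processor p : ordered) {
            p.finishBatch(ctx);             // corresponds to SubtopologyBolt.finishBatch
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(Arrays.asList("a", "b")));
    }
}
```

Because the state lives in the context rather than in the processor, the same processor instance can serve several in-flight batches, each with its own Context.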
AggregateProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java
public class AggregateProcessor implements TridentProcessor {
Aggregator _agg;
TridentContext _context;
FreshCollector _collector;
Fields _inputFields;
ProjectionFactory _projection;
public AggregateProcessor(Fields inputFields, Aggregator agg) {
_agg = agg;
_inputFields = inputFields;
}
@Override
public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
List<Factory> parents = tridentContext.getParentTupleFactories();
if(parents.size()!=1) {
throw new RuntimeException("Aggregate operation can only have one parent");
}
_context = tridentContext;
_collector = new FreshCollector(tridentContext);
_projection = new ProjectionFactory(parents.get(0), _inputFields);
_agg.prepare(conf, new TridentOperationContext(context, _projection));
}
@Override
public void cleanup() {
_agg.cleanup();
}
@Override
public void startBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
}
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
_collector.setContext(processorContext);
_agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
}
@Override
public void finishBatch(ProcessorContext processorContext) {
_collector.setContext(processorContext);
_agg.complete(processorContext.state[_context.getStateIndex()], _collector);
}
@Override
public Factory getOutputFactory() {
return _collector.getOutputFactory();
}
}
AggregateProcessor creates a FreshCollector and a ProjectionFactory in prepare.
For the groupBy operation, _agg here is a GroupedAggregator, and the context passed to _agg.prepare is a TridentOperationContext.
finishBatch calls _agg.complete. The arr array it passes in holds the GroupCollector as its first element and the aggregator's accumulated values as its second; the _collector it passes in is the FreshCollector.
GroupedAggregator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/impl/GroupedAggregator.java
public class GroupedAggregator implements Aggregator<Object[]> {
ProjectionFactory _groupFactory;
ProjectionFactory _inputFactory;
Aggregator _agg;
ComboList.Factory _fact;
Fields _inFields;
Fields _groupFields;
public GroupedAggregator(Aggregator agg, Fields group, Fields input, int outSize) {
_groupFields = group;
_inFields = input;
_agg = agg;
int[] sizes = new int[2];
sizes[0] = _groupFields.size();
sizes[1] = outSize;
_fact = new ComboList.Factory(sizes);
}
@Override
public void prepare(Map conf, TridentOperationContext context) {
_inputFactory = context.makeProjectionFactory(_inFields);
_groupFactory = context.makeProjectionFactory(_groupFields);
_agg.prepare(conf, new TridentOperationContext(context, _inputFactory));
}
@Override
public Object[] init(Object batchId, TridentCollector collector) {
return new Object[] {new GroupCollector(collector, _fact), new HashMap(), batchId};
}
@Override
public void aggregate(Object[] arr, TridentTuple tuple, TridentCollector collector) {
GroupCollector groupColl = (GroupCollector) arr[0];
Map<List, Object> val = (Map) arr[1];
TridentTuple group = _groupFactory.create((TridentTupleView) tuple);
TridentTuple input = _inputFactory.create((TridentTupleView) tuple);
Object curr;
if(!val.containsKey(group)) {
curr = _agg.init(arr[2], groupColl);
val.put((List) group, curr);
} else {
curr = val.get(group);
}
groupColl.currGroup = group;
_agg.aggregate(curr, input, groupColl);
}
@Override
public void complete(Object[] arr, TridentCollector collector) {
Map<List, Object> val = (Map) arr[1];
GroupCollector groupColl = (GroupCollector) arr[0];
for(Entry<List, Object> e: val.entrySet()) {
groupColl.currGroup = e.getKey();
_agg.complete(e.getValue(), groupColl);
}
}
@Override
public void cleanup() {
_agg.cleanup();
}
}
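In aggregate, arr[0] is the GroupCollector; arr[1] is a map whose key is the TridentTupleView of the group fields and whose value is the accumulator returned by _agg's init; arr[2] is the TransactionAttempt.
_agg here is a ChainedAggregatorImpl. aggregate first extracts the group fields and the input fields from the tuple, then checks whether arr[1] already has a value for that group; if not, it calls _agg's init to create one and adds it to the map.
Finally, aggregate calls _agg.aggregate to accumulate.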
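The per-group bookkeeping can be illustrated with a small Storm-free sketch. MiniAggregator, Sum, and GroupedAggregationSketch are hypothetical names, not Trident types; the sketch assumes rows of the form (user, score) and uses a LinkedHashMap only to keep the output order deterministic, whereas the code above uses a plain HashMap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupedAggregationSketch {

    /** Hypothetical, minimal analogue of an aggregator: init / aggregate / complete. */
    public interface MiniAggregator<S> {
        S init();
        void aggregate(S state, int value);
        int complete(S state);
    }

    /** Sums the values of one group; the state is a one-element int array. */
    public static final class Sum implements MiniAggregator<int[]> {
        public int[] init() {
            return new int[1];
        }

        public void aggregate(int[] state, int value) {
            state[0] += value;
        }

        public int complete(int[] state) {
            return state[0];
        }
    }

    /** Aggregates (user, score) rows per user and returns "user=result" lines. */
    public static <S> List<String> run(MiniAggregator<S> agg, List<Object[]> rows) {
        // Plays the role of arr[1]: group key -> accumulator of the wrapped aggregator.
        Map<List<Object>, S> perGroup = new LinkedHashMap<>();
        for (Object[] row : rows) {
            List<Object> group = Collections.singletonList(row[0]); // group-field projection
            S curr = perGroup.get(group);
            if (curr == null) {
                curr = agg.init();          // first tuple of this group: lazy init
                perGroup.put(group, curr);
            }
            agg.aggregate(curr, (Integer) row[1]);
        }
        // Plays the role of complete: finish every group in turn.
        List<String> out = new ArrayList<>();
        for (Map.Entry<List<Object>, S> e : perGroup.entrySet()) {
            out.add(e.getKey().get(0) + "=" + agg.complete(e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"alice", 1});
        rows.add(new Object[] {"bob", 5});
        rows.add(new Object[] {"alice", 2});
        System.out.println(run(new Sum(), rows));
    }
}
```

Using a List as the map key works because List equality is value-based, which is also why the Trident code can key its map on the projected group tuple.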
ChainedAggregatorImpl
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/impl/ChainedAggregatorImpl.java
public class ChainedAggregatorImpl implements Aggregator<ChainedResult> {
Aggregator[] _aggs;
ProjectionFactory[] _inputFactories;
ComboList.Factory _fact;
Fields[] _inputFields;
public ChainedAggregatorImpl(Aggregator[] aggs, Fields[] inputFields, ComboList.Factory fact) {
_aggs = aggs;
_inputFields = inputFields;
_fact = fact;
if(_aggs.length!=_inputFields.length) {
throw new IllegalArgumentException("Require input fields for each aggregator");
}
}
public void prepare(Map conf, TridentOperationContext context) {
_inputFactories = new ProjectionFactory[_inputFields.length];
for(int i=0; i<_inputFields.length; i++) {
_inputFactories[i] = context.makeProjectionFactory(_inputFields[i]);
_aggs[i].prepare(conf, new TridentOperationContext(context, _inputFactories[i]));
}
}
public ChainedResult init(Object batchId, TridentCollector collector) {
ChainedResult initted = new ChainedResult(collector, _aggs.length);
for(int i=0; i<_aggs.length; i++) {
initted.objs[i] = _aggs[i].init(batchId, initted.collectors[i]);
}
return initted;
}
public void aggregate(ChainedResult val, TridentTuple tuple, TridentCollector collector) {
val.setFollowThroughCollector(collector);
for(int i=0; i<_aggs.length; i++) {
TridentTuple projected = _inputFactories[i].create((TridentTupleView) tuple);
_aggs[i].aggregate(val.objs[i], projected, val.collectors[i]);
}
}
public void complete(ChainedResult val, TridentCollector collector) {
val.setFollowThroughCollector(collector);
for(int i=0; i<_aggs.length; i++) {
_aggs[i].complete(val.objs[i], val.collectors[i]);
}
if(_aggs.length > 1) {// otherwise, tuples were emitted directly
int[] indices = new int[val.collectors.length];
for(int i=0; i<indices.length; i++) {
indices[i] = 0;
}
boolean keepGoing = true;
//emit cross-join of all emitted tuples
while(keepGoing) {
List[] combined = new List[_aggs.length];
for(int i=0; i< _aggs.length; i++) {
CaptureCollector capturer = (CaptureCollector) val.collectors[i];
combined[i] = capturer.captured.get(indices[i]);
}
collector.emit(_fact.create(combined));
keepGoing = increment(val.collectors, indices, indices.length - 1);
}
}
}
//return false if can't increment anymore
private boolean increment(TridentCollector[] lengths, int[] indices, int j) {
if(j==-1) return false;
indices[j]++;
CaptureCollector capturer = (CaptureCollector) lengths[j];
if(indices[j] >= capturer.captured.size()) {
indices[j] = 0;
return increment(lengths, indices, j-1);
}
return true;
}
public void cleanup() {
for(Aggregator a: _aggs) {
a.cleanup();
}
}
}
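init returns a ChainedResult, whose objs field stores the init result of each entry in _aggs.
If _agg here is of type Aggregator, it is the aggregator the user passed to aggregate after groupBy; if it is a CombinerAggregator, it is wrapped in CombinerAggregatorCombineImpl.
In ChainedAggregatorImpl's complete method, each entry of _aggs has complete called in turn, with val.objs[i], that entry's accumulated value, as the first argument.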
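When more than one aggregator is chained, complete emits the cross-join of what each aggregator captured, advancing an index array like an odometer. The sketch below reproduces that index walk in plain Java. CrossJoinSketch is a hypothetical name, each captured entry is a single value rather than a tuple, and the early return for an empty capture list is an assumption added here, not something the code above does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrossJoinSketch {

    /** Emits every combination that takes one element from each captured list. */
    public static List<List<Object>> crossJoin(List<List<Object>> captured) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> c : captured) {
            if (c.isEmpty()) {
                return out; // assumption: nothing to join if any list is empty
            }
        }
        int[] indices = new int[captured.size()]; // all zeros initially
        boolean keepGoing = true;
        while (keepGoing) {
            List<Object> combined = new ArrayList<>();
            for (int i = 0; i < captured.size(); i++) {
                combined.add(captured.get(i).get(indices[i]));
            }
            out.add(combined);
            keepGoing = increment(captured, indices, indices.length - 1);
        }
        return out;
    }

    /** Advances position j; on overflow resets it and carries into position j - 1. */
    private static boolean increment(List<List<Object>> captured, int[] indices, int j) {
        if (j == -1) {
            return false; // carried past the leftmost position: all combinations done
        }
        indices[j]++;
        if (indices[j] >= captured.get(j).size()) {
            indices[j] = 0;
            return increment(captured, indices, j - 1);
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Object>> captured = Arrays.asList(
                Arrays.<Object>asList(1, 2),
                Arrays.<Object>asList("a", "b"));
        System.out.println(crossJoin(captured));
    }
}
```

The rightmost index moves fastest, so combinations come out in the same order as nested loops over the lists from left to right.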
Summary
groupBy is wrapped in a SubtopologyBolt. Its execute method triggers InitialReceiver's receive method, which in turn calls execute on _receivers; the first receiver is AggregateProcessor.
AggregateProcessor wraps a GroupedAggregator, which wraps a ChainedAggregatorImpl, which wraps an array of Aggregators. This example has only one: the aggregator passed to aggregate after groupBy.
TridentBoltExecutor receives from upstream, on the stream named with COORD_STREAM_PREFIX, the count of tuples it should have received, updates expectedTupleCount, and then runs checkFinish. When receivedTuples (incremented each time a tuple of the spout's batch arrives) equals expectedTupleCount, it triggers finishBatch, which calls SubtopologyBolt.finishBatch, then AggregateProcessor.finishBatch, then GroupedAggregator.complete, then ChainedAggregatorImpl.complete, and finally the complete method of the user's aggregator.
For the TridentBoltExecutor that wraps TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0: it is the data source, so it does not need COORD_STREAM tuples to update reportedTasks and expectedTupleCount. When its execute method receives a tuple from MasterBatchCoordinator on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), it calls TridentSpoutExecutor's execute method and then, because tracked.condition.expectedTaskReports == 0, calls finishBatch immediately. (In this example condition.commitStream is null for the TrackedBatch of both TridentBoltExecutors, so receivedCommit is true.) finishBatch calls TridentSpoutExecutor's finishBatch and then sends the batchId and the taskEmittedTuples count over COORD_STREAM to the tasks of the downstream TridentBoltExecutor. The downstream TridentBoltExecutor has a nonzero expectedTaskReports, so it can run checkFinish and decide whether to finishBatch only once it has received the COORD_STREAM tuple.
TridentSpoutExecutor's execute calls the emitter (which ultimately calls the user's spout) to emit a batch, while its finishBatch method is currently empty. In other words, once the TridentBoltExecutor wrapping TridentSpoutExecutor receives the instruction to emit a batch, it calls TridentSpoutExecutor.execute to emit the batch through the emitter and then immediately runs finishBatch, sending [id, count] to the downstream TridentBoltExecutor. On receiving [id, count], the downstream TridentBoltExecutor updates expectedTupleCount and runs checkFinish; if receivedTuples equals expectedTupleCount, it triggers finishBatch, which in turn triggers AggregateProcessor's finishBatch.
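The upstream half of this handshake, counting emitted tuples per target task and reporting the counts at finishBatch, can be sketched as follows. EmitCountSketch and its methods are hypothetical names that only model the bookkeeping described above, and the report is rendered as a string for readability instead of being emitted on a stream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class EmitCountSketch {

    // Plays the role of taskEmittedTuples: target task id -> tuples sent to it.
    private final Map<Integer, Integer> taskEmittedTuples = new HashMap<>();

    /** Records that one tuple was delivered to each of the given target tasks. */
    public void recordEmit(int... targetTasks) {
        for (int task : targetTasks) {
            taskEmittedTuples.merge(task, 1, Integer::sum);
        }
    }

    /**
     * Builds the [batchId, count] report for every downstream task.
     * Tasks that received nothing still get a report with count 0,
     * mirroring the default value passed to Utils.get in finishBatch.
     */
    public Map<Integer, String> finishBatch(String batchId, int... allTargetTasks) {
        Map<Integer, String> reports = new TreeMap<>(); // sorted for stable output
        for (int task : allTargetTasks) {
            int count = taskEmittedTuples.getOrDefault(task, 0);
            reports.put(task, "[" + batchId + ", " + count + "]");
        }
        return reports;
    }

    public static void main(String[] args) {
        EmitCountSketch upstream = new EmitCountSketch();
        upstream.recordEmit(3);
        upstream.recordEmit(3);
        upstream.recordEmit(4);
        System.out.println(upstream.finishBatch("b1", 3, 4, 5));
    }
}
```

Reporting a zero count matters: a downstream task that was sent nothing still needs a report from every upstream task before its reported-task count can reach the expected number.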