序本文主要研究一下storm TridentBoltExecutor的finishBatch方法MasterBatchCoordinator.nextTuplestorm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java public void nextTuple() { sync(); } private void sync() { // note that sometimes the tuples active may be less than max_spout_pending, e.g. // max_spout_pending = 3 // tx 1, 2, 3 active, tx 2 is acked. there won’t be a commit for tx 2 (because tx 1 isn’t committed yet), // and there won’t be a batch for tx 4 because there’s max_spout_pending tx active TransactionStatus maybeCommit = _activeTx.get(_currTransaction); if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) { maybeCommit.status = AttemptStatus.COMMITTING; _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); LOG.debug(“Emitted on [stream = {}], [tx_status = {}], [{}]”, COMMIT_STREAM_ID, maybeCommit, this); } if(_active) { if(_activeTx.size() < _maxTransactionActive) { Long curr = _currTransaction; for(int i=0; i<_maxTransactionActive; i++) { if(!_activeTx.containsKey(curr) && isReady(curr)) { // by using a monotonically increasing attempt id, downstream tasks // can be memory efficient by clearing out state for old attempts // as soon as they see a higher attempt id for a transaction Integer attemptId = _attemptIds.get(curr); if(attemptId==null) { attemptId = 0; } else { attemptId++; } _attemptIds.put(curr, attemptId); for(TransactionalState state: _states) { state.setData(CURRENT_ATTEMPTS, _attemptIds); } TransactionAttempt attempt = new TransactionAttempt(curr, attemptId); final TransactionStatus newTransactionStatus = new TransactionStatus(attempt); _activeTx.put(curr, newTransactionStatus); _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt); LOG.debug(“Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]”, BATCH_STREAM_ID, attempt, newTransactionStatus, this); _throttler.markEvent(); } curr = nextTransactionId(curr); } } } 
}
MasterBatchCoordinator是整个trident的真正的spout,它的nextTuple方法会通过MasterBatchCoordinator.BATCH_STREAM_ID($batch)向TridentSpoutCoordinator发射tuple
TridentSpoutCoordinator.execute
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java
public void execute(Tuple tuple, BasicOutputCollector collector) { TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0); if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) { _state.cleanupBefore(attempt.getTransactionId()); _coord.success(attempt.getTransactionId()); } else { long txid = attempt.getTransactionId(); Object prevMeta = _state.getPreviousState(txid); Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid)); _state.overrideState(txid, meta); collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta)); } }
TridentSpoutCoordinator接收MasterBatchCoordinator通过MasterBatchCoordinator.BATCH_STREAM_ID($batch)发过来的tuple,然后向包装用户spout的TridentBoltExecutor发送batch指令
TridentBoltExecutor(TridentSpoutExecutor)
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) { if(TupleUtils.isTick(tuple)) { long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate(); _lastRotate = now; } return; } String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId()); if(batchGroup==null) { // this is so we can do things like have simple DRPC that doesn’t need to use batch processing _coordCollector.setCurrBatch(null); _bolt.execute(null, tuple); _collector.ack(tuple); return; } IBatchID id = (IBatchID) tuple.getValue(0); //get transaction id //if it already exists and attempt id is greater than the attempt there TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {// System.out.println(“Received in " + _context.getThisComponentId() + " " + 
_context.getThisTaskIndex()// + " (” + _batches.size() + “)” +// “\ntuple: " + tuple +// “\nwith tracked " + tracked +// “\nwith id " + id + // “\nwith group " + batchGroup// + “\n”);// // } //System.out.println(“Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()); // this code here ensures that only one attempt is ever tracked for a batch, so when // failures happen you don’t get an explosion in memory usage in the tasks if(tracked!=null) { if(id.getAttemptId() > tracked.attemptId) { _batches.remove(id.getId()); tracked = null; } else if(id.getAttemptId() < tracked.attemptId) { // no reason to try to execute a previous attempt than we’ve already seen return; } } if(tracked==null) { tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId()); _batches.put(id.getId(), tracked); } _coordCollector.setCurrBatch(tracked); //System.out.println(“TRACKED: " + tracked + " " + tuple); TupleType t = getTupleType(tuple, tracked); if(t==TupleType.COMMIT) { tracked.receivedCommit = true; checkFinish(tracked, tuple, t); } else if(t==TupleType.COORD) { int count = tuple.getInteger(1); tracked.reportedTasks++; tracked.expectedTupleCount+=count; checkFinish(tracked, tuple, t); } else { tracked.receivedTuples++; boolean success = true; try { _bolt.execute(tracked.info, tuple); if(tracked.condition.expectedTaskReports==0) { success = finishBatch(tracked, tuple); } } catch(FailedException e) { failBatch(tracked, e); } if(success) { _collector.ack(tuple); } else { _collector.fail(tuple); } } _coordCollector.setCurrBatch(null); } private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) { boolean success = true; try { _bolt.finishBatch(tracked.info); String stream = COORD_STREAM(tracked.info.batchGroup); for(Integer task: tracked.condition.targetTasks) { _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, 
Utils.get(tracked.taskEmittedTuples, task, 0))); } if(tracked.delayedAck!=null) { _collector.ack(tracked.delayedAck); tracked.delayedAck = null; } } catch(FailedException e) { failBatch(tracked, e); success = false; } _batches.remove(tracked.info.batchId.getId()); return success; }
TridentBoltExecutor.execute方法,首先会创建并初始化TrackedBatch(如果TrackedBatch不存在的话),之后接收到batch指令的时候,对tracked.receivedTuples累加,然后调用_bolt.execute(tracked.info, tuple)
对于spout来说,这里的_bolt是TridentSpoutExecutor,它的execute方法会往下游的TridentBoltExecutor发射一个batch的tuples;由于spout的expectedTaskReports==0,所以这里在调用完TridentSpoutExecutor发射batch的tuples之后,它就立马调用finishBatch
finishBatch操作,这里会通过COORD_STREAM往下游的TridentBoltExecutor发射[id,count]数据,告知下游TridentBoltExecutor说它一共发射了多少tuples
TridentBoltExecutor(SubtopologyBolt)
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
@Override public void execute(Tuple tuple) { if(TupleUtils.isTick(tuple)) { long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate(); _lastRotate = now; } return; } String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId()); if(batchGroup==null) { // this is so we can do things like have simple DRPC that doesn’t need to use batch processing _coordCollector.setCurrBatch(null); _bolt.execute(null, tuple); _collector.ack(tuple); return; } IBatchID id = (IBatchID) tuple.getValue(0); //get transaction id //if it already exists and attempt id is greater than the attempt there TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {// System.out.println(“Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()// + " (” + _batches.size() + “)” +// “\ntuple: " + tuple +// “\nwith tracked " + tracked +// “\nwith id " + id + // “\nwith group " + batchGroup// + “\n”);// // } //System.out.println(“Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + 
_context.getThisTaskIndex()); // this code here ensures that only one attempt is ever tracked for a batch, so when // failures happen you don’t get an explosion in memory usage in the tasks if(tracked!=null) { if(id.getAttemptId() > tracked.attemptId) { _batches.remove(id.getId()); tracked = null; } else if(id.getAttemptId() < tracked.attemptId) { // no reason to try to execute a previous attempt than we’ve already seen return; } } if(tracked==null) { tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId()); _batches.put(id.getId(), tracked); } _coordCollector.setCurrBatch(tracked); //System.out.println(“TRACKED: " + tracked + " " + tuple); TupleType t = getTupleType(tuple, tracked); if(t==TupleType.COMMIT) { tracked.receivedCommit = true; checkFinish(tracked, tuple, t); } else if(t==TupleType.COORD) { int count = tuple.getInteger(1); tracked.reportedTasks++; tracked.expectedTupleCount+=count; checkFinish(tracked, tuple, t); } else { tracked.receivedTuples++; boolean success = true; try { _bolt.execute(tracked.info, tuple); if(tracked.condition.expectedTaskReports==0) { success = finishBatch(tracked, tuple); } } catch(FailedException e) { failBatch(tracked, e); } if(success) { _collector.ack(tuple); } else { _collector.fail(tuple); } } _coordCollector.setCurrBatch(null); } private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) { if(tracked.failed) { failBatch(tracked); _collector.fail(tuple); return; } CoordCondition cond = tracked.condition; boolean delayed = tracked.delayedAck==null && (cond.commitStream!=null && type==TupleType.COMMIT || cond.commitStream==null); if(delayed) { tracked.delayedAck = tuple; } boolean failed = false; if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) { if(tracked.receivedTuples == tracked.expectedTupleCount) { finishBatch(tracked, tuple); } else { //TODO: add logging that not all tuples were 
received failBatch(tracked); _collector.fail(tuple); failed = true; } } if(!delayed && !failed) { _collector.ack(tuple); } } private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) { boolean success = true; try { _bolt.finishBatch(tracked.info); String stream = COORD_STREAM(tracked.info.batchGroup); for(Integer task: tracked.condition.targetTasks) { _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0))); } if(tracked.delayedAck!=null) { _collector.ack(tracked.delayedAck); tracked.delayedAck = null; } } catch(FailedException e) { failBatch(tracked, e); success = false; } _batches.remove(tracked.info.batchId.getId()); return success; }这个TridentBoltExecutor是下游的bolt,它的_bolt是SubtopologyBolt,而且它的tracked.condition.expectedTaskReports不为0,因而它是在接收到TupleType.COORD的tuple的时候,才进行checkFinish操作(这里先忽略TupleType.COMMIT类型)由于BoltExecutor是使用Utils.asyncLoop来挨个消费receiveQueue的数据的,而且emitBatch的时候也是挨个接收batch的tuples,最后再接收到TridentBoltExecutor(TridentSpoutExecutor)在finishBatch的时候通过COORD_STREAM发过来的[id,count]的tuple(注意这里的COORD_STREAM是分发给每个task的,如果TridentBoltExecutor有多个parallel,则他们是按各自的task来接收的)所以TridentBoltExecutor(SubtopologyBolt)先挨个处理每个tuple,处理完之后才轮到TupleType.COORD这个tuple,然后触发checkFinish操作;在没有commitStream的情况下,tracked.receivedCommit默认为true,因而这里只要检测收到的tuples与应收的tuples数一致,就执行_bolt.finishBatch操作完成一个batch,然后再往它的下游TridentBoltExecutor发射它应收的[id,count]的tuple小结对于trident来说,真正的spout是MasterBatchCoordinator,它的nextTuple会触发batch的发送,它将batch指令发送给TridentSpoutCoordinator,而TridentSpoutCoordinator将触发TridentBoltExecutor(TridentSpoutExecutor)的execute方法,进而触发ITridentSpout的emitter的emitBatch,从而发送一个batch的数据TridentBoltExecutor(TridentSpoutExecutor)的expectedTaskReports==0,它在调用完TridentSpoutExecutor发射batch的tuples时,就立马调用finishBatch操作,通过COORD_STREAM往下游的TridentBoltExecutor发射[id,count]数据,告知下游TridentBoltExecutor说它一共发射了多少tuplesspout的下游bolt为TridentBoltExecutor(SubtopologyBolt),它的tracked.condition.expectedTaskReports不为0,因而它是在接收到TupleType.COORD的tuple的时
候,才进行checkFinish操作(这里先忽略TupleType.COMMIT类型);由于spout是先执行emitBatch操作,最后才在finishBatch的时候发送[id,count]数据,正常情况下这些tuple会按顺序进入到TridentBoltExecutor(SubtopologyBolt)的receiveQueue队列,然后TridentBoltExecutor(SubtopologyBolt)挨个消费tuple,调用SubtopologyBolt.execute,最后再处理[id,count]数据,触发checkFinish操作:只要检测到收到的tuples数与应收的tuples数一致,就执行SubtopologyBolt.finishBatch操作完成这个batch,然后再往它的下游TridentBoltExecutor发射下游应收的[id,count]的tuple
doc
Trident Tutorial
聊聊storm worker的executor与task
聊聊storm的AggregateProcessor的execute及finishBatch方法