序
本文主要研究一下 storm TridentBoltExecutor 的 finishBatch 方法
MasterBatchCoordinator.nextTuple
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java
public void nextTuple() {
sync();
}
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won’t be a commit for tx 2 (because tx 1 isn’t committed yet),
// and there won’t be a batch for tx 4 because there’s max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
LOG.debug(“Emitted on [stream = {}], [tx_status = {}], [{}]”, COMMIT_STREAM_ID, maybeCommit, this);
}
if(_active) {
if(_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
for(int i=0; i<_maxTransactionActive; i++) {
if(!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if(attemptId==null) {
attemptId = 0;
} else {
attemptId++;
}
_attemptIds.put(curr, attemptId);
for(TransactionalState state: _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
_activeTx.put(curr, newTransactionStatus);
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
LOG.debug(“Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]”, BATCH_STREAM_ID, attempt, newTransactionStatus, this);
_throttler.markEvent();
}
curr = nextTransactionId(curr);
}
}
}
}
MasterBatchCoordinator 是整个 trident 的真正的 spout,它的 nextTuple 方法会向 TridentSpoutCoordinator 向 MasterBatchCoordinator.BATCH_STREAM_ID($batch)发射 tuple
TridentSpoutCoordinator.execute
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java
public void execute(Tuple tuple, BasicOutputCollector collector) {
TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
_state.cleanupBefore(attempt.getTransactionId());
_coord.success(attempt.getTransactionId());
} else {
long txid = attempt.getTransactionId();
Object prevMeta = _state.getPreviousState(txid);
Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
_state.overrideState(txid, meta);
collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
}
}
TridentSpoutCoordinator 接收 MasterBatchCoordinator 在 MasterBatchCoordinator.BATCH_STREAM_ID($batch)发过来的 tuple,然后向包装用户 spout 的 TridentBoltExecutor 发送 batch 指令
TridentBoltExecutor(TridentSpoutExecutor)
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) {
if(TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now – _lastRotate > _messageTimeoutMs) {
_batches.rotate();
_lastRotate = now;
}
return;
}
String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
if(batchGroup==null) {
// this is so we can do things like have simple DRPC that doesn’t need to use batch processing
_coordCollector.setCurrBatch(null);
_bolt.execute(null, tuple);
_collector.ack(tuple);
return;
}
IBatchID id = (IBatchID) tuple.getValue(0);
//get transaction id
//if it already exists and attempt id is greater than the attempt there
TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
// System.out.println(“Received in ” + _context.getThisComponentId() + ” ” + _context.getThisTaskIndex()
// + ” (” + _batches.size() + “)” +
// “\ntuple: ” + tuple +
// “\nwith tracked ” + tracked +
// “\nwith id ” + id +
// “\nwith group ” + batchGroup
// + “\n”);
//
// }
//System.out.println(“Num tracked: ” + _batches.size() + ” ” + _context.getThisComponentId() + ” ” + _context.getThisTaskIndex());
// this code here ensures that only one attempt is ever tracked for a batch, so when
// failures happen you don’t get an explosion in memory usage in the tasks
if(tracked!=null) {
if(id.getAttemptId() > tracked.attemptId) {
_batches.remove(id.getId());
tracked = null;
} else if(id.getAttemptId() < tracked.attemptId) {
// no reason to try to execute a previous attempt than we’ve already seen
return;
}
}
if(tracked==null) {
tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
_batches.put(id.getId(), tracked);
}
_coordCollector.setCurrBatch(tracked);
//System.out.println(“TRACKED: ” + tracked + ” ” + tuple);
TupleType t = getTupleType(tuple, tracked);
if(t==TupleType.COMMIT) {
tracked.receivedCommit = true;
checkFinish(tracked, tuple, t);
} else if(t==TupleType.COORD) {
int count = tuple.getInteger(1);
tracked.reportedTasks++;
tracked.expectedTupleCount+=count;
checkFinish(tracked, tuple, t);
} else {
tracked.receivedTuples++;
boolean success = true;
try {
_bolt.execute(tracked.info, tuple);
if(tracked.condition.expectedTaskReports==0) {
success = finishBatch(tracked, tuple);
}
} catch(FailedException e) {
failBatch(tracked, e);
}
if(success) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
}
_coordCollector.setCurrBatch(null);
}
private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
boolean success = true;
try {
_bolt.finishBatch(tracked.info);
String stream = COORD_STREAM(tracked.info.batchGroup);
for(Integer task: tracked.condition.targetTasks) {
_collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
}
if(tracked.delayedAck!=null) {
_collector.ack(tracked.delayedAck);
tracked.delayedAck = null;
}
} catch(FailedException e) {
failBatch(tracked, e);
success = false;
}
_batches.remove(tracked.info.batchId.getId());
return success;
}
TridentBoltExecutor.execute 方法,首先会创建并初始化 TrackedBatch(如果 TrackedBatch 不存在的话),之后接收到 batch 指令的时候,对 tracked.receivedTuple 累加,然后调用_bolt.execute(tracked.info, tuple)
对于 spout 来说,这里的_bolt 是 TridentSpoutExecutor,它的 execute 方法会往下游的 TridentBoltExecutor 发射一个 batch 的 tuples;由于 spout 的 expectedTaskReports==0,所以这里在调用完 TridentSpoutExecutor 发射 batch 的 tuples 时,它就立马调用 finishBatch
finishBatch 操作,这里会通过 COORD_STREAM 往下游的 TridentBoltExecutor 发射 [id,count] 数据,告知下游 TridentBoltExecutor 说它一共发射了多少 tuples
TridentBoltExecutor(SubtopologyBolt)
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
@Override
public void execute(Tuple tuple) {
if(TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now – _lastRotate > _messageTimeoutMs) {
_batches.rotate();
_lastRotate = now;
}
return;
}
String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
if(batchGroup==null) {
// this is so we can do things like have simple DRPC that doesn’t need to use batch processing
_coordCollector.setCurrBatch(null);
_bolt.execute(null, tuple);
_collector.ack(tuple);
return;
}
IBatchID id = (IBatchID) tuple.getValue(0);
//get transaction id
//if it already exists and attempt id is greater than the attempt there
TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
// System.out.println(“Received in ” + _context.getThisComponentId() + ” ” + _context.getThisTaskIndex()
// + ” (” + _batches.size() + “)” +
// “\ntuple: ” + tuple +
// “\nwith tracked ” + tracked +
// “\nwith id ” + id +
// “\nwith group ” + batchGroup
// + “\n”);
//
// }
//System.out.println(“Num tracked: ” + _batches.size() + ” ” + _context.getThisComponentId() + ” ” + _context.getThisTaskIndex());
// this code here ensures that only one attempt is ever tracked for a batch, so when
// failures happen you don’t get an explosion in memory usage in the tasks
if(tracked!=null) {
if(id.getAttemptId() > tracked.attemptId) {
_batches.remove(id.getId());
tracked = null;
} else if(id.getAttemptId() < tracked.attemptId) {
// no reason to try to execute a previous attempt than we’ve already seen
return;
}
}
if(tracked==null) {
tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
_batches.put(id.getId(), tracked);
}
_coordCollector.setCurrBatch(tracked);
//System.out.println(“TRACKED: ” + tracked + ” ” + tuple);
TupleType t = getTupleType(tuple, tracked);
if(t==TupleType.COMMIT) {
tracked.receivedCommit = true;
checkFinish(tracked, tuple, t);
} else if(t==TupleType.COORD) {
int count = tuple.getInteger(1);
tracked.reportedTasks++;
tracked.expectedTupleCount+=count;
checkFinish(tracked, tuple, t);
} else {
tracked.receivedTuples++;
boolean success = true;
try {
_bolt.execute(tracked.info, tuple);
if(tracked.condition.expectedTaskReports==0) {
success = finishBatch(tracked, tuple);
}
} catch(FailedException e) {
failBatch(tracked, e);
}
if(success) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
}
_coordCollector.setCurrBatch(null);
}
private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
if(tracked.failed) {
failBatch(tracked);
_collector.fail(tuple);
return;
}
CoordCondition cond = tracked.condition;
boolean delayed = tracked.delayedAck==null &&
(cond.commitStream!=null && type==TupleType.COMMIT
|| cond.commitStream==null);
if(delayed) {
tracked.delayedAck = tuple;
}
boolean failed = false;
if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
if(tracked.receivedTuples == tracked.expectedTupleCount) {
finishBatch(tracked, tuple);
} else {
//TODO: add logging that not all tuples were received
failBatch(tracked);
_collector.fail(tuple);
failed = true;
}
}
if(!delayed && !failed) {
_collector.ack(tuple);
}
}
private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
boolean success = true;
try {
_bolt.finishBatch(tracked.info);
String stream = COORD_STREAM(tracked.info.batchGroup);
for(Integer task: tracked.condition.targetTasks) {
_collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
}
if(tracked.delayedAck!=null) {
_collector.ack(tracked.delayedAck);
tracked.delayedAck = null;
}
} catch(FailedException e) {
failBatch(tracked, e);
success = false;
}
_batches.remove(tracked.info.batchId.getId());
return success;
}
这个 TridentBoltExecutor 是下游的 bolt,它的_bolt 是 SubtopologyBolt,而且它的 tracked.condition.expectedTaskReports 不为 0,因而它是在接收到 TupleType.COORD 的 tuple 的时候,才进行 checkFinish 操作(这里先忽略 TupleType.COMMIT 类型)
由于 BoltExecutor 是使用 Utils.asyncLoop 来挨个消费 receiveQueue 的数据的,而且 emitBatch 的时候也是挨个接收 batch 的 tuples,最后再接收到 TridentBoltExecutor(TridentSpoutExecutor)在 finishBatch 的时候通过 COORD_STREAM 发过来的 [id,count] 的 tuple(注意这里的 COORD_STREAM 是分发给每个 task 的,如果 TridentBoltExecutor 有多个 parallel,则他们是按各自的 task 来接收的)
所以 TridentBoltExecutor(SubtopologyBolt)先挨个处理每个 tuple,处理完之后才轮到 TupleType.COORD 这个 tuple,然后触发 checkFinish 操作;在没有 commitStream 的情况下,tracked.receivedCommit 默认为 true,因而这里只要检测收到的 tuples 与应收的 tuples 数一致,就执行_bolt.finishBatch 操作完成一个 batch,然后再往它的下游 TridentBoltExecutor 发射它应收的 [id,count] 的 tuple
小结
对于 trident 来说,真正的 spout 是 MasterBatchCoordinator,它的 nextTuple 会触发 batch 的发送,它将 batch 指令发送给 TridentSpoutCoordinator,而 TridentSpoutCoordinator 将触发 TridentBoltExecutor(TridentSpoutExecutor)的 execute 方法,进而触发 ITridentSpout 的 emitter 的 emitBatch,从而发送一个 batch 的数据
TridentBoltExecutor(TridentSpoutExecutor)的 expectedTaskReports==0,它在调用完 TridentSpoutExecutor 发射 batch 的 tuples 时,就立马调用 finishBatch 操作,通过 COORD_STREAM 往下游的 TridentBoltExecutor 发射 [id,count] 数据,告知下游 TridentBoltExecutor 说它一共发射了多少 tuples
spout 的下游 bolt 为 TridentBoltExecutor(SubtopologyBolt),它的 tracked.condition.expectedTaskReports 不为 0,因而它是在接收到 TupleType.COORD 的 tuple 的时候,才进行 checkFinish 操作 (这里先忽略 TupleType.COMMIT 类型),由于 spout 是先执行 emitBatch 操作再最后 finishBatch 发送[id,count] 数据,正常情况下按顺序进入到 TridentBoltExecutor(SubtopologyBolt)的 receiveQueue 队列,然后 TridentBoltExecutor(SubtopologyBolt)挨个消费 tuple,调用 SubtopologyBolt.execute,最后再处理 [id,count] 数据,触发 checkFinish 操作,只要检测收到的 tuples 与应收的 tuples 数一致,就执行 SubtopologyBolt.finishBatch 操作完成这个 batch,然后再往它的下游 TridentBoltExecutor 发射它应收的 [id,count] 的 tuple
doc
Trident Tutorial
聊聊 storm worker 的 executor 与 task
聊聊 storm 的 AggregateProcessor 的 execute 及 finishBatch 方法