序
本文主要研究一下 storm trident 的 coordinator
实例
代码示例
@Test
public void testDebugTopologyBuild(){
FixedBatchSpout spout = new FixedBatchSpout(new Fields(“user”, “score”), 3,
new Values(“nickt1”, 4),
new Values(“nickt2”, 7),
new Values(“nickt3”, 8),
new Values(“nickt4”, 9),
new Values(“nickt5”, 7),
new Values(“nickt6”, 11),
new Values(“nickt7”, 5)
);
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream(“spout1”,spout)
.each(new Fields(“user”, “score”), new BaseFunction() {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println(“tuple:”+tuple);
}
},new Fields());
topology.build();
}
这里使用的 spout 为 FixedBatchSpout,它是 IBatchSpout 类型
拓扑图
MasterBatchCoordinator
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
public class MasterBatchCoordinator extends BaseRichSpout {
public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);
public static final long INIT_TXID = 1L;
public static final String BATCH_STREAM_ID = “$batch”;
public static final String COMMIT_STREAM_ID = “$commit”;
public static final String SUCCESS_STREAM_ID = “$success”;
private static final String CURRENT_TX = “currtx”;
private static final String CURRENT_ATTEMPTS = “currattempts”;
private List<TransactionalState> _states = new ArrayList();
TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
TreeMap<Long, Integer> _attemptIds;
private SpoutOutputCollector _collector;
Long _currTransaction;
int _maxTransactionActive;
List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList();
List<String> _managedSpoutIds;
List<ITridentSpout> _spouts;
WindowedTimeThrottler _throttler;
boolean _active = true;
public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
if(spoutIds.isEmpty()) {
throw new IllegalArgumentException(“Must manage at least one spout”);
}
_managedSpoutIds = spoutIds;
_spouts = spouts;
LOG.debug(“Created {}”, this);
}
public List<String> getManagedSpoutIds(){
return _managedSpoutIds;
}
@Override
public void activate() {
_active = true;
}
@Override
public void deactivate() {
_active = false;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
for(String spoutId: _managedSpoutIds) {
_states.add(TransactionalState.newCoordinatorState(conf, spoutId));
}
_currTransaction = getStoredCurrTransaction();
_collector = collector;
Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
if(active==null) {
_maxTransactionActive = 1;
} else {
_maxTransactionActive = active.intValue();
}
_attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
for(int i=0; i<_spouts.size(); i++) {
String txId = _managedSpoutIds.get(i);
_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
}
LOG.debug(“Opened {}”, this);
}
@Override
public void close() {
for(TransactionalState state: _states) {
state.close();
}
LOG.debug(“Closed {}”, this);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// in partitioned example, in case an emitter task receives a later transaction than it’s emitted so far,
// when it sees the earlier txid it should know to emit nothing
declarer.declareStream(BATCH_STREAM_ID, new Fields(“tx”));
declarer.declareStream(COMMIT_STREAM_ID, new Fields(“tx”));
declarer.declareStream(SUCCESS_STREAM_ID, new Fields(“tx”));
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
ret.registerSerialization(TransactionAttempt.class);
return ret;
}
//……
}
prepare 方法首先从 Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS(topology.trident.batch.emit.interval.millis,在 defaults.yaml 默认为 500)读取触发 batch 的频率配置,然后创建 WindowedTimeThrottler,其 maxAmt 值为 1
这里使用 TransactionalState 在 zookeeper 上维护 transactional 状态
之后读取 Config.TOPOLOGY_MAX_SPOUT_PENDING(topology.max.spout.pending,在 defaults.yaml 中默认为 null)设置_maxTransactionActive,如果为 null,则设置为 1
MasterBatchCoordinator.nextTuple
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
@Override
public void nextTuple() {
sync();
}
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won’t be a commit for tx 2 (because tx 1 isn’t committed yet),
// and there won’t be a batch for tx 4 because there’s max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
LOG.debug(“Emitted on [stream = {}], [tx_status = {}], [{}]”, COMMIT_STREAM_ID, maybeCommit, this);
}
if(_active) {
if(_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
for(int i=0; i<_maxTransactionActive; i++) {
if(!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if(attemptId==null) {
attemptId = 0;
} else {
attemptId++;
}
_attemptIds.put(curr, attemptId);
for(TransactionalState state: _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
_activeTx.put(curr, newTransactionStatus);
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
LOG.debug(“Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]”, BATCH_STREAM_ID, attempt, newTransactionStatus, this);
_throttler.markEvent();
}
curr = nextTransactionId(curr);
}
}
}
}
nextTuple 就是调用 sync 方法,该方法在 ack 及 fail 中均有调用;sync 方法首先根据事务状态,如果需要提交,则会往 MasterBatchCoordinator.COMMIT_STREAM_ID($commit)发送 tuple;之后根据_maxTransactionActive 以及 WindowedTimeThrottler 限制,符合要求才启动新的 TransactionAttempt,往 MasterBatchCoordinator.BATCH_STREAM_ID($batch)发送 tuple,同时对 WindowedTimeThrottler 标记下 windowEvent 数量
MasterBatchCoordinator.ack
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
@Override
public void ack(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus status = _activeTx.get(tx.getTransactionId());
LOG.debug(“Ack. [tx_attempt = {}], [tx_status = {}], [{}]”, tx, status, this);
if(status!=null && tx.equals(status.attempt)) {
if(status.status==AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
LOG.debug(“Changed status. [tx_attempt = {}] [tx_status = {}]”, tx, status);
} else if(status.status==AttemptStatus.COMMITTING) {
_activeTx.remove(tx.getTransactionId());
_attemptIds.remove(tx.getTransactionId());
_collector.emit(SUCCESS_STREAM_ID, new Values(tx));
_currTransaction = nextTransactionId(tx.getTransactionId());
for(TransactionalState state: _states) {
state.setData(CURRENT_TX, _currTransaction);
}
LOG.debug(“Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]”, SUCCESS_STREAM_ID, tx, status, this);
}
sync();
}
}
ack 主要是根据当前事务状态进行不同操作,如果之前是 AttemptStatus.PROCESSING 状态,则更新为 AttemptStatus.PROCESSED;如果之前是 AttemptStatus.COMMITTING,则移除当前事务,然后往 MasterBatchCoordinator.SUCCESS_STREAM_ID($success)发送 tuple,更新_currTransaction 为 nextTransactionId;最后再调用 sync 触发新的 TransactionAttempt
MasterBatchCoordinator.fail
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
@Override
public void fail(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
LOG.debug(“Fail. [tx_attempt = {}], [tx_status = {}], [{}]”, tx, stored, this);
if(stored!=null && tx.equals(stored.attempt)) {
_activeTx.tailMap(tx.getTransactionId()).clear();
sync();
}
}
fail 方法将当前事务从_activeTx 中移除,然后清空_activeTx 中 txId 大于这个失败 txId 的数据,最后再调用 sync 判断是否该触发新的 TransactionAttempt(注意这里没有变更_currTransaction,因而 sync 方法触发新的 TransactionAttempt 的_txid 还是当前这个失败的_currTransaction)
TridentSpoutCoordinator
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutCoordinator.java
public class TridentSpoutCoordinator implements IBasicBolt {
public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
private static final String META_DIR = “meta”;
ITridentSpout<Object> _spout;
ITridentSpout.BatchCoordinator<Object> _coord;
RotatingTransactionalState _state;
TransactionalState _underlyingState;
String _id;
public TridentSpoutCoordinator(String id, ITridentSpout<Object> spout) {
_spout = spout;
_id = id;
}
@Override
public void prepare(Map conf, TopologyContext context) {
_coord = _spout.getCoordinator(_id, conf, context);
_underlyingState = TransactionalState.newCoordinatorState(conf, _id);
_state = new RotatingTransactionalState(_underlyingState, META_DIR);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
_state.cleanupBefore(attempt.getTransactionId());
_coord.success(attempt.getTransactionId());
} else {
long txid = attempt.getTransactionId();
Object prevMeta = _state.getPreviousState(txid);
Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
_state.overrideState(txid, meta);
collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
}
}
@Override
public void cleanup() {
_coord.close();
_underlyingState.close();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(MasterBatchCoordinator.BATCH_STREAM_ID, new Fields(“tx”, “metadata”));
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
}
}
TridentSpoutCoordinator 的 nextTuple 根据 streamId 分别做不同的处理
如果是 MasterBatchCoordinator.SUCCESS_STREAM_ID($success)则表示 master 那边接收到了 ack 已经成功了,然后 coordinator 就清除该 txId 之前的数据,然后回调 ITridentSpout.BatchCoordinator 的 success 方法
如果是 MasterBatchCoordinator.BATCH_STREAM_ID($batch)则要启动新的 TransactionAttempt,则往 MasterBatchCoordinator.BATCH_STREAM_ID($batch)发送 tuple,该 tuple 会被下游的 bolt 接收(在本实例就是使用 TridentSpoutExecutor 包装了用户 spout 的 TridentBoltExecutor)
TridentBoltExecutor
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
public class TridentBoltExecutor implements IRichBolt {
public static final String COORD_STREAM_PREFIX = “$coord-“;
public static String COORD_STREAM(String batch) {
return COORD_STREAM_PREFIX + batch;
}
RotatingMap<Object, TrackedBatch> _batches;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L;
_lastRotate = System.currentTimeMillis();
_batches = new RotatingMap<>(2);
_context = context;
_collector = collector;
_coordCollector = new CoordinatedOutputCollector(collector);
_coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));
_coordConditions = (Map) context.getExecutorData(“__coordConditions”);
if(_coordConditions==null) {
_coordConditions = new HashMap<>();
for(String batchGroup: _coordSpecs.keySet()) {
CoordSpec spec = _coordSpecs.get(batchGroup);
CoordCondition cond = new CoordCondition();
cond.commitStream = spec.commitStream;
cond.expectedTaskReports = 0;
for(String comp: spec.coords.keySet()) {
CoordType ct = spec.coords.get(comp);
if(ct.equals(CoordType.single())) {
cond.expectedTaskReports+=1;
} else {
cond.expectedTaskReports+=context.getComponentTasks(comp).size();
}
}
cond.targetTasks = new HashSet<>();
for(String component: Utils.get(context.getThisTargets(),
COORD_STREAM(batchGroup),
new HashMap<String, Grouping>()).keySet()) {
cond.targetTasks.addAll(context.getComponentTasks(component));
}
_coordConditions.put(batchGroup, cond);
}
context.setExecutorData(“_coordConditions”, _coordConditions);
}
_bolt.prepare(conf, context, _coordOutputCollector);
}
//……
@Override
public void cleanup() {
_bolt.cleanup();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_bolt.declareOutputFields(declarer);
for(String batchGroup: _coordSpecs.keySet()) {
declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields(“id”, “count”));
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = _bolt.getComponentConfiguration();
if(ret==null) ret = new HashMap<>();
ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
// TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization
return ret;
}
}
prepare 的时候,先创建了 CoordinatedOutputCollector,之后用 OutputCollector 包装,再最后包装为 BatchOutputCollectorImpl,调用 ITridentBatchBolt.prepare 方法,ITridentBatchBolt 这里头使用的实现类为 TridentSpoutExecutor
prepare 初始化了 RotatingMap<Object, TrackedBatch> _batches = new RotatingMap<>(2);
prepare 主要做的是构建 CoordCondition,这里主要是计算 expectedTaskReports 以及 targetTasks
TridentBoltExecutor.execute
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
@Override
public void execute(Tuple tuple) {
if(TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now – _lastRotate > _messageTimeoutMs) {
_batches.rotate();
_lastRotate = now;
}
return;
}
String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
if(batchGroup==null) {
// this is so we can do things like have simple DRPC that doesn’t need to use batch processing
_coordCollector.setCurrBatch(null);
_bolt.execute(null, tuple);
_collector.ack(tuple);
return;
}
IBatchID id = (IBatchID) tuple.getValue(0);
//get transaction id
//if it already exists and attempt id is greater than the attempt there
TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
// System.out.println(“Received in ” + _context.getThisComponentId() + ” ” + _context.getThisTaskIndex()
// + ” (” + _batches.size() + “)” +
// “\ntuple: ” + tuple +
// “\nwith tracked ” + tracked +
// “\nwith id ” + id +
// “\nwith group ” + batchGroup
// + “\n”);
//
// }
//System.out.println(“Num tracked: ” + _batches.size() + ” ” + _context.getThisComponentId() + ” ” + _context.getThisTaskIndex());
// this code here ensures that only one attempt is ever tracked for a batch, so when
// failures happen you don’t get an explosion in memory usage in the tasks
if(tracked!=null) {
if(id.getAttemptId() > tracked.attemptId) {
_batches.remove(id.getId());
tracked = null;
} else if(id.getAttemptId() < tracked.attemptId) {
// no reason to try to execute a previous attempt than we’ve already seen
return;
}
}
if(tracked==null) {
tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
_batches.put(id.getId(), tracked);
}
_coordCollector.setCurrBatch(tracked);
//System.out.println(“TRACKED: ” + tracked + ” ” + tuple);
TupleType t = getTupleType(tuple, tracked);
if(t==TupleType.COMMIT) {
tracked.receivedCommit = true;
checkFinish(tracked, tuple, t);
} else if(t==TupleType.COORD) {
int count = tuple.getInteger(1);
tracked.reportedTasks++;
tracked.expectedTupleCount+=count;
checkFinish(tracked, tuple, t);
} else {
tracked.receivedTuples++;
boolean success = true;
try {
_bolt.execute(tracked.info, tuple);
if(tracked.condition.expectedTaskReports==0) {
success = finishBatch(tracked, tuple);
}
} catch(FailedException e) {
failBatch(tracked, e);
}
if(success) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
}
_coordCollector.setCurrBatch(null);
}
private TupleType getTupleType(Tuple tuple, TrackedBatch batch) {
CoordCondition cond = batch.condition;
if(cond.commitStream!=null
&& tuple.getSourceGlobalStreamId().equals(cond.commitStream)) {
return TupleType.COMMIT;
} else if(cond.expectedTaskReports > 0
&& tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) {
return TupleType.COORD;
} else {
return TupleType.REGULAR;
}
}
private void failBatch(TrackedBatch tracked, FailedException e) {
if(e!=null && e instanceof ReportedFailedException) {
_collector.reportError(e);
}
tracked.failed = true;
if(tracked.delayedAck!=null) {
_collector.fail(tracked.delayedAck);
tracked.delayedAck = null;
}
}
TridentBoltExecutor 的 execute 方法首先判断是否是 tickTuple,如果是判断距离_lastRotate 的时间 (prepare 的时候初始化为当时的时间) 是否超过_messageTimeoutMs,如果是则进行_batches.rotate()操作;tickTuple 的发射频率为 Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS(topology.tick.tuple.freq.secs),在 TridentBoltExecutor 中它被设置为 5 秒;_messageTimeoutMs 为 context.maxTopologyMessageTimeout() * 1000L,它从整个 topology 的 component 的 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS(topology.message.timeout.secs,defaults.yaml 中默认为 30)最大值 *1000
_batches 按 TransactionAttempt 的 txId 来存储 TrackedBatch 信息,如果没有则创建一个新的 TrackedBatch;创建 TrackedBatch 时,会回调_bolt 的 initBatchState 方法
之后判断 tuple 的类型,这里分为 TupleType.COMMIT、TupleType.COORD、TupleType.REGULAR;如果是 TupleType.COMMIT 类型,则设置 tracked.receivedCommit 为 true,然后调用 checkFinish 方法;如果是 TupleType.COORD 类型,则更新 reportedTasks 及 expectedTupleCount 计数,再调用 checkFinish 方法;如果是 TupleType.REGULAR 类型(coordinator 发送过来的 batch 信息),则更新 receivedTuples 计数,然后调用_bolt.execute 方法(这里的_bolt 为 TridentSpoutExecutor),对于 tracked.condition.expectedTaskReports== 0 的则立马调用 finishBatch,将该 batch 从_batches 中移除;如果有 FailedException 则直接 failBatch 上报 error 信息,之后对 tuple 进行 ack 或者 fail;如果下游是 each 操作,一个 batch 中如果是部分抛出 FailedException 异常,则需要等到所有 batch 中的 tuple 执行完,等到 TupleType.COORD 触发检测 checkFinish,这个时候才能 fail 通知到 master,也就是有一些滞后性,比如这个 batch 中有 3 个 tuple,第二个 tuple 抛出 FailedException,还会继续执行第三个 tuple,最后该 batch 的 tuple 都处理完了,才收到 TupleType.COORD 触发检测 checkFinish。
TridentBoltExecutor.checkFinish
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
if(tracked.failed) {
failBatch(tracked);
_collector.fail(tuple);
return;
}
CoordCondition cond = tracked.condition;
boolean delayed = tracked.delayedAck==null &&
(cond.commitStream!=null && type==TupleType.COMMIT
|| cond.commitStream==null);
if(delayed) {
tracked.delayedAck = tuple;
}
boolean failed = false;
if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
if(tracked.receivedTuples == tracked.expectedTupleCount) {
finishBatch(tracked, tuple);
} else {
//TODO: add logging that not all tuples were received
failBatch(tracked);
_collector.fail(tuple);
failed = true;
}
}
if(!delayed && !failed) {
_collector.ack(tuple);
}
}
private void failBatch(TrackedBatch tracked) {
failBatch(tracked, null);
}
private void failBatch(TrackedBatch tracked, FailedException e) {
if(e!=null && e instanceof ReportedFailedException) {
_collector.reportError(e);
}
tracked.failed = true;
if(tracked.delayedAck!=null) {
_collector.fail(tracked.delayedAck);
tracked.delayedAck = null;
}
}
TridentBoltExecutor 在 execute 的时候,在 tuple 是 TupleType.COMMIT 以及 TupleType.COORD 的时候都会调用 checkFinish
一旦_bolt.execute(tracked.info, tuple)方法抛出 FailedException,则会调用 failBatch,它会标记 tracked.failed 为 true
checkFinish 在发现 tracked.failed 为 true 的时候,会调用_collector.fail(tuple),然后回调 MasterBatchCoordinator 的 fail 方法
TridentSpoutExecutor
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java
public class TridentSpoutExecutor implements ITridentBatchBolt {
public static final String ID_FIELD = “$tx”;
public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);
AddIdCollector _collector;
ITridentSpout<Object> _spout;
ITridentSpout.Emitter<Object> _emitter;
String _streamName;
String _txStateId;
TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap<>();
public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout<Object> spout) {
_txStateId = txStateId;
_spout = spout;
_streamName = streamName;
}
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
_emitter = _spout.getEmitter(_txStateId, conf, context);
_collector = new AddIdCollector(_streamName, collector);
}
@Override
public void execute(BatchInfo info, Tuple input) {
// there won’t be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
} else {
throw new FailedException(“Received commit for different transaction attempt”);
}
} else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
// valid to delete before what’s been committed since
// those batches will never be accessed again
_activeBatches.headMap(attempt.getTransactionId()).clear();
_emitter.success(attempt);
} else {
_collector.setBatch(info.batchId);
_emitter.emitBatch(attempt, input.getValue(1), _collector);
_activeBatches.put(attempt.getTransactionId(), attempt);
}
}
@Override
public void cleanup() {
_emitter.close();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
List<String> fields = new ArrayList<>(_spout.getOutputFields().toList());
fields.add(0, ID_FIELD);
declarer.declareStream(_streamName, new Fields(fields));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return _spout.getComponentConfiguration();
}
@Override
public void finishBatch(BatchInfo batchInfo) {
}
@Override
public Object initBatchState(String batchGroup, Object batchId) {
return null;
}
}
TridentSpoutExecutor 使用的 BatchOutputCollector 为 TridentBoltExecutor 在 prepare 方法构造的,经过几层包装,先是 CoordinatedOutputCollector,然后是 OutputCollector,最后是 BatchOutputCollectorImpl;这里最主要的是 CoordinatedOutputCollector 包装,它维护每个 taskId 发出的 tuple 的数量;而在这个 executor 的 prepare 方法里头,该 collector 又被包装为 AddIdCollector,主要是添加了 batchId 信息(即 TransactionAttempt 信息)
TridentSpoutExecutor 的 ITridentSpout 就是包装了用户设置的原始 spout(IBatchSpout 类型)的 BatchSpoutExecutor(假设原始 spout 是 IBatchSpout 类型的,因而会通过 BatchSpoutExecutor 包装为 ITridentSpout 类型),其 execute 方法根据不同 stream 类型进行不同处理,如果是 master 发过来的 MasterBatchCoordinator.COMMIT_STREAM_ID($commit)则调用 emitter 的 commit 方法提交当前 TransactionAttempt(本文的实例没有 commit 信息),然后将该 tx 从_activeBatches 中移除;如果是 master 发过来的 MasterBatchCoordinator.SUCCESS_STREAM_ID($success)则先把_activeBatches 中 txId 小于该 txId 的 TransactionAttempt 移除,然后调用 emitter 的 success 方法,标记 TransactionAttempt 成功,该方法回调原始 spout(IBatchSpout 类型)的 ack 方法
非 MasterBatchCoordinator.COMMIT_STREAM_ID($commit)及 MasterBatchCoordinator.SUCCESS_STREAM_ID($success)类型的 tuple,则是启动 batch 的消息,这里设置 batchId,然后调用 emitter 的 emitBatch 进行数据发送(这里传递的 batchId 就是 TransactionAttempt 的 txId),同时将该 TransactionAttempt 放入_activeBatches 中(这里的 batch 相当于 TransactionAttempt)
FixedBatchSpout
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/testing/FixedBatchSpout.java
public class FixedBatchSpout implements IBatchSpout {
Fields fields;
List<Object>[] outputs;
int maxBatchSize;
HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>… outputs) {
this.fields = fields;
this.outputs = outputs;
this.maxBatchSize = maxBatchSize;
}
int index = 0;
boolean cycle = false;
public void setCycle(boolean cycle) {
this.cycle = cycle;
}
@Override
public void open(Map conf, TopologyContext context) {
index = 0;
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
List<List<Object>> batch = this.batches.get(batchId);
if(batch == null){
batch = new ArrayList<List<Object>>();
if(index>=outputs.length && cycle) {
index = 0;
}
for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
batch.add(outputs[index]);
}
this.batches.put(batchId, batch);
}
for(List<Object> list : batch){
collector.emit(list);
}
}
@Override
public void ack(long batchId) {
this.batches.remove(batchId);
}
@Override
public void close() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.setMaxTaskParallelism(1);
return conf;
}
@Override
public Fields getOutputFields() {
return fields;
}
}
用户使用的 spout 是 IBatchSpout 类型,这里缓存了每个 batchId 对应的 tuple 数据,实现的是 transactional spout 的语义
TridentTopology.newStream
storm-1.2.2/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
public Stream newStream(String txId, IRichSpout spout) {
return newStream(txId, new RichSpoutBatchExecutor(spout));
}
public Stream newStream(String txId, IBatchSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}
public Stream newStream(String txId, ITridentSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}
public Stream newStream(String txId, IPartitionedTridentSpout spout) {
return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}
public Stream newStream(String txId, ITridentDataSource dataSource) {
if (dataSource instanceof IBatchSpout) {
return newStream(txId, (IBatchSpout) dataSource);
} else if (dataSource instanceof ITridentSpout) {
return newStream(txId, (ITridentSpout) dataSource);
} else if (dataSource instanceof IPartitionedTridentSpout) {
return newStream(txId, (IPartitionedTridentSpout) dataSource);
} else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
} else {
throw new UnsupportedOperationException(“Unsupported stream”);
}
}
用户在 TridentTopology.newStream 可以直接使用 IBatchSpout 类似的 spout,使用它的好处就是 TridentTopology 在 build 的时候会使用 BatchSpoutExecutor 将其包装为 ITridentSpout 类型(省得用户再去实现 ITridentSpout 的相关接口,屏蔽 trident spout 的相关逻辑,使得之前一直使用普通 topology 的用户可以快速上手 trident topology)
BatchSpoutExecutor 实现了 ITridentSpout 接口,将 IBatchSpout 适配为 ITridentSpout,使用的 coordinator 是 EmptyCoordinator,使用的 emitter 是 BatchSpoutEmitter
如果用户在 TridentTopology.newStream 使用的 spout 是 IPartitionedTridentSpout 类型,则 TridentTopology 在 newStream 方法内部会使用 PartitionedTridentSpoutExecutor 将其包装为 ITridentSpout 类型,对于 IOpaquePartitionedTridentSpout 则使用 OpaquePartitionedTridentSpoutExecutor 将其包装为 ITridentSpout 类型
小结
TridentTopology 在 newStream 或者 build 方法里头会将 ITridentDataSource 中不是 ITridentSpout 类型的 IBatchSpout(在 build 方法)、IPartitionedTridentSpout(在 newStream 方法)、IOpaquePartitionedTridentSpout(在 newStream 方法)适配为 ITridentSpout 类型;分别使用 BatchSpoutExecutor、PartitionedTridentSpoutExecutor、OpaquePartitionedTridentSpoutExecutor 进行适配(TridentTopologyBuilder 在 buildTopology 的时候,对于 ITridentSpout 类型的 spout 先用 TridentSpoutExecutor 包装,再用 TridentBoltExecutor 包装,最后转换为 bolt,而整个 TridentTopology 真正的 spout 就是 MasterBatchCoordinator;这里可以看到一个 IBatchSpout 的 spout 先经过 BatchSpoutExecutor 包装为 ITridentSpout 类型,之后再经过 TridentSpoutExecutor 及 TridentBoltExecutor 包装为 bolt)
IBatchSpout 的 ack 是针对 batch 维度的,也就是 TransactionAttempt 维度,注意这里没有 fail 方法,如果 emitBatch 方法抛出了 FailedException 异常,则 TridentBoltExecutor 会调用 failBatch 方法(一个 batch 的 tuples 会等所有 tuple 执行完再触发 checkFinish),进行 reportError 以及标记 TrackedBatch 的 failed 为 true,之后 TridentBoltExecutor 在 checkFinish 的时候,一旦发现 tracked.failed 为 true 的时候,会调用_collector.fail(tuple),然后回调 MasterBatchCoordinator 的 fail 方法
MasterBatchCoordinator 的 fail 方法会将当前 TransactionAttempt 从_activeTx 移除,然后一并移除 txId 大于失败的 txId 的数据,最后调用 sync 方法继续 TransactionAttempt(注意这里没有更改_currTransaction 值,因而会继续从失败的 txId 开始重试,只有在 ack 方法里头会更改_currTransaction 为 nextTransactionId)
TridentBoltExecutor 的 execute 方法会根据 tickTuple 来检测距离上次 rotate 是否超过_messageTimeoutMs(取 component 中 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 最大值 *1000,这里 *1000 是将秒转换为毫秒),超过的话进行 rotate 操作,_batches 的最后一个 bucket 将会被移除掉;这里的 tickTuple 的频率为 5 秒,Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 按 30 秒算的话,_messageTimeoutMs 为 30*1000,相当于每 5 秒检测一下距离上次 rotate 时间是否超过 30 秒,如果超过则进行 rotate,丢弃最后一个 bucket 的数据(TrackedBatch),这里相当于重置超时的 TrackedBatch 信息
关于 MasterBatchCoordinator 的 fail 的情况,有几种情况,一种是下游 componnent 主动抛出 FailException,这个时候会触发 master 的 fail,再次重试 TransactionAttempt;一种是下游 component 处理 tuple 时间超过 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS(topology.message.timeout.secs,defaults.yaml 中默认为 30),这个时候 ack 会触发 master 的 fail,导致该 TransactionAttempt 失败继续重试,目前没有对 attempt 的次数做限制,实际生产过程中要注意,因为只要该 batchId 的一个 tuple 失败,整个 batchId 的 tuples 都会重发,这个时候下游如果没有做好处理,可能会出现一个 batchId 中前面部分 tuple 成功,后面部分失败,导致成功的 tuple 不断重复处理(要避免失败的 batch 中 tuples 部分处理成功部分处理失败这个问题就需要配合使用 Trident 的 State)。
doc
Trident Spouts
Trident State
聊聊 storm TridentTopology 的构建