A look at Storm's maxSpoutPending

This article takes a look at Storm's maxSpoutPending setting.
TOPOLOGY_MAX_SPOUT_PENDING
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
 * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
 * spouts or topologies as a whole.
 *
 * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
 * no effect for unreliable spouts that don't tag their tuples with a message id.
 */
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";

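TOPOLOGY_MAX_SPOUT_PENDING sets the maximum number of tuples that a single spout task may have emitted but not yet had acked or failed. It only takes effect for spouts that emit reliable tuples (tuples tagged with a msgId).
In defaults.yaml, topology.max.spout.pending defaults to null.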
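
To make the notion of a "pending" tuple concrete, here is a minimal sketch of the bookkeeping the Javadoc describes. PendingTrackerSketch and its methods are hypothetical names invented for illustration; they are a simplified model, not Storm's actual classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a spout task's pending tuples; not Storm code. */
final class PendingTrackerSketch {
    // msgId -> tuple payload for every reliable tuple that is still in flight
    private final Map<Object, String> pending = new HashMap<>();

    /** Emits a tuple; only tuples tagged with a non-null msgId become pending. */
    void emit(String tuple, Object msgId) {
        if (msgId != null) {
            pending.put(msgId, tuple);
        }
    }

    /** An ack completes the tuple and removes it from the pending map. */
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    /** A fail also completes the tuple, so it stops counting as pending. */
    void fail(Object msgId) {
        pending.remove(msgId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingTrackerSketch tracker = new PendingTrackerSketch();
        tracker.emit("reliable", 1L);     // tracked
        tracker.emit("unreliable", null); // not tracked, so never throttled
        System.out.println(tracker.pendingCount());
    }
}
```

Because a tuple emitted without a msgId never enters the map, the pending count of an unreliable spout stays at zero, which is why the setting has no effect on it.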

SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
    this.threadId = Thread.currentThread().getId();
    executorTransfer.initLocalRecvQueues();
    while (!stormActive.get()) {
        Utils.sleep(100);
    }

    LOG.info("Opening spout {}:{}", componentId, taskIds);
    this.idToTask = idToTask;
    this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
    //……
}

public Callable<Long> call() throws Exception {
    init(idToTask, idToTaskBase);
    return new Callable<Long>() {
        final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
        int recvqCheckSkips = 0;
        int swIdleCount = 0; // counter for spout wait strategy
        int bpIdleCount = 0; // counter for back pressure wait strategy
        int rmspCount = 0;

        @Override
        public Long call() throws Exception {
            int receiveCount = 0;
            if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                receiveCount = receiveQueue.consume(SpoutExecutor.this);
                recvqCheckSkips = 0;
            }
            long currCount = emittedCount.get();
            boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
            boolean isActive = stormActive.get();

            if (!isActive) {
                inactiveExecute();
                return 0L;
            }

            if (!lastActive.get()) {
                lastActive.set(true);
                activateSpouts();
            }
            boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
            boolean noEmits = true;
            long emptyStretch = 0;

            if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                    spouts.get(j).nextTuple();
                }
                noEmits = (currCount == emittedCount.get());
                if (noEmits) {
                    emptyEmitStreak.increment();
                } else {
                    emptyStretch = emptyEmitStreak.get();
                    emptyEmitStreak.set(0);
                }
            }
            if (reachedMaxSpoutPending) {
                if (rmspCount == 0) {
                    LOG.debug("Reached max spout pending");
                }
                rmspCount++;
            } else {
                if (rmspCount > 0) {
                    LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                }
                rmspCount = 0;
            }

            if (receiveCount > 1) {
                // continue without idling
                return 0L;
            }
            if (!pendingEmits.isEmpty()) { // then facing backpressure
                backPressureWaitStrategy();
                return 0L;
            }
            bpIdleCount = 0;
            if (noEmits) {
                spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                return 0L;
            }
            swIdleCount = 0;
            return 0L;
        }

        private void backPressureWaitStrategy() throws InterruptedException {
            long start = Time.currentTimeMillis();
            if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
            }
            bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
            spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
        }

        private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
            emptyEmitStreak.increment();
            long start = Time.currentTimeMillis();
            swIdleCount = spoutWaitStrategy.idle(swIdleCount);
            if (reachedMaxSpoutPending) {
                spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
            } else {
                if (emptyStretch > 0) {
                    LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                }
            }
        }

        // returns true if pendingEmits is empty
        private boolean tryFlushPendingEmits() {
            for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                if (executorTransfer.tryTransfer(t, null)) {
                    pendingEmits.poll();
                } else { // to avoid reordering of emits, stop at first failure
                    return false;
                }
            }
            return true;
        }
    };
}

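Here init reads Config.TOPOLOGY_MAX_SPOUT_PENDING from topoConf, falls back to 0 when it is not set, and multiplies the result by the number of tasks in the executor; the product is maxSpoutPending.
In call, maxSpoutPending drives the reachedMaxSpoutPending flag, which is true only when maxSpoutPending is non-zero and pending.size() >= maxSpoutPending. nextTuple is invoked only when !reachedMaxSpoutPending && pendingEmitsIsEmpty, so an unset (0) value disables the limit.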
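
The two steps above (deriving the per-executor limit, then gating nextTuple) can be sketched in isolation. SpoutPendingGateSketch and its method names are hypothetical, and a plain Map stands in for topoConf; only the arithmetic and the boolean conditions are taken from the SpoutExecutor excerpt.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone sketch of the limit and gate logic; not Storm code. */
final class SpoutPendingGateSketch {
    static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";

    /** Per-executor limit: the configured per-task value (0 when unset) times the task count. */
    static int effectiveLimit(Map<String, Object> topoConf, int taskCount) {
        Object raw = topoConf.get(TOPOLOGY_MAX_SPOUT_PENDING);
        int perTask = (raw == null) ? 0 : ((Number) raw).intValue();
        return perTask * taskCount;
    }

    /** Same condition as in call(): a limit of 0 means "no limit". */
    static boolean reachedMaxSpoutPending(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    /** nextTuple may run only below the limit and with no emits waiting to be flushed. */
    static boolean mayCallNextTuple(int maxSpoutPending, int pendingSize, boolean pendingEmitsIsEmpty) {
        return !reachedMaxSpoutPending(maxSpoutPending, pendingSize) && pendingEmitsIsEmpty;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_MAX_SPOUT_PENDING, 100);
        int limit = effectiveLimit(conf, 2); // two tasks in this executor
        System.out.println(limit + " " + mayCallNextTuple(limit, 199, true) + " " + mayCallNextTuple(limit, 200, true));
    }
}
```

Note that the comparison is >=, so the spout stops being polled as soon as the pending count reaches the limit, not only once it exceeds it.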

MasterBatchCoordinator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
    for (String spoutId : _managedSpoutIds) {
        _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
    }
    _currTransaction = getStoredCurrTransaction();

    _collector = collector;
    Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
    if (active == null) {
        _maxTransactionActive = 1;
    } else {
        _maxTransactionActive = active.intValue();
    }
    _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

    for (int i = 0; i < _spouts.size(); i++) {
        String txId = _managedSpoutIds.get(i);
        _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
    }
    LOG.debug("Opened {}", this);
}

private void sync() {
    // note that sometimes the tuples active may be less than max_spout_pending, e.g.
    // max_spout_pending = 3
    // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
    // and there won't be a batch for tx 4 because there's max_spout_pending tx active
    TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
    if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
        maybeCommit.status = AttemptStatus.COMMITTING;
        _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
        LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
    }

    if (_active) {
        if (_activeTx.size() < _maxTransactionActive) {
            Long curr = _currTransaction;
            for (int i = 0; i < _maxTransactionActive; i++) {
                if (!_activeTx.containsKey(curr) && isReady(curr)) {
                    // by using a monotonically increasing attempt id, downstream tasks
                    // can be memory efficient by clearing out state for old attempts
                    // as soon as they see a higher attempt id for a transaction
                    Integer attemptId = _attemptIds.get(curr);
                    if (attemptId == null) {
                        attemptId = 0;
                    } else {
                        attemptId++;
                    }
                    _attemptIds.put(curr, attemptId);
                    for (TransactionalState state : _states) {
                        state.setData(CURRENT_ATTEMPTS, _attemptIds);
                    }

                    TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                    final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                    _activeTx.put(curr, newTransactionStatus);
                    _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                    LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt,
                              newTransactionStatus, this);
                    _throttler.markEvent();
                }
                curr = nextTransactionId(curr);
            }
        }
    }
}

MasterBatchCoordinator's open method reads Config.TOPOLOGY_MAX_SPOUT_PENDING from conf and stores it in _maxTransactionActive, defaulting to 1 when the value is null.
In sync, new batches are emitted on BATCH_STREAM_ID only while _activeTx.size() < _maxTransactionActive.
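
The scenario in the sync() comment (three active transactions, tx 2 processed before tx 1) can be replayed with a small model. BatchCoordinatorSketch is a hypothetical simplification: it omits isReady, the throttler, attempt ids, and the separate commit stream, and it collapses "emit commit, then ack of commit" into a single commitCurrent() step.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the active-transaction cap; not Storm's MasterBatchCoordinator. */
final class BatchCoordinatorSketch {
    private final int maxTransactionActive;
    private final Map<Long, String> activeTx = new HashMap<>(); // txid -> status
    private long currTransaction = 1L;

    /** A null setting falls back to one active transaction, as in open(). */
    BatchCoordinatorSketch(Number maxSpoutPending) {
        this.maxTransactionActive = (maxSpoutPending == null) ? 1 : maxSpoutPending.intValue();
    }

    /** Starts batches for inactive txids while below the cap; returns how many were started. */
    int sync() {
        int emitted = 0;
        if (activeTx.size() < maxTransactionActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxTransactionActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    activeTx.put(curr, "PROCESSING");
                    emitted++;
                }
                curr++;
            }
        }
        return emitted;
    }

    /** Marks a batch as processed; it stays active until it is committed in order. */
    void processed(long txid) {
        activeTx.replace(txid, "PROCESSED");
    }

    /** Commits the current transaction if it has been processed, which frees one slot. */
    boolean commitCurrent() {
        if ("PROCESSED".equals(activeTx.get(currTransaction))) {
            activeTx.remove(currTransaction);
            currTransaction++;
            return true;
        }
        return false;
    }

    int activeCount() {
        return activeTx.size();
    }
}
```

With a cap of 3, marking tx 2 as processed frees nothing: tx 1 has not committed, so tx 2 cannot commit either and no batch for tx 4 is started until tx 1 completes.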

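Summary

Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) defaults to null and only affects spouts that emit reliable tuples (tagged with a msgId).
For a regular spout, it caps the number of tuples awaiting ack or fail per task; once the pending count reaches the limit, SpoutExecutor stops calling the spout's nextTuple until pending tuples complete.
For a Trident spout, it caps the number of batches being processed at the same time (1 when unset); a new batch is emitted only after an active batch has finished, whether it succeeded or failed.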
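
As a usage illustration, the following sketch builds a topology configuration map with the setting applied. MaxSpoutPendingConfigSketch and topologyConf are hypothetical helpers that use a plain Map; in a real topology the same key would go on org.apache.storm.Config, which I believe also exposes a setMaxSpoutPending(int) convenience method. The positivity check mirrors the @isPositiveNumber annotation on the config key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds a conf map; a stand-in for org.apache.storm.Config. */
final class MaxSpoutPendingConfigSketch {
    static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";

    /** Returns a conf map; a null argument leaves the key unset, matching the null default. */
    static Map<String, Object> topologyConf(Integer maxSpoutPending) {
        Map<String, Object> conf = new HashMap<>();
        if (maxSpoutPending != null) {
            if (maxSpoutPending <= 0) {
                throw new IllegalArgumentException("topology.max.spout.pending must be positive");
            }
            conf.put(TOPOLOGY_MAX_SPOUT_PENDING, maxSpoutPending);
        }
        return conf;
    }

    public static void main(String[] args) {
        // At most 500 un-acked tuples per spout task; for Trident, at most 500 active batches.
        System.out.println(topologyConf(500));
    }
}
```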

doc

Trident Spouts
A look at Storm's IWaitStrategy
