序本文主要研究一下storm的maxSpoutPendingTOPOLOGY_MAX_SPOUT_PENDINGstorm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java /** * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to * spouts or topologies as a whole. * * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has * no effect for unreliable spouts that don’t tag their tuples with a message id. */ @isInteger @isPositiveNumber public static final String TOPOLOGY_MAX_SPOUT_PENDING = “topology.max.spout.pending”;TOPOLOGY_MAX_SPOUT_PENDING设置的是一个spout task已经emit等待ack的tuple的最大数量,该配置仅仅对于发射可靠tuple(设置msgId)的spout起作用defaults.yaml文件中topology.max.spout.pending的默认配置为nullSpoutExecutorstorm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java public void init(final ArrayList<Task> idToTask, int idToTaskBase) { this.threadId = Thread.currentThread().getId(); executorTransfer.initLocalRecvQueues(); while (!stormActive.get()) { Utils.sleep(100); } LOG.info(“Opening spout {}:{}”, componentId, taskIds); this.idToTask = idToTask; this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); //…… } public Callable<Long> call() throws Exception { init(idToTask, idToTaskBase); return new Callable<Long>() { final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount(); int recvqCheckSkips = 0; int swIdleCount = 0; // counter for spout wait strategy int bpIdleCount = 0; // counter for back pressure wait strategy int rmspCount = 0; @Override public Long call() throws Exception { int receiveCount = 0; if (recvqCheckSkips++ == recvqCheckSkipCountMax) { receiveCount = receiveQueue.consume(SpoutExecutor.this); recvqCheckSkips = 0; } long currCount = emittedCount.get(); boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending); boolean isActive = stormActive.get(); if 
(!isActive) { inactiveExecute(); return 0L; } if (!lastActive.get()) { lastActive.set(true); activateSpouts(); } boolean pendingEmitsIsEmpty = tryFlushPendingEmits(); boolean noEmits = true; long emptyStretch = 0; if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) { for (int j = 0; j < spouts.size(); j++) { // in critical path. don’t use iterators. spouts.get(j).nextTuple(); } noEmits = (currCount == emittedCount.get()); if (noEmits) { emptyEmitStreak.increment(); } else { emptyStretch = emptyEmitStreak.get(); emptyEmitStreak.set(0); } } if (reachedMaxSpoutPending) { if (rmspCount == 0) { LOG.debug(“Reached max spout pending”); } rmspCount++; } else { if (rmspCount > 0) { LOG.debug(“Ended max spout pending stretch of {} iterations”, rmspCount); } rmspCount = 0; } if (receiveCount > 1) { // continue without idling return 0L; } if (!pendingEmits.isEmpty()) { // then facing backpressure backPressureWaitStrategy(); return 0L; } bpIdleCount = 0; if (noEmits) { spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch); return 0L; } swIdleCount = 0; return 0L; } private void backPressureWaitStrategy() throws InterruptedException { long start = Time.currentTimeMillis(); if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop LOG.debug(“Experiencing Back Pressure from downstream components. 
Entering BackPressure Wait.”); } bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount); spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start); } private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException { emptyEmitStreak.increment(); long start = Time.currentTimeMillis(); swIdleCount = spoutWaitStrategy.idle(swIdleCount); if (reachedMaxSpoutPending) { spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start); } else { if (emptyStretch > 0) { LOG.debug(“Ending Spout Wait Stretch of {}”, emptyStretch); } } } // returns true if pendingEmits is empty private boolean tryFlushPendingEmits() { for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) { if (executorTransfer.tryTransfer(t, null)) { pendingEmits.poll(); } else { // to avoid reordering of emits, stop at first failure return false; } } return true; } }; }这里从topoConf读取Config.TOPOLOGY_MAX_SPOUT_PENDING,如果读取不到则取0,之后乘以task的数量,即为maxSpoutPendingmaxSpoutPending在call方法里头控制的是reachedMaxSpoutPending变量,只有!reachedMaxSpoutPending && pendingEmitsIsEmpty才能够执行nextTuple发射数据MasterBatchCoordinatorstorm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1); for (String spoutId : _managedSpoutIds) { _states.add(TransactionalState.newCoordinatorState(conf, spoutId)); } _currTransaction = getStoredCurrTransaction(); _collector = collector; Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING); if (active == null) { _maxTransactionActive = 1; } else { _maxTransactionActive = active.intValue(); } _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive); for (int i = 0; i < _spouts.size(); i++) { String txId = _managedSpoutIds.get(i); 
_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context)); } LOG.debug(“Opened {}”, this); } private void sync() { // note that sometimes the tuples active may be less than max_spout_pending, e.g. // max_spout_pending = 3 // tx 1, 2, 3 active, tx 2 is acked. there won’t be a commit for tx 2 (because tx 1 isn’t committed yet), // and there won’t be a batch for tx 4 because there’s max_spout_pending tx active TransactionStatus maybeCommit = _activeTx.get(_currTransaction); if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) { maybeCommit.status = AttemptStatus.COMMITTING; _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); LOG.debug(“Emitted on [stream = {}], [tx_status = {}], [{}]”, COMMIT_STREAM_ID, maybeCommit, this); } if (_active) { if (_activeTx.size() < _maxTransactionActive) { Long curr = _currTransaction; for (int i = 0; i < _maxTransactionActive; i++) { if (!_activeTx.containsKey(curr) && isReady(curr)) { // by using a monotonically increasing attempt id, downstream tasks // can be memory efficient by clearing out state for old attempts // as soon as they see a higher attempt id for a transaction Integer attemptId = _attemptIds.get(curr); if (attemptId == null) { attemptId = 0; } else { attemptId++; } _attemptIds.put(curr, attemptId); for (TransactionalState state : _states) { state.setData(CURRENT_ATTEMPTS, _attemptIds); } TransactionAttempt attempt = new TransactionAttempt(curr, attemptId); final TransactionStatus newTransactionStatus = new TransactionStatus(attempt); _activeTx.put(curr, newTransactionStatus); _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt); LOG.debug(“Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]”, BATCH_STREAM_ID, attempt, newTransactionStatus, this); _throttler.markEvent(); } curr = nextTransactionId(curr); } } } 
}

MasterBatchCoordinator的open方法从conf读取Config.TOPOLOGY_MAX_SPOUT_PENDING并设置到_maxTransactionActive,如果为null则默认为1。
在sync方法中,只有_activeTx.size() < _maxTransactionActive时才会往BATCH_STREAM_ID发射数据。

小结

Config.TOPOLOGY_MAX_SPOUT_PENDING(topology.max.spout.pending)默认为null,只对于开启可靠(msgId)消息的spout起作用。
对于普通的spout,指的是等待ack的tuple数量的最大值,超过这个值,SpoutExecutor不会调用spout的nextTuple发射数据。
对于trident的spout来说,指的是同时处理的batches的数量,只有这些batches处理成功或失败之后才能继续下一个batch。

doc

Trident Spouts
聊聊storm的IWaitStrategy