A Look at Storm's IWaitStrategy



This article examines Storm's IWaitStrategy.
IWaitStrategy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
public interface IWaitStrategy {
static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> topologyConf) {
IWaitStrategy producerWaitStrategy =
ReflectionUtils.newInstance((String) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
producerWaitStrategy.prepare(topologyConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
return producerWaitStrategy;
}

void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);

/**
* Implementations of this method should be thread-safe (preferably no side-effects and lock-free)
* <p>
* Supports static or dynamic backoff. Dynamic backoff relies on idleCounter to estimate how long caller has been idling.
* <p>
* <pre>
* <code>
* int idleCounter = 0;
* int consumeCount = consumeFromQ();
* while (consumeCount==0) {
* idleCounter = strategy.idle(idleCounter);
* consumeCount = consumeFromQ();
* }
* </code>
* </pre>
*
* @param idleCounter managed by the idle method until reset
* @return new counter value to be used on subsequent idle cycle
*/
int idle(int idleCounter) throws InterruptedException;

enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT}

}

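The interface provides a static factory method, createBackPressureWaitStrategy, which reads the topology.backpressure.wait.strategy setting, instantiates the configured class as producerWaitStrategy, and initializes it with WAIT_SITUATION.BACK_PRESSURE_WAIT.
WAIT_SITUATION has three values: SPOUT_WAIT, BOLT_WAIT, and BACK_PRESSURE_WAIT.
The interface defines int idle(int idleCounter), which supports static or dynamic backoff. The caller passes the returned counter back in on the next idle cycle and resets it to 0 once there is work again.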
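
The calling pattern described in the Javadoc can be sketched without any Storm dependency. In the block below, IdleStrategySketch and IdleLoopSketch are hypothetical names made up for illustration, and consumeFromQ is modeled as an IntSupplier; this only illustrates the counter hand-off, not Storm's own code.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for the idle() part of IWaitStrategy (not the Storm interface).
interface IdleStrategySketch {
    int idle(int idleCounter) throws InterruptedException;
}

final class IdleLoopSketch {
    // Runs the loop from the Javadoc and returns the final idle counter value.
    static int waitForData(IntSupplier consumeFromQ, IdleStrategySketch strategy)
            throws InterruptedException {
        int idleCounter = 0;                              // fresh wait stretch starts at 0
        int consumeCount = consumeFromQ.getAsInt();
        while (consumeCount == 0) {
            idleCounter = strategy.idle(idleCounter);     // strategy decides the next value
            consumeCount = consumeFromQ.getAsInt();
        }
        return idleCounter;
    }
}
```

The counter belongs to the caller, not to the strategy, which is what lets one strategy instance stay free of per-caller state.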

SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public class SpoutExecutor extends Executor {

private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);

private final IWaitStrategy spoutWaitStrategy;
private final IWaitStrategy backPressureWaitStrategy;
private final AtomicBoolean lastActive;
private final MutableLong emittedCount;
private final MutableLong emptyEmitStreak;
private final SpoutThrottlingMetrics spoutThrottlingMetrics;
private final boolean hasAckers;
private final SpoutExecutorStats stats;
private final BuiltinMetrics builtInMetrics;
SpoutOutputCollectorImpl spoutOutputCollector;
private Integer maxSpoutPending;
private List<ISpout> spouts;
private List<SpoutOutputCollector> outputCollectors;
private RotatingMap<Long, TupleInfo> pending;
private long threadId = 0;

public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
//……
}

//……
}

Two wait strategies are created here: spoutWaitStrategy and backPressureWaitStrategy.
spoutWaitStrategy is read from topology.spout.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive.
backPressureWaitStrategy is read from topology.backpressure.wait.strategy, whose value in defaults.yaml is also org.apache.storm.policy.WaitStrategyProgressive.
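
The "class name in the conf, then newInstance, then prepare" pattern can be sketched in plain Java. This is an approximation of what ReflectionUtils.newInstance presumably does, not its actual source; ConfigurableWait, FixedWait, StrategyLoaderSketch and the sketch.* keys are all hypothetical.

```java
import java.util.Map;

// Hypothetical strategy type used only in this sketch.
interface ConfigurableWait {
    void prepare(Map<String, Object> conf);
}

// Needs a no-arg constructor so it can be created reflectively; prepare() does the real setup.
class FixedWait implements ConfigurableWait {
    long parkMicros;

    @Override
    public void prepare(Map<String, Object> conf) {
        parkMicros = ((Number) conf.get("sketch.wait.park.microsec")).longValue();
    }
}

final class StrategyLoaderSketch {
    // Reads a class name from the conf, instantiates it, then prepares it with the same conf.
    static ConfigurableWait load(Map<String, Object> conf, String key) {
        String className = (String) conf.get(key);
        try {
            ConfigurableWait strategy = (ConfigurableWait) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            strategy.prepare(conf);
            return strategy;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Because the instance comes from a no-arg constructor, all real configuration has to happen in prepare().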

BoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
public class BoltExecutor extends Executor {

private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);

private final BooleanSupplier executeSampler;
private final boolean isSystemBoltExecutor;
private final IWaitStrategy consumeWaitStrategy; // employed when no incoming data
private final IWaitStrategy backPressureWaitStrategy; // employed when outbound path is congested
private final BoltExecutorStats stats;
private final BuiltinMetrics builtInMetrics;
private BoltOutputCollectorImpl outputCollector;

public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
if (isSystemBoltExecutor) {
this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
} else {
this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
}
this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),
ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
this.builtInMetrics = new BuiltinBoltMetrics(stats);
}

private static IWaitStrategy makeSystemBoltWaitStrategy() {
WaitStrategyPark ws = new WaitStrategyPark();
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
return ws;
}
//……
}

Two IWaitStrategy instances are created here: consumeWaitStrategy and backPressureWaitStrategy.
For executors other than the system bolt executor, consumeWaitStrategy is read from topology.bolt.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive. The system bolt executor uses WaitStrategyPark instead, with a park time of 5000 microseconds.
backPressureWaitStrategy is read from topology.backpressure.wait.strategy, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive.

WaitStrategyPark
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
public class WaitStrategyPark implements IWaitStrategy {
private long parkTimeNanoSec;

public WaitStrategyPark() { // required for instantiation via reflection. must call prepare() thereafter
}

// Convenience alternative to prepare() for use in Tests
public WaitStrategyPark(long microsec) {
parkTimeNanoSec = microsec * 1_000;
}

@Override
public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC));
} else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
} else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));
} else {
throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
}
}

@Override
public int idle(int idleCounter) throws InterruptedException {
if (parkTimeNanoSec == 0) {
return 1;
}
LockSupport.parkNanos(parkTimeNanoSec);
return idleCounter + 1;
}
}
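This strategy idles by calling LockSupport.parkNanos(parkTimeNanoSec). The park time is configured in microseconds and converted to nanoseconds in prepare(); when it is 0, idle returns immediately without parking.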
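
The conversion and the zero-park shortcut can be tried out in isolation. The class below is a standalone sketch that copies the logic quoted above; ParkWaitSketch and its accessor are hypothetical names, and it does not implement Storm's IWaitStrategy.

```java
import java.util.concurrent.locks.LockSupport;

// Standalone sketch of a fixed-park wait; mirrors the quoted idle() logic.
final class ParkWaitSketch {
    private final long parkTimeNanoSec;

    ParkWaitSketch(long microsec) {
        this.parkTimeNanoSec = microsec * 1_000; // microseconds -> nanoseconds
    }

    long parkTimeNanoSec() {
        return parkTimeNanoSec;
    }

    int idle(int idleCounter) {
        if (parkTimeNanoSec == 0) {
            return 1; // no parking configured: return at once, counter is set to 1
        }
        LockSupport.parkNanos(parkTimeNanoSec); // may also return early
        return idleCounter + 1;
    }
}
```

With the system bolt's 5000 microseconds, each idle call parks for up to 5,000,000 ns, that is 5 ms. Note that the zero-park branch returns 1 regardless of the incoming counter.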
WaitStrategyProgressive
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
/**
* A Progressive Wait Strategy
* <p> Has three levels of idling. Stays in each level for a configured number of iterations before entering the next level.
* Level 1 – No idling. Returns immediately. Stays in this level for `level1Count` iterations. Level 2 – Calls LockSupport.parkNanos(1).
* Stays in this level for `level2Count` iterations Level 3 – Calls Thread.sleep(). Stays in this level until wait situation changes.
*
* <p>
* The initial spin can be useful to prevent downstream bolt from repeatedly sleeping/parking when the upstream component is a bit
* relatively slower. Allows downstream bolt can enter deeper wait states only if the traffic to it appears to have reduced.
* <p>
*/
public class WaitStrategyProgressive implements IWaitStrategy {
private int level1Count;
private int level2Count;
private long level3SleepMs;

@Override
public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
} else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
} else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT));
level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT));
level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
} else {
throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
}
}

@Override
public int idle(int idleCounter) throws InterruptedException {
if (idleCounter < level1Count) {// level 1 – no waiting
++idleCounter;
} else if (idleCounter < level1Count + level2Count) {// level 2 – parkNanos(1L)
++idleCounter;
LockSupport.parkNanos(1L);
} else {// level 3 – longer idling with Thread.sleep()
Thread.sleep(level3SleepMs);
}
return idleCounter;
}
}

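WaitStrategyProgressive is a progressive wait strategy with three levels of idling.
Level 1 does not idle and returns immediately; after level1Count iterations in level 1 the strategy moves to level 2.
Level 2 calls LockSupport.parkNanos(1); after level2Count iterations in level 2 the strategy moves to level 3.
Level 3 calls Thread.sleep(level3SleepMs) and stays there until the wait situation changes. The counter is no longer incremented at this level, so it stays at level1Count + level2Count.
Each WAIT_SITUATION reads its own LEVEL1_COUNT, LEVEL2_COUNT, and LEVEL3_SLEEP_MILLIS settings. Their defaults are 0, 0, 1 for spouts; 1, 1000, 1 for bolts; and 1, 1000, 1 for back pressure.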
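
To see how the counter maps to levels, the logic can be reproduced in a standalone class. ProgressiveWaitSketch and levelFor are hypothetical names introduced for illustration; the branching follows the idle() method quoted above.

```java
import java.util.concurrent.locks.LockSupport;

// Standalone sketch of the three-level backoff; not Storm's class.
final class ProgressiveWaitSketch {
    private final int level1Count;
    private final int level2Count;
    private final long level3SleepMs;

    ProgressiveWaitSketch(int level1Count, int level2Count, long level3SleepMs) {
        this.level1Count = level1Count;
        this.level2Count = level2Count;
        this.level3SleepMs = level3SleepMs;
    }

    // Level (1, 2 or 3) that a given counter value falls into.
    int levelFor(int idleCounter) {
        if (idleCounter < level1Count) {
            return 1;
        }
        if (idleCounter < level1Count + level2Count) {
            return 2;
        }
        return 3;
    }

    int idle(int idleCounter) throws InterruptedException {
        switch (levelFor(idleCounter)) {
            case 1:
                return idleCounter + 1;          // spin: no waiting at all
            case 2:
                LockSupport.parkNanos(1L);       // shortest possible park
                return idleCounter + 1;
            default:
                Thread.sleep(level3SleepMs);     // longer idle; counter stops growing
                return idleCounter;
        }
    }
}
```

With the spout defaults (0, 0, 1), levelFor(0) is already 3, so an idle spout goes straight to Thread.sleep(1). With the bolt defaults (1, 1000, 1), a bolt spins once, parks for 1000 iterations, and only then starts sleeping.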

SpoutExecutor.call
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@Override
public Callable<Long> call() throws Exception {
init(idToTask, idToTaskBase);
return new Callable<Long>() {
final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
int recvqCheckSkips = 0;
int swIdleCount = 0; // counter for spout wait strategy
int bpIdleCount = 0; // counter for back pressure wait strategy
int rmspCount = 0;

@Override
public Long call() throws Exception {
int receiveCount = 0;
if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
receiveCount = receiveQueue.consume(SpoutExecutor.this);
recvqCheckSkips = 0;
}
long currCount = emittedCount.get();
boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
boolean isActive = stormActive.get();

if (!isActive) {
inactiveExecute();
return 0L;
}

if (!lastActive.get()) {
lastActive.set(true);
activateSpouts();
}
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
boolean noEmits = true;
long emptyStretch = 0;

if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
for (int j = 0; j < spouts.size(); j++) {// in critical path. don't use iterators.
spouts.get(j).nextTuple();
}
noEmits = (currCount == emittedCount.get());
if (noEmits) {
emptyEmitStreak.increment();
} else {
emptyStretch = emptyEmitStreak.get();
emptyEmitStreak.set(0);
}
}
if (reachedMaxSpoutPending) {
if (rmspCount == 0) {
LOG.debug("Reached max spout pending");
}
rmspCount++;
} else {
if (rmspCount > 0) {
LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
}
rmspCount = 0;
}

if (receiveCount > 1) {
// continue without idling
return 0L;
}
if (!pendingEmits.isEmpty()) {// then facing backpressure
backPressureWaitStrategy();
return 0L;
}
bpIdleCount = 0;
if (noEmits) {
spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
return 0L;
}
swIdleCount = 0;
return 0L;
}

private void backPressureWaitStrategy() throws InterruptedException {
long start = Time.currentTimeMillis();
if (bpIdleCount == 0) {// check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
}
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
}

private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
emptyEmitStreak.increment();
long start = Time.currentTimeMillis();
swIdleCount = spoutWaitStrategy.idle(swIdleCount);
if (reachedMaxSpoutPending) {
spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
} else {
if (emptyStretch > 0) {
LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
}
}
}

// returns true if pendingEmits is empty
private boolean tryFlushPendingEmits() {
for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
if (executorTransfer.tryTransfer(t, null)) {
pendingEmits.poll();
} else {// to avoid reordering of emits, stop at first failure
return false;
}
}
return true;
}
};
}

The spout maintains a pendingEmits queue, which holds tuples whose emit failed or is still waiting, and a RotatingMap named pending, which holds the ids and data of tuples awaiting acks.
The spout reads topology.max.spout.pending (Config.TOPOLOGY_MAX_SPOUT_PENDING) and computes maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(). The setting defaults to null, so maxSpoutPending is 0 and the limit is disabled.
The spout calls nextTuple only when !reachedMaxSpoutPending && pendingEmitsIsEmpty. It triggers backPressureWaitStrategy when pendingEmits is not empty, and spoutWaitStrategy when noEmits is true. Because noEmits starts out as true, this also covers iterations in which nextTuple was skipped after reaching max spout pending.
On each call, currCount = emittedCount.get() is recorded before nextTuple is invoked. If nextTuple emits, SpoutOutputCollectorImpl updates emittedCount in emit, emitDirect, and similar methods; noEmits = (currCount == emittedCount.get()) then tells whether anything was emitted.
The spout keeps bpIdleCount and swIdleCount, which feed backPressureWaitStrategy.idle(bpIdleCount) and spoutWaitStrategy.idle(swIdleCount) respectively.
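
The branch order at the end of the spout's call() can be condensed into a few pure functions. The sketch below is a simplification for illustration only: SpoutWaitAction and SpoutWaitDecisionSketch are hypothetical names, and the real method also flushes emits, updates metrics, and resets the idle counters.

```java
// Hypothetical result type: which wait, if any, the spout loop would enter.
enum SpoutWaitAction { NONE, BACK_PRESSURE_WAIT, SPOUT_WAIT }

final class SpoutWaitDecisionSketch {
    // A null setting is treated as 0; the value is scaled by the executor's task count.
    static int maxSpoutPending(Integer configured, int taskCount) {
        return (configured == null ? 0 : configured) * taskCount;
    }

    // A maxSpoutPending of 0 means the limit is disabled.
    static boolean reachedMaxSpoutPending(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    // Same order of checks as the tail of the quoted call() method.
    static SpoutWaitAction decide(int receiveCount, boolean pendingEmitsEmpty, boolean noEmits) {
        if (receiveCount > 1) {
            return SpoutWaitAction.NONE;               // continue without idling
        }
        if (!pendingEmitsEmpty) {
            return SpoutWaitAction.BACK_PRESSURE_WAIT; // downstream could not take everything
        }
        if (noEmits) {
            return SpoutWaitAction.SPOUT_WAIT;         // nothing was emitted this round
        }
        return SpoutWaitAction.NONE;
    }
}
```

Back pressure takes priority over the spout wait: a spout that emitted nothing because pendingEmits was still non-empty enters the back pressure wait, not the spout wait.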

BoltExecutor.call
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@Override
public Callable<Long> call() throws Exception {
init(idToTask, idToTaskBase);

return new Callable<Long>() {
int bpIdleCount = 0;
int consumeIdleCounter = 0;
private final ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();

@Override
public Long call() throws Exception {
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
if (pendingEmitsIsEmpty) {
if (bpIdleCount != 0) {
LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
}
bpIdleCount = 0;
int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
if (consumeCount == 0) {
if (consumeIdleCounter == 0) {
LOG.debug("Invoking consume wait strategy");
}
consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
if (Thread.interrupted()) {
throw new InterruptedException();
}
} else {
if (consumeIdleCounter != 0) {
LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
}
consumeIdleCounter = 0;
}
} else {
if (bpIdleCount == 0) {// check avoids multiple log msgs when spinning in a idle loop
LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
}
bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
}

return 0L;
}

// returns true if pendingEmits is empty
private boolean tryFlushPendingEmits() {
for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
if (executorTransfer.tryTransfer(t, null)) {
pendingEmits.poll();
} else {// to avoid reordering of emits, stop at first failure
return false;
}
}
return true;
}

};
}

The bolt executor also maintains pendingEmits; when pendingEmits is not empty, it triggers backPressureWaitStrategy.idle(bpIdleCount).
When pendingEmits is empty, it looks at the consumeCount returned by receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits); if that is 0, it triggers consumeWaitStrategy.idle(consumeIdleCounter).
The bolt executor keeps bpIdleCount and consumeIdleCounter, which feed backPressureWaitStrategy.idle(bpIdleCount) and consumeWaitStrategy.idle(consumeIdleCounter) respectively.
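
The flush-in-order behavior of tryFlushPendingEmits can be reproduced with standard collections. In the sketch below a bounded ArrayBlockingQueue stands in for the downstream receive queue and offer() stands in for executorTransfer.tryTransfer; both substitutions, and the FlushPendingSketch name, are assumptions made for illustration.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

final class FlushPendingSketch {
    // Moves items to downstream in order; returns true only if pendingEmits ends up empty.
    static <T> boolean tryFlushPendingEmits(Queue<T> pendingEmits, BlockingQueue<T> downstream) {
        for (T t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
            if (downstream.offer(t)) {   // non-blocking: fails when downstream is full
                pendingEmits.poll();     // remove only after a successful hand-off
            } else {
                return false;            // stop at first failure to avoid reordering
            }
        }
        return true;
    }
}
```

Peeking before polling means a rejected item stays at the head of the queue, so the next attempt resumes from exactly the same tuple.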

Summary

Both the spout and bolt executors use backPressureWaitStrategy, read from topology.backpressure.wait.strategy (for any producer (spout/bolt/transfer thread) when the downstream Q is full). The default implementation is org.apache.storm.policy.WaitStrategyProgressive, and it applies when the downstream component's receive queue is full. The check is based on the pendingEmits queue: on every call, the spout or bolt computes pendingEmitsIsEmpty by invoking tryFlushPendingEmits, which first tries to send the queued tuples. If the downstream accepts them all, pendingEmits ends up empty. In this way the executor gauges downstream load dynamically and decides whether to enter the back pressure wait.
The spout uses spoutWaitStrategy, read from topology.spout.wait.strategy (employed when there is no data to produce). The default implementation is org.apache.storm.policy.WaitStrategyProgressive, and it applies when nothing was emitted, which is detected through emittedCount.
The bolt uses consumeWaitStrategy. For executors other than the system bolt executor it is read from topology.bolt.wait.strategy (employed when there is no data in its receive buffer to process). The default implementation is org.apache.storm.policy.WaitStrategyProgressive, and it applies when the receive buffer has nothing to process, which is detected through the consumeCount returned by receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits).
Another difference is that the spout checks reachedMaxSpoutPending in addition to pendingEmitsIsEmpty when deciding whether to keep producing data, whereas the bolt uses only pendingEmitsIsEmpty to decide whether it can keep consuming.
Besides WaitStrategyProgressive, IWaitStrategy has a second implementation, WaitStrategyPark, which is the one used for the system bolt.
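
Since all three strategies are chosen by class name in the topology conf, switching them to WaitStrategyPark should amount to a handful of map entries. The helper below is a hypothetical sketch that builds such a map with plain strings; the three *.wait.strategy keys appear above, while the *.wait.park.microsec key names are assumed to be the string values behind Config.TOPOLOGY_*_WAIT_PARK_MICROSEC and should be checked against the Storm version in use.

```java
import java.util.HashMap;
import java.util.Map;

final class WaitStrategyConfSketch {
    // Builds conf overrides that select WaitStrategyPark for all three wait situations.
    static Map<String, Object> parkEverywhere(long parkMicros) {
        String park = "org.apache.storm.policy.WaitStrategyPark";
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.spout.wait.strategy", park);
        conf.put("topology.bolt.wait.strategy", park);
        conf.put("topology.backpressure.wait.strategy", park);
        // Assumed key names for the park durations (microseconds).
        conf.put("topology.spout.wait.park.microsec", parkMicros);
        conf.put("topology.bolt.wait.park.microsec", parkMicros);
        conf.put("topology.backpressure.wait.park.microsec", parkMicros);
        return conf;
    }
}
```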

