
A Look at Storm's CheckpointSpout


This article takes a look at Storm's CheckpointSpout.
TopologyBuilder
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
public StormTopology createTopology() {
    Map<String, Bolt> boltSpecs = new HashMap<>();
    Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
    maybeAddCheckpointSpout();
    for (String boltId : _bolts.keySet()) {
        IRichBolt bolt = _bolts.get(boltId);
        bolt = maybeAddCheckpointTupleForwarder(bolt);
        ComponentCommon common = getComponentCommon(boltId, bolt);
        try {
            maybeAddCheckpointInputs(common);
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        } catch (RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException(
                    "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                    "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                    "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    for (String spoutId : _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        try {
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
        } catch (RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException(
                    "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                    "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                    "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }

    StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                    boltSpecs,
                                                    new HashMap<>());

    stormTopology.set_worker_hooks(_workerHooks);

    if (!_componentToSharedMemory.isEmpty()) {
        stormTopology.set_component_to_shared_memory(_componentToSharedMemory);
        stormTopology.set_shared_memory(_sharedMemory);
    }

    return Utils.addVersions(stormTopology);
}

/**
 * If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
 */
private void maybeAddCheckpointSpout() {
    if (hasStatefulBolt) {
        setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
    }
}

private void maybeAddCheckpointInputs(ComponentCommon common) {
    if (hasStatefulBolt) {
        addCheckPointInputs(common);
    }
}

/**
 * If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
 * checkpoint tuples can flow through the topology.
 */
private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
    if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
        bolt = new CheckpointTupleForwarder(bolt);
    }
    return bolt;
}

/**
 * For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
 * bolts, add checkpoint stream from the previous bolt to its input.
 */
private void addCheckPointInputs(ComponentCommon component) {
    Set<GlobalStreamId> checkPointInputs = new HashSet<>();
    for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
        String sourceId = inputStream.get_componentId();
        if (_spouts.containsKey(sourceId)) {
            checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
        } else {
            checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
        }
    }
    for (GlobalStreamId streamId : checkPointInputs) {
        component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
    }
}

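When TopologyBuilder runs createTopology, it calls maybeAddCheckpointSpout; if hasStatefulBolt is true, a CheckpointSpout is created and added automatically under CHECKPOINT_COMPONENT_ID with a parallelism hint of 1.
If hasStatefulBolt is true and a bolt is not a StatefulBoltExecutor, the bolt is wrapped in a CheckpointTupleForwarder.
If hasStatefulBolt is true, addCheckPointInputs is also called to configure the checkpoint inputs: a bolt fed by a spout subscribes to the checkpoint stream of the CheckpointSpout, any other bolt subscribes to the checkpoint stream of its upstream bolt, and in both cases an all grouping is used.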
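The wiring rule in addCheckPointInputs can be tried out without a Storm cluster. The following is a minimal sketch in plain Java, not Storm code: the class CheckpointWiringSketch, the method checkpointSources, and the component names wordSpout and splitBolt are hypothetical, and component ids are modelled as plain strings instead of GlobalStreamId. It only computes which component a bolt would receive the $checkpoint stream from.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the branching in TopologyBuilder.addCheckPointInputs.
class CheckpointWiringSketch {
    static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";

    // For every input source of a bolt, pick the component whose "$checkpoint"
    // stream the bolt should subscribe to: the checkpoint spout if the source
    // is a spout, otherwise the upstream bolt itself.
    static Set<String> checkpointSources(Set<String> spoutIds, Collection<String> inputSourceIds) {
        Set<String> sources = new LinkedHashSet<>();
        for (String sourceId : inputSourceIds) {
            sources.add(spoutIds.contains(sourceId) ? CHECKPOINT_COMPONENT_ID : sourceId);
        }
        return sources;
    }

    public static void main(String[] args) {
        Set<String> spouts = Collections.singleton("wordSpout");
        // A root bolt reading from a spout listens to the checkpoint spout.
        System.out.println(checkpointSources(spouts, Arrays.asList("wordSpout")));  // [$checkpointspout]
        // A downstream bolt listens to the checkpoint stream of its upstream bolt.
        System.out.println(checkpointSources(spouts, Arrays.asList("splitBolt")));  // [splitBolt]
    }
}
```

Because the result is a set, a bolt that reads from several spouts still gets only one subscription to the checkpoint spout, which matches the use of a HashSet in the original method.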

CheckpointSpout
storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckpointSpout.java
/**
 * Emits checkpoint tuples which is used to save the state of the {@link org.apache.storm.topology.IStatefulComponent} across the topology.
 * If a topology contains Stateful bolts, Checkpoint spouts are automatically added to the topology. There is only one Checkpoint task per
 * topology. Checkpoint spout stores its internal state in a {@link KeyValueState}.
 *
 * @see CheckPointState
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    private static final String TX_STATE_KEY = "__state";
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    // package access for unit test
    void open(TopologyContext context, SpoutOutputCollector collector,
              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        lastCheckpointTs = 0;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        if (topoConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
            interval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        // ensure checkpoint interval is not less than a sane low value.
        interval = Math.max(100, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    private boolean shouldCheckpoint() {
        return !recovering && !checkpointStepInProgress &&
               (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
    }

    private boolean checkpointIntervalElapsed() {
        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
    }

    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    private void emit(long txid, Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    //……
}

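CheckpointSpout reads the checkpoint interval from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms). The default in defaults.yaml is 1000; if the key is missing from the configuration the spout falls back to 100, which is also the lowest value it accepts.
nextTuple first checks shouldRecover. If recovery is needed, it calls handleRecovery and then startProgress; otherwise, if a checkpoint is due, it calls doCheckpoint and then startProgress; if neither applies, it sleeps for sleepInterval before checking again.
shouldCheckpoint returns true only when the spout is not recovering and no checkpoint step is in progress, and either the current state is not COMMITTED or more than checkpointInterval has passed since the last checkpoint. doCheckpoint then emits the next action on CHECKPOINT_STREAM_ID.
When CheckpointSpout receives an ack whose txid matches the current transaction, it handles it through handleRecoveryAck or handleCheckpointAck (both elided above), which performs saveTxState and calls checkpointState.commit to commit the checkpoint; it then calls resetProgress to reset the progress flags.
On fail, the spout sets recovering to true if it was not already recovering, so that the next nextTuple call triggers recovery, and then calls resetProgress.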
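The interval handling described above comes down to two small computations. Here is a minimal sketch that mirrors loadCheckpointInterval and the sleepInterval assignment in open; the class and method names (CheckpointIntervalSketch, effectiveInterval, sleepInterval) are hypothetical, and a null argument stands for a configuration that does not contain topology.state.checkpoint.interval.ms.

```java
// Hypothetical helper that mirrors how CheckpointSpout derives its two intervals.
class CheckpointIntervalSketch {
    static final int MIN_INTERVAL_MS = 100;

    // configured is the raw config value, or null when the key is absent.
    static int effectiveInterval(Number configured) {
        int interval = configured == null ? 0 : configured.intValue();
        // Never go below the minimum, exactly like Math.max(100, interval) in the spout.
        return Math.max(MIN_INTERVAL_MS, interval);
    }

    // The spout polls ten times per checkpoint interval.
    static int sleepInterval(int checkpointInterval) {
        return checkpointInterval / 10;
    }

    public static void main(String[] args) {
        for (Number configured : new Number[] {null, 50, 1000}) {
            int interval = effectiveInterval(configured);
            System.out.println(configured + " -> interval " + interval + " ms, sleep " + sleepInterval(interval) + " ms");
        }
    }
}
```

With the default of 1000 ms this gives a polling sleep of 100 ms, and with the minimum of 100 ms a sleep of 10 ms.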

CheckPointState
storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckPointState.java
/**
 * Get the next state based on this checkpoint state.
 *
 * @param recovering if in recovering phase
 * @return the next checkpoint state based on this state.
 */
public CheckPointState nextState(boolean recovering) {
    CheckPointState nextState;
    switch (state) {
        case PREPARING:
            nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
            break;
        case COMMITTING:
            nextState = new CheckPointState(txid, COMMITTED);
            break;
        case COMMITTED:
            nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
            break;
        default:
            throw new IllegalStateException("Unknown state " + state);
    }
    return nextState;
}

/**
 * Get the next action to perform based on this checkpoint state.
 *
 * @param recovering if in recovering phase
 * @return the next action to perform based on this state
 */
public Action nextAction(boolean recovering) {
    Action action;
    switch (state) {
        case PREPARING:
            action = recovering ? Action.ROLLBACK : Action.PREPARE;
            break;
        case COMMITTING:
            action = Action.COMMIT;
            break;
        case COMMITTED:
            action = recovering ? Action.INITSTATE : Action.PREPARE;
            break;
        default:
            throw new IllegalStateException("Unknown state " + state);
    }
    return action;
}
CheckPointState provides nextState to move to the next state, and nextAction to return the next action for the current state.
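To see how the two methods interact, it helps to walk one transaction through the state machine. The sketch below is a simplified re-implementation for illustration only: the class names CheckpointStateMachineSketch and Tx are hypothetical, the starting txid of 7 is arbitrary, and only the transitions shown in the excerpt above are modelled.

```java
// Simplified, hypothetical re-implementation of the CheckPointState transitions.
class CheckpointStateMachineSketch {
    enum State { PREPARING, COMMITTING, COMMITTED }
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    static final class Tx {
        final long txid;
        final State state;

        Tx(long txid, State state) {
            this.txid = txid;
            this.state = state;
        }

        Tx nextState(boolean recovering) {
            switch (state) {
                case PREPARING:
                    // A half-prepared transaction is abandoned during recovery.
                    return recovering ? new Tx(txid - 1, State.COMMITTED) : new Tx(txid, State.COMMITTING);
                case COMMITTING:
                    return new Tx(txid, State.COMMITTED);
                default: // COMMITTED
                    return recovering ? this : new Tx(txid + 1, State.PREPARING);
            }
        }

        Action nextAction(boolean recovering) {
            switch (state) {
                case PREPARING:
                    return recovering ? Action.ROLLBACK : Action.PREPARE;
                case COMMITTING:
                    return Action.COMMIT;
                default: // COMMITTED
                    return recovering ? Action.INITSTATE : Action.PREPARE;
            }
        }

        @Override
        public String toString() {
            return "(" + txid + ", " + state + ")";
        }
    }

    public static void main(String[] args) {
        // Normal cycle: COMMITTED -> PREPARING -> COMMITTING -> COMMITTED.
        Tx tx = new Tx(7, State.COMMITTED).nextState(false);
        System.out.println(tx + " emits " + tx.nextAction(false));   // (8, PREPARING) emits PREPARE
        tx = tx.nextState(false);
        System.out.println(tx + " emits " + tx.nextAction(false));   // (8, COMMITTING) emits COMMIT
        System.out.println("then " + tx.nextState(false));           // then (8, COMMITTED)

        // Recovery while a transaction is still PREPARING: roll back to the previous txid.
        Tx crashed = new Tx(9, State.PREPARING);
        System.out.println(crashed + " emits " + crashed.nextAction(true) + ", then " + crashed.nextState(true));
    }
}
```

The txid only increases when a new PREPARING state is entered, so PREPARE and COMMIT for one checkpoint always carry the same txid.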
BaseStatefulBoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
public void execute(Tuple input) {
    if (CheckpointSpout.isCheckpoint(input)) {
        processCheckpoint(input);
    } else {
        handleTuple(input);
    }
}

/**
 * Invokes handleCheckpoint once checkpoint tuple is received on all input checkpoint streams to this component.
 */
private void processCheckpoint(Tuple input) {
    CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
    long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
    if (shouldProcessTransaction(action, txid)) {
        LOG.debug("Processing action {}, txid {}", action, txid);
        try {
            if (txid >= lastTxid) {
                handleCheckpoint(input, action, txid);
                if (action == ROLLBACK) {
                    lastTxid = txid - 1;
                } else {
                    lastTxid = txid;
                }
            } else {
                LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                collector.ack(input);
            }
        } catch (Throwable th) {
            LOG.error("Got error while processing checkpoint tuple", th);
            collector.fail(input);
            collector.reportError(th);
        }
    } else {
        LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
                  "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
        collector.ack(input);
    }
}

/**
 * Checks if check points have been received from all tasks across all input streams to this component
 */
private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
    TransactionRequest request = new TransactionRequest(action, txid);
    Integer count;
    if ((count = transactionRequestCount.get(request)) == null) {
        transactionRequestCount.put(request, 1);
        count = 1;
    } else {
        transactionRequestCount.put(request, ++count);
    }
    if (count == checkPointInputTaskCount) {
        transactionRequestCount.remove(request);
        return true;
    }
    return false;
}

protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
    declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
}

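The execute method of BaseStatefulBoltExecutor first uses CheckpointSpout.isCheckpoint(input) to check whether the tuple arrived on the checkpoint stream (from the CheckpointSpout or from an upstream bolt); if so it calls processCheckpoint, otherwise handleTuple.
processCheckpoint first calls shouldProcessTransaction, which checks whether every task on every input checkpoint stream has sent the checkpoint tuple for this action and txid; until that is the case, the tuple is simply acked.
Once all of them have arrived and txid is greater than or equal to lastTxid, handleCheckpoint is called; this method is implemented by subclasses. A checkpoint tuple with an older txid is acked and ignored.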
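The counting logic in shouldProcessTransaction behaves like a barrier: nothing happens until every upstream task has delivered the same (action, txid) pair. A minimal stand-alone sketch, with a hypothetical class name CheckpointBarrierSketch and a string key in place of Storm's TransactionRequest, could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical barrier that mirrors the counting in shouldProcessTransaction.
class CheckpointBarrierSketch {
    private final int inputTaskCount;
    private final Map<String, Integer> requestCount = new HashMap<>();

    CheckpointBarrierSketch(int inputTaskCount) {
        this.inputTaskCount = inputTaskCount;
    }

    // Returns true only for the tuple that completes the set for (action, txid).
    boolean shouldProcess(String action, long txid) {
        String key = action + "#" + txid;
        int count = requestCount.merge(key, 1, Integer::sum);
        if (count == inputTaskCount) {
            requestCount.remove(key);   // reset so the same key can be counted again later
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointBarrierSketch barrier = new CheckpointBarrierSketch(3);
        System.out.println(barrier.shouldProcess("PREPARE", 1));  // false
        System.out.println(barrier.shouldProcess("PREPARE", 1));  // false
        System.out.println(barrier.shouldProcess("PREPARE", 1));  // true
    }
}
```

In the real executor the tuples that arrive before the barrier completes are acked right away, so only the last one is passed to handleCheckpoint and forwarded downstream.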

StatefulBoltExecutor.handleCheckpoint
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
    //……

    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
        if (action == PREPARE) {
            if (boltInitialized) {
                bolt.prePrepare(txid);
                state.prepareCommit(txid);
                preparedTuples.addAll(collector.ackedTuples());
            } else {
                /*
                 * May be the task restarted in the middle and the state needs be initialized.
                 * Fail fast and trigger recovery.
                 */
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                collector.fail(checkpointTuple);
                return;
            }
        } else if (action == COMMIT) {
            bolt.preCommit(txid);
            state.commit(txid);
            ack(preparedTuples);
        } else if (action == ROLLBACK) {
            bolt.preRollback();
            state.rollback();
            fail(preparedTuples);
            fail(collector.ackedTuples());
        } else if (action == INITSTATE) {
            if (!boltInitialized) {
                bolt.initState((T) state);
                boltInitialized = true;
                LOG.debug("{} pending tuples to process", pendingTuples.size());
                for (Tuple tuple : pendingTuples) {
                    doExecute(tuple);
                }
                pendingTuples.clear();
            } else {
                /*
                 * If a worker crashes, the states of all workers are rolled back and an initState message is sent across
                 * the topology so that crashed workers can initialize their state.
                 * The bolts that have their state already initialized need not be re-initialized.
                 */
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
                          checkpointTuple, action, txid);
            }
        }
        collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.delegate.ack(checkpointTuple);
    }

    //……
}

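StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint.
The method handles each action in turn. For PREPARE it calls the bolt's prePrepare and then prepareCommit on the state (if the bolt is not yet initialized, it fails the checkpoint tuple instead, which triggers recovery). For COMMIT it calls the bolt's preCommit and then commit on the state. For ROLLBACK it calls the bolt's preRollback and then rollback on the state. For INITSTATE, if the bolt is not yet initialized, it calls the bolt's initState and replays the pending tuples.
After the action has been handled, the checkpoint tuple is emitted downstream and then acked through collector.delegate.ack(checkpointTuple).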
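The PREPARE, COMMIT and ROLLBACK branches only make sense together with a state object that keeps a prepared snapshot next to the committed one. The following toy sketch illustrates that idea under stated assumptions: ToyCheckpointedState is a hypothetical in-memory class, not Storm's KeyValueState, and it ignores txids, persistence and tuple acking entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory state with a prepare/commit/rollback life cycle.
class ToyCheckpointedState {
    enum Action { PREPARE, COMMIT, ROLLBACK }

    private Map<String, Long> working = new HashMap<>();    // what the bolt reads and writes
    private Map<String, Long> prepared;                      // snapshot taken on PREPARE, null if none
    private Map<String, Long> committed = new HashMap<>();   // last successfully committed snapshot

    void put(String key, long value) {
        working.put(key, value);
    }

    Long get(String key) {
        return working.get(key);
    }

    void handle(Action action) {
        switch (action) {
            case PREPARE:
                // Freeze the current contents; later writes do not affect the snapshot.
                prepared = new HashMap<>(working);
                break;
            case COMMIT:
                if (prepared != null) {
                    committed = prepared;
                    prepared = null;
                }
                break;
            case ROLLBACK:
                // Drop any prepared snapshot and go back to the last committed contents.
                prepared = null;
                working = new HashMap<>(committed);
                break;
        }
    }

    public static void main(String[] args) {
        ToyCheckpointedState state = new ToyCheckpointedState();
        state.put("count", 1L);
        state.handle(Action.PREPARE);
        state.handle(Action.COMMIT);      // count=1 is now durable in this toy model
        state.put("count", 2L);
        state.handle(Action.PREPARE);
        state.handle(Action.ROLLBACK);    // the second checkpoint never committed
        System.out.println(state.get("count"));  // 1
    }
}
```

In StatefulBoltExecutor the same three steps are additionally tied to tuple acking: tuples acked since the last checkpoint are only really acked on COMMIT and are failed on ROLLBACK, so their effects are replayed.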

CheckpointTupleForwarder.handleCheckpoint
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
/**
 * Wraps {@link IRichBolt} and forwards checkpoint tuples in a stateful topology.
 * <p>
 * When a storm topology contains one or more {@link IStatefulBolt} all non-stateful bolts are wrapped in {@link CheckpointTupleForwarder}
 * so that the checkpoint tuples can flow through the entire topology DAG.
 * </p>
 */
public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
    //……
    /**
     * Forwards the checkpoint tuple downstream.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action the action (prepare, commit, rollback or initstate)
     * @param txid the transaction id.
     */
    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.ack(checkpointTuple);
    }

    //……
}
CheckpointTupleForwarder wraps non-stateful bolts so that checkpoint tuples can flow through the entire topology DAG.
Summary

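If a topology contains an IStatefulBolt (which gives a bolt an abstraction for reading and writing state, persists that state through the checkpoint mechanism, and relies on the ack mechanism to provide at-least-once semantics), TopologyBuilder automatically adds a CheckpointSpout. Bolts that are not StatefulBoltExecutor instances are wrapped in CheckpointTupleForwarder, so that checkpoint tuples pass through the whole topology DAG.
In nextTuple, CheckpointSpout first checks whether it needs to recover, then whether it needs to checkpoint; if neither applies, it sleeps for a while. sleepInterval is checkpointInterval / 10, and checkpointInterval is read from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, with a default of 1000 and a minimum of 100. Note that this value does not mean a checkpoint is started every checkpointInterval: the behaviour is fixedDelay rather than fixedRate, that is, while the current checkpoint has not finished, no new checkpoint is started.
Both recovery and checkpointing emit tuples on CHECKPOINT_STREAM_ID. The execute method of BaseStatefulBoltExecutor encapsulates the handling of checkpoint tuples; non-checkpoint tuples go to the abstract method handleTuple, which subclasses implement. handleCheckpoint is also implemented by subclasses; BaseStatefulBoltExecutor only enforces the preconditions, namely that the checkpoint tuple has been received from every task of every input stream and that txid >= lastTxid.
StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint, handling the PREPARE, COMMIT, ROLLBACK and INITSTATE actions (similar to a three-phase commit protocol), then forwarding the checkpoint tuple and acking it.
When CheckpointSpout emits a checkpoint tuple it uses the txid as the msgId, so the tuple is tracked reliably. Once the checkpoint tuple has been acked across the whole topology DAG, the spout receives the ack and calls checkpointState.commit to commit the checkpoint; on fail it resets the progress flags and switches to recovery. Normally Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms, default 1000, i.e. 1 second) is smaller than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 seconds). If checkpointInterval is set too large, the state recovered after a worker crash is stale, which defeats the purpose of checkpointing.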
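The fixedDelay behaviour mentioned above follows directly from the shouldCheckpoint predicate. The sketch below restates that predicate as a pure function so it can be evaluated with made-up timestamps; the class name CheckpointPacingSketch and the parameter list are hypothetical, and the clock value is passed in instead of calling System.currentTimeMillis().

```java
// Hypothetical pure-function version of CheckpointSpout.shouldCheckpoint.
class CheckpointPacingSketch {
    static boolean shouldCheckpoint(boolean recovering, boolean stepInProgress, boolean committed,
                                    long nowMs, long lastCheckpointTs, long intervalMs) {
        return !recovering && !stepInProgress
            && (!committed || (nowMs - lastCheckpointTs) > intervalMs);
    }

    public static void main(String[] args) {
        long interval = 1000;
        // Interval elapsed long ago, but the previous step has not been acked yet: no new step.
        System.out.println(shouldCheckpoint(false, true, true, 5000, 0, interval));   // false
        // Previous checkpoint fully committed and the interval has elapsed: start a new one.
        System.out.println(shouldCheckpoint(false, false, true, 5000, 0, interval));  // true
        // PREPARE was acked (state is COMMITTING, not COMMITTED): continue immediately with COMMIT.
        System.out.println(shouldCheckpoint(false, false, false, 500, 0, interval));  // true
        // Committed and the interval has not elapsed yet: wait.
        System.out.println(shouldCheckpoint(false, false, true, 500, 0, interval));   // false
    }
}
```

The first case is the one that distinguishes fixedDelay from fixedRate: a slow ack delays the next checkpoint step no matter how much time has passed.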

