[case42] A look at Storm's ack mechanism


This article looks at Storm's ack mechanism.
Example
AckSentenceSpout
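import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class AckSentenceSpout extends BaseRichSpout {

    // Tuples that have been emitted but not yet acked, keyed by msgId
    private ConcurrentHashMap<UUID, Values> pending;

    private SpoutOutputCollector collector;

    private int index = 0;

    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        // NOTE: pass msgId here; emit(values) without a msgId would not be tracked
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) {
        // The tuple tree completed, so the cached values are no longer needed
        this.pending.remove(msgId);
    }

    // NOTE: a failed tuple has to be emitted again
    @Override
    public void fail(Object msgId) {
        Values values = this.pending.get(msgId);
        if (values != null) {
            this.collector.emit(values, msgId);
        }
    }
}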
On the spout side, emit must be given a msgId, and the spout has to cache the emitted data: remove it on ack, and re-emit it on fail to retry
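The spout above retries forever, which can turn one poison tuple into an endless replay loop. A minimal sketch of the same cache-on-emit, remove-on-ack, resend-on-fail bookkeeping with a retry cap follows. RetryTracker and its method names are hypothetical and not part of Storm; it is plain Java so that it can be tried without a cluster.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not a Storm class): caches in-flight payloads and caps replays. */
final class RetryTracker<K, V> {

    private static final class Entry<V> {
        final V payload;
        int failures;

        Entry(V payload) {
            this.payload = payload;
        }
    }

    private final Map<K, Entry<V>> pending = new ConcurrentHashMap<>();
    private final int maxRetries;

    RetryTracker(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Call when emitting: remember the payload under its message id. */
    void track(K msgId, V payload) {
        pending.put(msgId, new Entry<>(payload));
    }

    /** Call from ack(): the tuple tree completed, so forget the payload. */
    void acked(K msgId) {
        pending.remove(msgId);
    }

    /**
     * Call from fail(): returns the payload to re-emit, or empty when the id is
     * unknown or the retry budget is used up (the entry is then dropped).
     */
    Optional<V> failed(K msgId) {
        Entry<V> entry = pending.get(msgId);
        if (entry == null) {
            return Optional.empty();
        }
        if (++entry.failures > maxRetries) {
            pending.remove(msgId);
            return Optional.empty();
        }
        return Optional.of(entry.payload);
    }

    int size() {
        return pending.size();
    }
}
```

In a spout, fail(msgId) would re-emit only when failed(msgId) returns a payload. The failure counter is not atomic; the sketch assumes nextTuple, ack and fail are all invoked from the same executor thread.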
AckWordCountBolt
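import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AckWordCountBolt extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(AckWordCountBolt.class);
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String word = tuple.getStringByField("word");
            Long count = this.counts.get(word);
            if (count == null) {
                count = 0L;
            }
            count++;
            this.counts.put(word, count);

            // NOTE: pass the tuple being processed as the anchor
            this.collector.emit(tuple, new Values(word, count));

            // NOTE: the bolt has to ack the tuple itself
            this.collector.ack(tuple);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            // NOTE: fail the tuple when processing throws
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}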
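A bolt has two jobs. The first is anchoring: pass the input tuple to emit so that the input and output tuples are linked into the tuple tree. The second is to ack every tuple it has finished processing, and to fail the tuple when processing fails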
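Why anchoring matters can be seen in a toy model of the XOR bookkeeping that the acker keeps per spout tuple. The sketch below is not Storm code: AnchorDemo, remaining and nonZero are made-up names, and the model assumes one bolt that emits some children and acks its input, followed by downstream bolts that ack some of those children.

```java
import java.util.Random;

/** Toy model of the acker's XOR ledger for a single spout tuple; not Storm code. */
final class AnchorDemo {

    /** Random non-zero 64-bit id, standing in for a tuple id. */
    static long nonZero(Random random) {
        long v;
        do {
            v = random.nextLong();
        } while (v == 0L);
        return v;
    }

    /**
     * Returns the value left in the ledger after one bolt emitted {@code children}
     * tuples and acked its input, and downstream bolts acked every child except
     * the first {@code unackedChildren}. Zero means "tree fully processed".
     */
    static long remaining(int children, int unackedChildren, boolean anchored, long seed) {
        Random random = new Random(seed);
        long rootEdge = nonZero(random);
        long ledger = rootEdge;                 // what the spout registers for the root

        long ackVal = 0L;                       // XOR of the ids of anchored children
        long[] childIds = new long[children];
        for (int i = 0; i < children; i++) {
            childIds[i] = nonZero(random);
            if (anchored) {
                ackVal ^= childIds[i];
            }
        }
        ledger ^= rootEdge ^ ackVal;            // the bolt acks its input tuple

        for (int i = unackedChildren; i < children; i++) {
            if (anchored) {
                ledger ^= childIds[i];          // an unanchored tuple reports nothing
            }
        }
        return ledger;
    }

    public static void main(String[] args) {
        // Anchored, every child acked: the ledger returns to zero.
        System.out.println(remaining(3, 0, true, 42L) == 0L);
        // Anchored, one child still in flight: the ledger stays non-zero.
        System.out.println(remaining(3, 1, true, 42L) == 0L);
        // Unanchored: the ledger is zero even though no child was acked.
        System.out.println(remaining(3, 3, false, 42L) == 0L);
    }
}
```

The last case is the one to watch for: without an anchor the spout is told the tuple succeeded as soon as the first bolt acks, so a later failure downstream is never replayed.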
Source code walkthrough
SpoutOutputCollectorImpl.emit
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
try {
return sendSpoutMsg(streamId, tuple, messageId, null);
} catch (InterruptedException e) {
LOG.warn("Spout thread interrupted during emit().");
throw new RuntimeException(e);
}
}

private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
InterruptedException {
emittedCount.increment();

List<Integer> outTasks;
if (outTaskId != null) {
outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
} else {
outTasks = taskData.getOutgoingTasks(stream, values);
}

final boolean needAck = (messageId != null) && hasAckers;

final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

final long rootId = needAck ? MessageId.generateId(random) : 0;

for (int i = 0; i < outTasks.size(); i++) {// perf critical path. don't use iterators.
Integer t = outTasks.get(i);
MessageId msgId;
if (needAck) {
long as = MessageId.generateId(random);
msgId = MessageId.makeRootId(rootId, as);
ackSeq.add(as);
} else {
msgId = MessageId.makeUnanchored();
}

final TupleImpl tuple =
new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
AddressedTuple adrTuple = new AddressedTuple(t, tuple);
executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
}
if (isEventLoggers) {
taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
}

if (needAck) {
boolean sample = executor.samplerCheck();
TupleInfo info = new TupleInfo();
info.setTaskId(this.taskId);
info.setStream(stream);
info.setMessageId(messageId);
if (isDebug) {
info.setValues(values);
}
if (sample) {
info.setTimestamp(System.currentTimeMillis());
}

pending.put(rootId, info);
List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
} else if (messageId != null) {
// Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
if (isDebug) {
if (spoutExecutorThdId != Thread.currentThread().getId()) {
throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
"Spout Output Collector should only emit from the main spout executor thread.");
}
}
globalTupleInfo.clear();
globalTupleInfo.setStream(stream);
globalTupleInfo.setValues(values);
globalTupleInfo.setMessageId(messageId);
globalTupleInfo.setTimestamp(0);
globalTupleInfo.setId("0:");
Long timeDelta = 0L;
executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
}
return outTasks;
}
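When needAck is true, sendSpoutMsg first generates a rootId, then generates a random id (as) for every destination task and adds it to ackSeq, and finally calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()), which sends the rootId, the XOR of all ackSeq values, and the spout's taskId to the acker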
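The init value sent on ACKER_INIT_STREAM_ID is the XOR of one random id per destination task. A small sketch of that arithmetic in plain Java follows; InitValDemo, xorAll and edgeIds are illustrative names, with xorAll playing the role that Utils.bitXorVals has in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Illustrates how an init value is folded from per-destination ids; not Storm code. */
final class InitValDemo {

    /** XOR-folds a list of ids into a single 64-bit value. */
    static long xorAll(List<Long> ids) {
        long acc = 0L;
        for (long id : ids) {
            acc ^= id;
        }
        return acc;
    }

    /** One random id per destination task, like the ackSeq list in sendSpoutMsg. */
    static List<Long> edgeIds(int outTasks, Random random) {
        List<Long> ids = new ArrayList<>();
        for (int i = 0; i < outTasks; i++) {
            ids.add(random.nextLong());
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Long> ackSeq = edgeIds(3, new Random(7));
        long ledger = xorAll(ackSeq);           // value registered for rootId
        for (long id : ackSeq) {
            ledger ^= id;                       // each destination acks its own copy
            System.out.println("remaining = " + Long.toHexString(ledger));
        }
    }
}
```

Because XOR is its own inverse, the value only returns to zero once every destination has reported, and the acker needs just one long per spout tuple no matter how wide the fan-out is.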
BoltOutputCollectorImpl.ack&fail
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@Override
public void ack(Tuple input) {
if (!ackingEnabled) {
return;
}
long ackValue = ((TupleImpl) input).getAckVal();
Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
executor.getExecutorTransfer(), executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
}

if (!task.getUserContext().getHooks().isEmpty()) {
BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
boltAckInfo.applyOn(task.getUserContext());
}
if (delta >= 0) {
executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
task.getTaskMetrics().getAcked(input.getSourceStreamId()));
}
}

@Override
public void fail(Tuple input) {
if (!ackingEnabled) {
return;
}
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
}
BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
boltFailInfo.applyOn(task.getUserContext());
if (delta >= 0) {
executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
task.getTaskMetrics().getFailed(input.getSourceStreamId()));
}
}

Both ack and fail in BoltOutputCollectorImpl call task.sendUnanchored
ack sends to Acker.ACKER_ACK_STREAM_ID, and fail sends to Acker.ACKER_FAIL_STREAM_ID

Task.sendUnanchored
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
// Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
Tuple tuple = getTuple(stream, values);
List<Integer> tasks = getOutgoingTasks(stream, values);
for (Integer t : tasks) {
AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
transfer.tryTransfer(addressedTuple, pendingEmits);
}
}
This calls ExecutorTransfer.tryTransfer
ExecutorTransfer.tryTransfer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
if (isDebug) {
LOG.info("TRANSFERRING tuple {}", addressedTuple);
}

JCQueue localQueue = getLocalQueue(addressedTuple);
if (localQueue != null) {
return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
}
return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
}

/**
* Adds tuple to localQueue (if overflow is empty). If localQueue is full adds to pendingEmits instead. pendingEmits can be null.
* Returns false if unable to add to localQueue.
*/
public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
workerData.checkSerialize(serializer, tuple);
if (pendingEmits != null) {
if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
queuesToFlush.set(tuple.dest - indexingBase, localQueue);
return true;
} else {
pendingEmits.add(tuple);
return false;
}
} else {
return localQueue.tryPublish(tuple);
}
}

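tryTransfer first checks, based on the addressedTuple, whether the destination queue is local; if it is, it calls tryTransferLocal, otherwise it calls workerData.tryTransferRemote
tryTransferLocal calls localQueue.tryPublish, which puts the data into the recvQueue of the JCQueue
workerData.tryTransferRemote hands the data to the TransferDrainer through WorkerTransfer, and it is sent to the remote node on flush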
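The non-blocking behaviour of tryTransferLocal can be sketched with standard collections. In the sketch below an ArrayBlockingQueue stands in for JCQueue (an assumption made only for illustration), and drainPending is a hypothetical helper showing one way the overflow buffer could be emptied later.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch of "publish if possible, otherwise park in pendingEmits"; not Storm code. */
final class LocalTransferSketch {

    /**
     * Tries to publish to a bounded destination queue. If anything is already waiting
     * in pendingEmits, the new item queues up behind it so that ordering is preserved.
     */
    static <T> boolean tryTransferLocal(T item, BlockingQueue<T> localQueue, Queue<T> pendingEmits) {
        if (pendingEmits == null) {
            return localQueue.offer(item);      // no overflow buffer: just report the result
        }
        if (pendingEmits.isEmpty() && localQueue.offer(item)) {
            return true;
        }
        pendingEmits.add(item);
        return false;
    }

    /** Hypothetical helper: moves parked items to the destination while there is room. */
    static <T> void drainPending(BlockingQueue<T> localQueue, Queue<T> pendingEmits) {
        while (!pendingEmits.isEmpty() && localQueue.offer(pendingEmits.peek())) {
            pendingEmits.poll();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> localQueue = new ArrayBlockingQueue<>(1);
        Queue<String> pendingEmits = new ArrayDeque<>();
        System.out.println(tryTransferLocal("t1", localQueue, pendingEmits));
        System.out.println(tryTransferLocal("t2", localQueue, pendingEmits));
        localQueue.poll();                      // the consumer takes t1
        drainPending(localQueue, pendingEmits);
        System.out.println(localQueue.peek());
    }
}
```

The pendingEmits.isEmpty() check is what keeps tuples in order: once something has been parked, later tuples are parked behind it even if the destination has room again.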

StormCommon.systemTopology
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
return _instance.systemTopologyImpl(topoConf, topology);
}

protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
validateBasic(topology);

StormTopology ret = topology.deepCopy();
addAcker(topoConf, ret);
if (hasEventLoggers(topoConf)) {
addEventLogger(topoConf, ret);
}
addMetricComponents(topoConf, ret);
addSystemComponents(topoConf, ret);
addMetricStreams(ret);
addSystemStreams(ret);

validateStructure(ret);

return ret;
}

public static void addAcker(Map<String, Object> conf, StormTopology topology) {
int ackerNum =
ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

Map<String, Object> ackerConf = new HashMap<>();
ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

for (Bolt bolt : topology.get_bolts().values()) {
ComponentCommon common = bolt.get_common();
common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
}

for (SpoutSpec spout : topology.get_spouts().values()) {
ComponentCommon common = spout.get_common();
Map<String, Object> spoutConf = componentConf(spout);
spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
common.set_json_conf(JSONValue.toJSONString(spoutConf));
common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
Thrift.prepareDirectGrouping());
}

topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
}

public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
Set<String> boltIds = topology.get_bolts().keySet();
Set<String> spoutIds = topology.get_spouts().keySet();

for (String id : spoutIds) {
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}

for (String id : boltIds) {
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}
return inputs;
}

public static IBolt makeAckerBolt() {
return _instance.makeAckerBoltImpl();
}

public IBolt makeAckerBoltImpl() {
return new Acker();
}

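The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt, and SystemBolt
addAcker contains the logic that creates the acker. ackerNum is ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS))); that is, if Config.TOPOLOGY_ACKER_EXECUTORS is not set, the value of Config.TOPOLOGY_WORKERS is used
The acker is configured with Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS set to ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); in other words the Acker receives a tick tuple every Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds, and that tick drives its timeout handling
makeAckerBolt() is called while building the arguments to Thrift.prepareSerializedBoltDetails, and it creates the Acker
The acker's outputs are Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID, and Acker.ACKER_RESET_TIMEOUT_STREAM_ID; its inputs are the same three streams from every bolt plus Acker.ACKER_INIT_STREAM_ID from every spout
For each spout, addAcker sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, declares the Acker.ACKER_INIT_STREAM_ID output stream, and subscribes the spout to Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID, and Acker.ACKER_RESET_TIMEOUT_STREAM_ID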
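The fallback used for ackerNum can be sketched over a plain configuration map. The key strings below are the values of Config.TOPOLOGY_ACKER_EXECUTORS and Config.TOPOLOGY_WORKERS; AckerCountSketch and resolveAckerCount are illustrative names, and the sketch only approximates ObjectReader.getInt (for example, it reads the worker count lazily rather than eagerly).

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of "acker executors default to the worker count"; not Storm code. */
final class AckerCountSketch {
    static final String ACKER_EXECUTORS = "topology.acker.executors";
    static final String WORKERS = "topology.workers";

    /** Returns the configured acker executor count, or the worker count when unset. */
    static int resolveAckerCount(Map<String, Object> conf) {
        Object ackers = conf.get(ACKER_EXECUTORS);
        if (ackers != null) {
            return ((Number) ackers).intValue();
        }
        Object workers = conf.get(WORKERS);
        if (workers == null) {
            throw new IllegalArgumentException(
                "neither " + ACKER_EXECUTORS + " nor " + WORKERS + " is set");
        }
        return ((Number) workers).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKERS, 4);
        System.out.println(resolveAckerCount(conf));   // falls back to the worker count
        conf.put(ACKER_EXECUTORS, 0);
        System.out.println(resolveAckerCount(conf));   // an explicit value wins, even 0
    }
}
```

An explicit 0 is meaningful: with no acker executors, hasAckers is false in sendSpoutMsg, so needAck is never true and a tuple emitted with a messageId is acked back to the spout immediately.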

Acker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
public class Acker implements IBolt {
public static final String ACKER_COMPONENT_ID = "__acker";
public static final String ACKER_INIT_STREAM_ID = "__ack_init";
public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
public static final int TIMEOUT_BUCKET_NUM = 3;
private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
private static final long serialVersionUID = 4430906880683183091L;
private OutputCollector collector;
private RotatingMap<Object, AckObject> pending;

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
}

@Override
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
Map<Object, AckObject> tmp = pending.rotate();
LOG.debug("Number of timeout tuples:{}", tmp.size());
return;
}

boolean resetTimeout = false;
String streamId = input.getSourceStreamId();
Object id = input.getValue(0);
AckObject curr = pending.get(id);
if (ACKER_INIT_STREAM_ID.equals(streamId)) {
if (curr == null) {
curr = new AckObject();
pending.put(id, curr);
}
curr.updateAck(input.getLong(1));
curr.spoutTask = input.getInteger(2);
} else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
if (curr == null) {
curr = new AckObject();
pending.put(id, curr);
}
curr.updateAck(input.getLong(1));
} else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
// For the case that ack_fail message arrives before ack_init
if (curr == null) {
curr = new AckObject();
}
curr.failed = true;
pending.put(id, curr);
} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
resetTimeout = true;
if (curr != null) {
pending.put(id, curr);
} //else if it has not been added yet, there is no reason time it out later on
} else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
collector.flush();
return;
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
return;
}

int task = curr.spoutTask;
if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
if (curr.val == 0) {
pending.remove(id);
collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
} else if (curr.failed) {
pending.remove(id);
collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
} else if (resetTimeout) {
collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
} else {
throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
}
}

collector.ack(input);
}

@Override
public void cleanup() {
LOG.info("Acker: cleanup successfully");
}

private long getTimeDeltaMillis(long startTimeMillis) {
return Time.currentTimeMillis() - startTimeMillis;
}

private static class AckObject {
public long val = 0L;
public long startTime = Time.currentTimeMillis();
public int spoutTask = -1;
public boolean failed = false;

// val xor value
public void updateAck(Long value) {
val = Utils.bitXor(val, value);
}
}
}

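On a tick tuple, the Acker calls RotatingMap.rotate
For ACKER_INIT_STREAM_ID and ACKER_ACK_STREAM_ID it calls updateAck on the AckObject; for ACKER_FAIL_STREAM_ID it marks the AckObject as failed and puts it back into pending
Finally, if the AckObject's val is 0, the whole tuple tree has been processed successfully and the Acker notifies the spout task on ACKER_ACK_STREAM_ID; if the AckObject is failed, it notifies on ACKER_FAIL_STREAM_ID; if the message was a resetTimeout, it notifies on ACKER_RESET_TIMEOUT_STREAM_ID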
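The decision logic at the end of Acker.execute can be condensed into a small, single-threaded model. MiniAcker below is a toy re-implementation for illustration only (the class, its Outcome enum and its method names are not Storm's); it leaves out timeouts and the reset-timeout stream.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy, single-threaded model of the acker's bookkeeping; not Storm code. */
final class MiniAcker {
    enum Outcome { PENDING, ACKED, FAILED }

    private static final class Entry {
        long val = 0L;
        int spoutTask = -1;     // unknown until the init message arrives
        boolean failed = false;
    }

    private final Map<Long, Entry> pending = new HashMap<>();

    /** Init message from the spout: XOR in the init value and record the spout task. */
    Outcome init(long rootId, long initVal, int spoutTask) {
        Entry e = pending.computeIfAbsent(rootId, k -> new Entry());
        e.val ^= initVal;
        e.spoutTask = spoutTask;
        return settle(rootId, e);
    }

    /** Ack message from a bolt: XOR in the reported value. */
    Outcome ack(long rootId, long ackVal) {
        Entry e = pending.computeIfAbsent(rootId, k -> new Entry());
        e.val ^= ackVal;
        return settle(rootId, e);
    }

    /** Fail message from a bolt: mark the tree as failed. */
    Outcome fail(long rootId) {
        Entry e = pending.computeIfAbsent(rootId, k -> new Entry());
        e.failed = true;
        return settle(rootId, e);
    }

    /** Nothing can be reported until the spout task is known; val == 0 is checked first. */
    private Outcome settle(long rootId, Entry e) {
        if (e.spoutTask < 0) {
            return Outcome.PENDING;
        }
        if (e.val == 0L) {
            pending.remove(rootId);
            return Outcome.ACKED;
        }
        if (e.failed) {
            pending.remove(rootId);
            return Outcome.FAILED;
        }
        return Outcome.PENDING;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The model makes the reason for the curr == null branches visible: a bolt's ack or fail may reach the acker before the spout's init message, and because XOR is order-independent the entry can be created by whichever message arrives first.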

SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public class SpoutExecutor extends Executor {
//……
@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
spoutOutputCollector.flush();
} else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
pending.rotate();
} else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
} else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (spoutObj instanceof ICredentialsListener) {
((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
}
} else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
Long id = (Long) tuple.getValue(0);
TupleInfo pendingForId = pending.get(id);
if (pendingForId != null) {
pending.put(id, pendingForId);
}
} else {
Long id = (Long) tuple.getValue(0);
Long timeDeltaMs = (Long) tuple.getValue(1);
TupleInfo tupleInfo = pending.remove(id);
if (tupleInfo != null && tupleInfo.getMessageId() != null) {
if (taskId != tupleInfo.getTaskId()) {
throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
}
Long timeDelta = null;
if (hasAckers) {
long startTimeMs = tupleInfo.getTimestamp();
if (startTimeMs != 0) {
timeDelta = timeDeltaMs;
}
}
if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
} else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
}
}
}
}

public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
if (executor.getIsDebug()) {
LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
}
spout.ack(tupleInfo.getMessageId());
if (!taskData.getUserContext().getHooks().isEmpty()) {// avoid allocating SpoutAckInfo obj if not necessary
new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
}
if (hasAckers && timeDelta != null) {
executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
}
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}

public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
if (executor.getIsDebug()) {
LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
}
spout.fail(tupleInfo.getMessageId());
new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
if (timeDelta != null) {
executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
}
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
}

In tupleActionFn, SpoutExecutor calls ackSpoutMsg when it receives ACKER_ACK_STREAM_ID and failSpoutMsg when it receives ACKER_FAIL_STREAM_ID
ackSpoutMsg and failSpoutMsg call the ack and fail methods of the actual spout, so the result reaches the spout that originally emitted the tuple
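The SYSTEM_TICK_STREAM_ID branch of tupleActionFn calls pending.rotate(), just as the Acker does on a tick tuple. The idea behind such a rotating map can be sketched as below. RotatingBuckets is a simplified stand-in written for this article, not Storm's RotatingMap; what the spout executor does with expired entries is not shown in the excerpt, so the sketch simply returns them.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified rotating-bucket map used to expire entries on ticks; not Storm code. */
final class RotatingBuckets<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    RotatingBuckets(int numBuckets) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
    }

    /** Inserts (or refreshes) a key in the newest bucket, removing any older copy. */
    void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    V get(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    /** Called on every tick: drops the oldest bucket and returns its timed-out entries. */
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }
}
```

With three buckets, as in TIMEOUT_BUCKET_NUM, an entry that is never touched again leaves the map on the third rotate after it was inserted. Calling put again for an existing key moves it back to the newest bucket, which is the effect the ACKER_RESET_TIMEOUT_STREAM_ID branches rely on when they call pending.put(id, ...) with the value they just read.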

Summary

Storm uses the ack mechanism to provide at-least-once processing semantics
The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt, and SystemBolt to the submitted topology; addAcker adds the acker and also applies the ack-related configuration to the spouts
If the spout's emit is given a messageId (and the topology has ackers), the tuple needs to be acked, which triggers taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits())
A bolt sends its ack information through the ack or fail method of BoltOutputCollectorImpl. Both call task.sendUnanchored, which calls ExecutorTransfer.tryTransfer to deliver the addressedTuple to the destination queue (for a remote node it is transferred over the network); the target stream is Acker.ACKER_ACK_STREAM_ID or Acker.ACKER_FAIL_STREAM_ID
When the acker receives Acker.ACKER_ACK_STREAM_ID it calls updateAck on the AckObject; for Acker.ACKER_FAIL_STREAM_ID it marks the AckObject as failed and puts it back into pending. It then checks the AckObject: if val is 0, the whole tuple tree has been processed successfully and it uses emitDirect to notify the owning spout task on ACKER_ACK_STREAM_ID; if the AckObject is failed, it uses emitDirect on ACKER_FAIL_STREAM_ID; if the message was a resetTimeout, it notifies the task on ACKER_RESET_TIMEOUT_STREAM_ID
When SpoutExecutor receives ACKER_ACK_STREAM_ID it calls ackSpoutMsg, and when it receives ACKER_FAIL_STREAM_ID it calls failSpoutMsg; these call the ack and fail methods of the actual spout, so the result reaches the spout that originally emitted the tuple
