Preface
This article looks at how Storm implements messageTimeout.
TOPOLOGY_MESSAGE_TIMEOUT_SECS
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
* True if Storm should timeout messages or not. Defaults to true. This is meant to be used in unit tests to prevent tuples from being
* accidentally timed out during the test.
*/
@isBoolean
public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";
/**
* The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within
* this time frame, Storm will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
*/
@isInteger
@isPositiveNumber
@NotNull
public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
/**
* How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a
* component-specific configuration.
*/
@isInteger
public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";
In defaults.yaml, topology.enable.message.timeouts defaults to true.
In defaults.yaml, topology.message.timeout.secs defaults to 30.
In defaults.yaml, topology.tick.tuple.freq.secs defaults to null; for spouts and the acker bolt, the value actually used is that of topology.message.timeout.secs.
StormCommon.addAcker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
public static void addAcker(Map<String, Object> conf, StormTopology topology) {
int ackerNum =
ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
Map<String, Object> ackerConf = new HashMap<>();
ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
for (Bolt bolt : topology.get_bolts().values()) {
ComponentCommon common = bolt.get_common();
common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
}
for (SpoutSpec spout : topology.get_spouts().values()) {
ComponentCommon common = spout.get_common();
Map<String, Object> spoutConf = componentConf(spout);
spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
common.set_json_conf(JSONValue.toJSONString(spoutConf));
common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
Thrift.prepareDirectGrouping());
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
Thrift.prepareDirectGrouping());
}
topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
}
In addAcker, Storm uses the value of Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS as Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, both for the acker bolt and for every spout.
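The configuration effect of addAcker can be illustrated without Storm on the classpath. The following is only a minimal sketch using plain maps: the class TickFreqSketch and the helper spoutConfFor are hypothetical names, and only the two key strings are taken from Config.java above.

```java
import java.util.HashMap;
import java.util.Map;

final class TickFreqSketch {
    // Key strings copied from Config.java; everything else here is illustrative.
    static final String MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
    static final String TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";

    // Hypothetical helper mimicking what addAcker does per spout: copy the
    // topology-level message timeout into the component-level tick frequency.
    static Map<String, Object> spoutConfFor(Map<String, Object> topoConf,
                                            Map<String, Object> componentConf) {
        Map<String, Object> result = new HashMap<>(componentConf);
        int timeoutSecs = ((Number) topoConf.get(MESSAGE_TIMEOUT_SECS)).intValue();
        // put() overwrites any tick frequency the component had set itself.
        result.put(TICK_TUPLE_FREQ_SECS, timeoutSecs);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> topoConf = new HashMap<>();
        topoConf.put(MESSAGE_TIMEOUT_SECS, 30); // the default from defaults.yaml
        Map<String, Object> spoutConf = spoutConfFor(topoConf, new HashMap<>());
        System.out.println(spoutConf.get(TICK_TUPLE_FREQ_SECS)); // prints 30
    }
}
```

Because the value is written with put, a tick frequency configured on the spout itself would be replaced by the message timeout in this sketch, which matches the spoutConf.put call in the excerpt above.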
Executor.setupTicks
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
protected void setupTicks(boolean isSpout) {
final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
if (tickTimeSecs != null) {
boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
|| (!enableMessageTimeout && isSpout)) {
LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
} else {
StormTimer timerTask = workerData.getUserTimer();
timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
() -> {
TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
Constants.SYSTEM_COMPONENT_ID,
(int) Constants.SYSTEM_TASK_ID,
Constants.SYSTEM_TICK_STREAM_ID);
AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
try {
receiveQueue.publish(tickTuple);
receiveQueue.flush(); // avoid buffering
} catch (InterruptedException e) {
LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
Thread.currentThread().interrupt();
return;
}
}
);
}
}
}
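In setupTicks, Executor reads Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS as tickTimeSecs, which is the interval at which the tickTuple is scheduled.
For a spout, one precondition for scheduling the tickTuple is that Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS is enabled; system components other than the acker never get one.
The timer task emits one tickTuple every tickTimeSecs. The tuple's srcComponent is Constants.SYSTEM_COMPONENT_ID (__system), its taskId is Constants.SYSTEM_TASK_ID (-1), and its streamId is Constants.SYSTEM_TICK_STREAM_ID (__tick).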
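The scheduling pattern can be sketched with the JDK alone. This is a loose analogue, not Storm's StormTimer: the class TickSchedulerSketch, the nested Tick type, and the use of ScheduledExecutorService with a BlockingQueue are all assumptions made for illustration, and the period is in milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TickSchedulerSketch {
    // Hypothetical stand-in for a tick tuple: source component, task id, stream id.
    static final class Tick {
        final String component;
        final int taskId;
        final String stream;

        Tick(String component, int taskId, String stream) {
            this.component = component;
            this.taskId = taskId;
            this.stream = stream;
        }
    }

    // Publishes one Tick into the receive queue per period, with the first tick
    // delayed by one full period (initial delay == period, as in setupTicks).
    static ScheduledExecutorService startTicks(BlockingQueue<Tick> receiveQueue, long periodMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "tick-timer");
            t.setDaemon(true); // do not keep the JVM alive just for ticks
            return t;
        });
        timer.scheduleAtFixedRate(
            () -> receiveQueue.offer(new Tick("__system", -1, "__tick")),
            periodMs, periodMs, TimeUnit.MILLISECONDS);
        return timer;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Tick> queue = new LinkedBlockingQueue<>();
        ScheduledExecutorService timer = startTicks(queue, 50);
        for (int i = 0; i < 3; i++) {
            Tick tick = queue.take(); // the executor loop would dispatch on tick.stream
            System.out.println(tick.stream + " from " + tick.component + " task " + tick.taskId);
        }
        timer.shutdownNow();
    }
}
```

The consumer side only sees an ordinary item in its receive queue, which is why the spout can handle timeouts in the same method that handles acks and fails.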
SpoutExecutor.tupleActionFn
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
spoutOutputCollector.flush();
} else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
pending.rotate();
} else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
} else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (spoutObj instanceof ICredentialsListener) {
((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
}
} else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
Long id = (Long) tuple.getValue(0);
TupleInfo pendingForId = pending.get(id);
if (pendingForId != null) {
pending.put(id, pendingForId);
}
} else {
Long id = (Long) tuple.getValue(0);
Long timeDeltaMs = (Long) tuple.getValue(1);
TupleInfo tupleInfo = pending.remove(id);
if (tupleInfo != null && tupleInfo.getMessageId() != null) {
if (taskId != tupleInfo.getTaskId()) {
throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
}
Long timeDelta = null;
if (hasAckers) {
long startTimeMs = tupleInfo.getTimestamp();
if (startTimeMs != 0) {
timeDelta = timeDeltaMs;
}
}
if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
} else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
}
}
}
}
When SpoutExecutor's tupleActionFn receives a tickTuple on Constants.SYSTEM_TICK_STREAM_ID, it calls pending.rotate().
RotatingMap.rotate
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
public Map<K, V> rotate() {
Map<K, V> dead = _buckets.removeLast();
_buckets.addFirst(new HashMap<K, V>());
if (_callback != null) {
for (Entry<K, V> entry : dead.entrySet()) {
_callback.expire(entry.getKey(), entry.getValue());
}
}
return dead;
}
rotate removes the oldest bucket and invokes the expire callback for every entry left in it.
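To see why rotation amounts to a timeout, here is a minimal sketch of a bucketed map with an expire callback. It is not Storm's RotatingMap: the class name RotatingMapSketch and the BiConsumer callback are assumptions, and only rotate follows the excerpt above directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class RotatingMapSketch<K, V> {
    private final LinkedList<Map<K, V>> buckets = new LinkedList<>();
    private final BiConsumer<K, V> onExpire; // may be null, like _callback

    RotatingMapSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // New or refreshed entries always land in the newest (first) bucket
    // and are removed from every older bucket.
    void put(K key, V value) {
        Iterator<Map<K, V>> it = buckets.iterator();
        it.next().put(key, value);
        while (it.hasNext()) {
            it.next().remove(key);
        }
    }

    // Removes the key from whichever bucket holds it; null if absent.
    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    // Drop the oldest bucket, add a fresh newest one, report the dropped entries.
    Map<K, V> rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        if (onExpire != null) {
            dead.forEach(onExpire);
        }
        return dead;
    }

    public static void main(String[] args) {
        List<Long> expired = new ArrayList<>();
        RotatingMapSketch<Long, String> pending =
            new RotatingMapSketch<>(2, (id, info) -> expired.add(id));
        pending.put(1L, "tuple-1");
        pending.put(2L, "tuple-2");
        pending.rotate();           // first tick: nothing expires yet
        System.out.println(expired); // prints []
        pending.put(2L, "tuple-2"); // refresh, like the reset-timeout branch
        pending.rotate();           // second tick: only the stale entry expires
        System.out.println(expired); // prints [1]
        pending.rotate();           // third tick: the refreshed entry expires
        System.out.println(expired); // prints [1, 2]
    }
}
```

With two buckets and one rotation per tick, an entry in this sketch expires on the second rotation after it was last put. If ticks arrive every T seconds, that is somewhere between T and 2T after the put, depending on how close to a tick the entry was inserted.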
SpoutExecutor.RotatingMap.ExpiredCallback
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
this.threadId = Thread.currentThread().getId();
executorTransfer.initLocalRecvQueues();
while (!stormActive.get()) {
Utils.sleep(100);
}
LOG.info("Opening spout {}:{}", componentId, taskIds);
this.idToTask = idToTask;
this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
this.spouts = new ArrayList<>();
for (Task task : idToTask) {
if (task != null) {
this.spouts.add((ISpout) task.getTaskObject());
}
}
this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
@Override
public void expire(Long key, TupleInfo tupleInfo) {
Long timeDelta = null;
if (tupleInfo.getTimestamp() != 0) {
timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
}
failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
}
});
//……
}
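In init, SpoutExecutor creates pending as a two-bucket RotatingMap and registers a RotatingMap.ExpiredCallback on it; the callback calls failSpoutMsg with the reason TIMEOUT for every expired tuple.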
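From the spout's point of view a timeout arrives as an ordinary fail call, and the Config.java comment quoted earlier notes that some spout implementations then replay the message. A minimal sketch of such a spout follows; ReplayingSpoutSketch is a hypothetical class whose method names mirror Storm's ISpout (nextTuple, ack, fail) but which does not implement that interface.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class ReplayingSpoutSketch {
    private final Queue<String> toEmit = new ArrayDeque<>();
    private final Map<Long, String> inFlight = new HashMap<>();
    private long nextId = 0;

    ReplayingSpoutSketch(Iterable<String> messages) {
        for (String message : messages) {
            toEmit.add(message);
        }
    }

    // Emits the next message and remembers it until ack or fail.
    // Returns the message id, or null when there is nothing to emit.
    Long nextTuple() {
        String message = toEmit.poll();
        if (message == null) {
            return null;
        }
        long id = nextId++;
        inFlight.put(id, message);
        return id;
    }

    // Fully processed: forget the message.
    void ack(long msgId) {
        inFlight.remove(msgId);
    }

    // Explicit failure or timeout: queue the message for another attempt.
    void fail(long msgId) {
        String message = inFlight.remove(msgId);
        if (message != null) {
            toEmit.add(message);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        ReplayingSpoutSketch spout = new ReplayingSpoutSketch(java.util.Arrays.asList("a"));
        Long first = spout.nextTuple();
        spout.fail(first);               // what the TIMEOUT callback would trigger
        Long second = spout.nextTuple(); // the same payload under a new id
        spout.ack(second);
        System.out.println(first + " -> " + second + ", in flight: " + spout.inFlightCount());
    }
}
```

The sketch assigns a fresh id on replay so that a late ack for the old id cannot be mistaken for the retry; whether a real spout does this is a design choice, not something the excerpts above prescribe.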
Summary
The parameters behind a spout's messageTimeout are Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS (topology.enable.message.timeouts, default true) and Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30).
In addAcker, StormCommon takes Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS as the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS for the acker and the spouts; Executor.setupTicks then uses it as tickTimeSecs, the interval at which the tickTuple is scheduled.
When SpoutExecutor receives that tickTuple, it calls rotate on its RotatingMap, which fires the expire callback. Because SpoutExecutor registered a RotatingMap.ExpiredCallback in init, each expired tuple goes through failSpoutMsg, which calls back the spout's fail method. That completes the messageTimeout feature.
doc
On Storm's ack mechanism
On Storm's tickTuple
Guaranteeing Message Processing