Preface
This article takes a look at reportError in Storm.
IErrorReporter
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java
public interface IErrorReporter {
void reportError(Throwable error);
}
ISpoutOutputCollector, IOutputCollector and IBasicOutputCollector all extend the IErrorReporter interface.
ISpoutOutputCollector
storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java
public interface ISpoutOutputCollector extends IErrorReporter{
/**
 * Returns the task ids that received the tuples.
 */
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
long getPendingCount();
}
Implementations of ISpoutOutputCollector include SpoutOutputCollector and SpoutOutputCollectorImpl, among others.
IOutputCollector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
public interface IOutputCollector extends IErrorReporter {
/**
* Returns the task ids that received the tuples.
*/
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
void resetTimeout(Tuple input);
void flush();
}
Implementations of IOutputCollector include OutputCollector and BoltOutputCollectorImpl, among others.
IBasicOutputCollector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
public interface IBasicOutputCollector extends IErrorReporter {
List<Integer> emit(String streamId, List<Object> tuple);
void emitDirect(int taskId, String streamId, List<Object> tuple);
void resetTimeout(Tuple tuple);
}
IBasicOutputCollector is implemented by BasicOutputCollector.
reportError
SpoutOutputCollectorImpl.reportError
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@Override
public void reportError(Throwable error) {
executor.getErrorReportingMetrics().incrReportedErrorCount();
executor.getReportError().report(error);
}
BoltOutputCollectorImpl.reportError
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@Override
public void reportError(Throwable error) {
executor.getErrorReportingMetrics().incrReportedErrorCount();
executor.getReportError().report(error);
}
As shown, the reportError methods of both SpoutOutputCollectorImpl and BoltOutputCollectorImpl first increment the reported-error metric and then call executor.getReportError().report(error).
ReportError.report
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
public class ReportError implements IReportError {
private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);
private final Map<String, Object> topoConf;
private final IStormClusterState stormClusterState;
private final String stormId;
private final String componentId;
private final WorkerTopologyContext workerTopologyContext;
private int maxPerInterval;
private int errorIntervalSecs;
private AtomicInteger intervalStartTime;
private AtomicInteger intervalErrors;
public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
WorkerTopologyContext workerTopologyContext) {
this.topoConf = topoConf;
this.stormClusterState = stormClusterState;
this.stormId = stormId;
this.componentId = componentId;
this.workerTopologyContext = workerTopologyContext;
this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
this.intervalErrors = new AtomicInteger(0);
}
@Override
public void report(Throwable error) {
LOG.error("Error", error);
if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
intervalErrors.set(0);
intervalStartTime.set(Time.currentTimeSecs());
}
if (intervalErrors.incrementAndGet() <= maxPerInterval) {
try {
stormClusterState.reportError(stormId, componentId, Utils.hostname(),
workerTopologyContext.getThisWorkerPort().longValue(), error);
} catch (UnknownHostException e) {
throw Utils.wrapInRuntime(e);
}
}
}
}
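The method first checks whether the current interval has expired (more than errorIntervalSecs seconds since intervalStartTime) and, if so, resets the counter and starts a new interval. It then increments the error count for the interval; as long as the count does not exceed maxPerInterval, it calls stormClusterState.reportError to write the error to storage such as ZooKeeper. The interval length and the limit come from Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS and Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL. Note that LOG.error runs unconditionally, so every error still reaches the worker log even when the write to cluster state is throttled.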
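The throttling in ReportError.report can be isolated into a small class to make its behavior easier to see. This is a minimal sketch, not Storm code: the class name ErrorThrottle and the injected IntSupplier clock (epoch seconds) are assumptions made for illustration, standing in for Storm's Time utility and the two Config values.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical standalone class mirroring the throttling logic of ReportError.report.
// The clock is injected so the logic can be exercised without Storm's Time utility.
class ErrorThrottle {
    private final int maxPerInterval;
    private final int intervalSecs;
    private final IntSupplier clockSecs;
    private final AtomicInteger intervalStartTime;
    private final AtomicInteger intervalErrors = new AtomicInteger(0);

    ErrorThrottle(int maxPerInterval, int intervalSecs, IntSupplier clockSecs) {
        this.maxPerInterval = maxPerInterval;
        this.intervalSecs = intervalSecs;
        this.clockSecs = clockSecs;
        this.intervalStartTime = new AtomicInteger(clockSecs.getAsInt());
    }

    /** Returns true if this error should be written to cluster state, false if throttled. */
    boolean shouldReport() {
        int now = clockSecs.getAsInt();
        // Start a new interval once strictly more than intervalSecs have elapsed.
        if (now - intervalStartTime.get() > intervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(now);
        }
        // Only the first maxPerInterval errors of each interval get through.
        return intervalErrors.incrementAndGet() <= maxPerInterval;
    }
}
```

As in the original, each counter is atomic on its own, but the check, reset and increment are not a single atomic step, so under heavy concurrency the per-interval limit is approximate.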
StormClusterStateImpl.reportError
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@Override
public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
String path = ClusterUtils.errorPath(stormId, componentId);
String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
errorInfo.set_host(node);
errorInfo.set_port(port.intValue());
byte[] serData = Utils.serialize(errorInfo);
stateStorage.mkdirs(path, defaultAcls);
stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
stateStorage.set_data(lastErrorPath, serData, defaultAcls);
List<String> childrens = stateStorage.get_children(path, false);
Collections.sort(childrens, new Comparator<String>() {
public int compare(String arg0, String arg1) {
return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
}
});
while (childrens.size() > 10) {
String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
try {
stateStorage.delete_node(znodePath);
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
// if the node is already deleted, do nothing
LOG.warn("Could not find the znode: {}", znodePath);
} else {
throw e;
}
}
}
}
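ClusterUtils.errorPath(stormId, componentId) returns the directory that holds the individual error nodes, and ClusterUtils.lastErrorPath(stormId, componentId) returns the path of the node that holds the most recent error. Each error is serialized as an ErrorInfo (error string, timestamp, host, port), appended as a new sequential node under the directory, and also written to the last-error node.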
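Because ZooKeeper is not meant to hold large amounts of data, the method then trims the directory to 10 children: the child names are sorted in ascending order by the numeric sequence number after the leading e (substring(1)), and the oldest nodes are deleted one by one. A NoNodeException during deletion (the node was already removed) is logged and ignored.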
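The pruning rule can be expressed as a pure function over the child names. This is a minimal sketch assuming a hypothetical helper ErrorNodePruner; it only decides which nodes to delete and does not talk to ZooKeeper. The sample names in the usage are the ones from the zkCli listing later in this article.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper reproducing the pruning rule of StormClusterStateImpl.reportError:
// sort children by the number after the leading "e" and drop the oldest
// until at most `keep` remain.
class ErrorNodePruner {
    static List<String> nodesToDelete(List<String> children, int keep) {
        List<String> sorted = new ArrayList<>(children);
        // "e0000000291".substring(1) -> "0000000291" -> 291
        sorted.sort(Comparator.comparingLong((String name) -> Long.parseLong(name.substring(1))));
        List<String> toDelete = new ArrayList<>();
        while (sorted.size() > keep) {
            toDelete.add(sorted.remove(0)); // smallest sequence number = oldest error
        }
        return toDelete;
    }
}
```

With the ten nodes e0000000290 to e0000000299 nothing is deleted; after two more errors arrive, e0000000290 and e0000000291 are the ones removed.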
ClusterUtils.errorPath
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
public static final String ZK_SEPERATOR = "/";
public static final String ERRORS_ROOT = "errors";
public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
public static String errorPath(String stormId, String componentId) {
try {
return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw Utils.wrapInRuntime(e);
}
}
public static String lastErrorPath(String stormId, String componentId) {
return errorPath(stormId, componentId) + "-last-error";
}
public static String errorStormRoot(String stormId) {
return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
}
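errorPath is /errors/{stormId}/{componentId}. Under this directory, nodes whose names start with e are created as EPHEMERAL_SEQUENTIAL nodes: each error is first appended there, and the oldest nodes are deleted once there are more than 10. These paths are relative to Storm's ZooKeeper root, which is why the zkCli session below shows them under /storm.
lastErrorPath is /errors/{stormId}/{componentId}-last-error and stores the most recent error of that componentId.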
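The same path construction can be reproduced outside Storm to see exactly which znodes to look at. This is a minimal sketch assuming a hypothetical ErrorPaths class; the zkRoot parameter is an assumption standing in for the configured ZooKeeper root (storm.zookeeper.root, /storm by default), which is consistent with the /storm/errors prefix in the zkCli session below.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper mirroring ClusterUtils.errorPath / lastErrorPath,
// with an explicit ZooKeeper root prefix added for illustration.
class ErrorPaths {
    static String errorPath(String zkRoot, String stormId, String componentId) {
        try {
            // The component id is URL-encoded, just like in ClusterUtils.errorPath.
            return zkRoot + "/errors/" + stormId + "/" + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // UTF-8 is always available on the JVM
        }
    }

    static String lastErrorPath(String zkRoot, String stormId, String componentId) {
        return errorPath(zkRoot, stormId, componentId) + "-last-error";
    }
}
```

For the print bolt of reportErrorDemo-1-1540260375 this yields /storm/errors/reportErrorDemo-1-1540260375/print and .../print-last-error, matching the listing below; a component id containing a space would appear URL-encoded (my bolt becomes my+bolt).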
Inspecting with zkCli
[zk: localhost:2181(CONNECTED) 21] ls /storm/errors
[DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
[zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
[print, print-last-error]
[zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
[e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
[zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
[]
[zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
[]
storm-ui
curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false
storm-ui calls the endpoint above to fetch the topology data. Each spout and bolt entry in the response contains lastError, the most recent error of that component.
StormApiResource
storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
/**
* /api/v1/topology -> topo.
*/
@GET
@Path("/topology/{id}")
@AuthNimbusOp(value = “getTopology”, needsTopoId = true)
@Produces("application/json")
public Response getTopology(@PathParam(“id”) String id,
@DefaultValue(":all-time") @QueryParam("window") String window,
@QueryParam(“sys”) boolean sys,
@QueryParam(callbackParameterName) String callback) throws TException {
topologyPageRequestMeter.mark();
try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
return UIHelpers.makeStandardResponse(
UIHelpers.getTopologySummary(
nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
window, config,
servletRequest.getRemoteUser()
),
callback
);
}
}
This calls nimbusClient.getClient().getTopologyPageInfo(id, window, sys), a Thrift call to Nimbus.
Nimbus.getTopologyPageInfo
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@Override
public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
throws NotAliveException, AuthorizationException, TException {
try {
getTopologyPageInfoCalls.mark();
CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
String topoName = common.topoName;
IStormClusterState state = stormClusterState;
int launchTimeSecs = common.launchTimeSecs;
Assignment assignment = common.assignment;
Map<List<Integer>, Map<String, Object>> beats = common.beats;
Map<Integer, String> taskToComp = common.taskToComponent;
StormTopology topology = common.topology;
Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
StormBase base = common.base;
if (base == null) {
throw new WrappedNotAliveException(topoId);
}
Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
List<WorkerSummary> workerSummaries = null;
Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
if (assignment != null) {
Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
Map<String, String> nodeToHost = assignment.get_node_host();
for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
NodeInfo ni = entry.getValue();
List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
exec2NodePort.put(entry.getKey(), nodePort);
}
workerSummaries = StatsUtil.aggWorkerStats(topoId,
topoName,
taskToComp,
beats,
exec2NodePort,
nodeToHost,
workerToResources,
includeSys,
true); //this is the topology page, so we know the user is authorized
}
TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
exec2NodePort,
taskToComp,
beats,
topology,
window,
includeSys,
state);
//……
return topoPageInfo;
} catch (Exception e) {
LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
if (e instanceof TException) {
throw (TException) e;
}
throw new RuntimeException(e);
}
}
Here StatsUtil.aggTopoExecsStats is called to build the TopologyPageInfo.
StatsUtil.aggTopoExecsStats
storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
/**
* aggregate topo executors stats.
*
* @param topologyId topology id
* @param exec2nodePort executor -> host+port
* @param task2component task -> component
* @param beats executor[start, end] -> executor heartbeat
* @param topology storm topology
* @param window the window to be aggregated
* @param includeSys whether to include system streams
* @param clusterState cluster state
* @return TopologyPageInfo thrift structure
*/
public static TopologyPageInfo aggTopoExecsStats(
String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
}
StatsUtil.aggTopoExecsStats ends by calling postAggregateTopoStats.
StatsUtil.postAggregateTopoStats
storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
String topologyId, IStormClusterState clusterState) {
TopologyPageInfo ret = new TopologyPageInfo(topologyId);
ret.set_num_tasks(task2comp.size());
ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);
Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
for (Object o : bolt2stats.entrySet()) {
Map.Entry e = (Map.Entry) o;
Map m = (Map) e.getValue();
long executed = getByKeyOr0(m, EXECUTED).longValue();
if (executed > 0) {
double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
m.put(EXEC_LATENCY, execLatencyTotal / executed);
double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
m.put(PROC_LATENCY, procLatencyTotal / executed);
}
m.remove(EXEC_LAT_TOTAL);
m.remove(PROC_LAT_TOTAL);
String id = (String) e.getKey();
m.put("last-error", getLastError(clusterState, topologyId, id));
aggBolt2stats.put(id, thriftifyBoltAggStats(m));
}
//……
return ret;
}
private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
return stormClusterState.lastError(stormId, compId);
}
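postAggregateTopoStats adds last-error to each bolt's stats map via getLastError, and then converts the map into a thrift object with thriftifyBoltAggStats.
getLastError simply calls stormClusterState.lastError(stormId, compId) to read the last error.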
UIHelpers.getTopologySummary
storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java
/**
* getTopologySummary.
* @param topologyPageInfo topologyPageInfo
* @param window window
* @param config config
* @param remoteUser remoteUser
* @return getTopologySummary
*/
public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
String window, Map<String, Object> config, String remoteUser) {
Map<String, Object> result = new HashMap();
Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
Map<String, Object> unpackedTopologyPageInfo =
unpackTopologyInfo(topologyPageInfo, window, config);
result.putAll(unpackedTopologyPageInfo);
result.put("user", remoteUser);
result.put("window", window);
result.put("windowHint", getWindowHint(window));
result.put("msgTimeout", messageTimeout);
result.put("configuration", topologyConf);
result.put("visualizationTable", new ArrayList());
result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
return result;
}
After obtaining the TopologyPageInfo, UIHelpers.getTopologySummary unpacks it with unpackTopologyInfo.
UIHelpers.unpackTopologyInfo
storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java
/**
* unpackTopologyInfo.
* @param topologyPageInfo topologyPageInfo
* @param window window
* @param config config
* @return unpackTopologyInfo
*/
private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
Map<String, Object> result = new HashMap();
result.put("id", topologyPageInfo.get_id());
//……
Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
List<Map> spoutStats = new ArrayList();
for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
}
result.put("spouts", spoutStats);
Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
List<Map> boltStats = new ArrayList();
for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
}
result.put("bolts", boltStats);
//……
result.put("samplingPct", samplingPct);
result.put("replicationCount", topologyPageInfo.get_replication_count());
result.put("topologyVersion", topologyPageInfo.get_topology_version());
result.put("stormVersion", topologyPageInfo.get_storm_version());
return result;
}
/**
* getTopologySpoutAggStatsMap.
* @param componentAggregateStats componentAggregateStats
* @param spoutId spoutId
* @return getTopologySpoutAggStatsMap
*/
private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
String spoutId) {
Map<String, Object> result = new HashMap();
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.putAll(getCommonAggStatsMap(commonStats));
result.put("spoutId", spoutId);
result.put("encodedSpoutId", URLEncoder.encode(spoutId));
SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
result.put(“completeLatency”, spoutAggregateStats.get_complete_latency_ms());
ErrorInfo lastError = componentAggregateStats.get_last_error();
result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
return result;
}
/**
* getTopologyBoltAggStatsMap.
* @param componentAggregateStats componentAggregateStats
* @param boltId boltId
* @return getTopologyBoltAggStatsMap
*/
private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
String boltId) {
Map<String, Object> result = new HashMap();
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.putAll(getCommonAggStatsMap(commonStats));
result.put("boltId", boltId);
result.put("encodedBoltId", URLEncoder.encode(boltId));
BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
result.put("executed", boltAggregateStats.get_executed());
result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
ErrorInfo lastError = componentAggregateStats.get_last_error();
result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
return result;
}
/**
* getTruncatedErrorString.
* @param errorString errorString
* @return getTruncatedErrorString
*/
private static String getTruncatedErrorString(String errorString) {
return errorString.substring(0, Math.min(errorString.length(), 200));
}
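Note that spouts are converted with getTopologySpoutAggStatsMap and bolts with getTopologyBoltAggStatsMap.
Both pass lastError through getTruncatedErrorString, which keeps at most the first 200 characters (substring(0, 200)); a missing error is shown as an empty string.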
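The display rule fits in a few lines. This is a minimal sketch assuming a hypothetical LastErrorDisplay helper that receives the error text directly; the real methods check whether the ErrorInfo itself is null before calling get_error().

```java
// Hypothetical helper mirroring how UIHelpers fills the "lastError" field:
// no error becomes "", otherwise the text is cut to at most 200 characters.
class LastErrorDisplay {
    static final int MAX_LEN = 200;

    static String lastErrorField(String errorText) {
        if (errorText == null) {
            return "";
        }
        // Same bound as getTruncatedErrorString: never index past the end of a short string.
        return errorText.substring(0, Math.min(errorText.length(), MAX_LEN));
    }
}
```

Short messages pass through unchanged, while anything longer, such as a full stack trace, is cut at 200 characters, so the UI usually shows only the beginning of the error.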
crash log
2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
... 6 more
2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
... 6 more
2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!
Summary
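If a spout or bolt method throws an exception, the entire worker dies. The exception is still recorded in ZooKeeper automatically, but the price is that the worker dies and, if the exception keeps recurring, is restarted over and over.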
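reportError can be combined with try/catch so that a caught exception does not kill the worker while the error is still recorded. Note, however, that only the 10 most recent errors are kept for each component of a topology. They are stored as EPHEMERAL_SEQUENTIAL nodes and disappear when the worker dies, whereas lastError is stored in a PERSISTENT node. Both are deleted when the topology is killed.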
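A minimal, Storm-free sketch of the try/catch pattern follows. The ErrorReporter interface is a hypothetical stand-in with the same shape as org.apache.storm.task.IErrorReporter, and GuardedExecutor is an illustrative name, not a Storm class. In a real bolt (for example one extending BaseBasicBolt), the body of execute would sit inside the try block and the catch block would call collector.reportError(e).

```java
import java.util.function.Consumer;

// Hypothetical stand-in with the same shape as org.apache.storm.task.IErrorReporter.
interface ErrorReporter {
    void reportError(Throwable error);
}

// Illustrative wrapper: runs the processing logic and reports failures
// instead of letting them propagate and kill the executing thread.
class GuardedExecutor {
    private final ErrorReporter reporter;

    GuardedExecutor(ErrorReporter reporter) {
        this.reporter = reporter;
    }

    /** Returns true if the handler completed, false if it threw and the error was reported. */
    <T> boolean execute(T input, Consumer<T> handler) {
        try {
            handler.accept(input);
            return true;
        } catch (RuntimeException e) {
            reporter.reportError(e); // recorded, but not rethrown
            return false;
        }
    }
}
```

Feeding it a String where an Integer is expected reproduces the ClassCastException from the crash log above, but the error is reported and the caller keeps running. Keep in mind that with BaseBasicBolt a normal return from execute acks the tuple, so a swallowed exception means the tuple is treated as processed; whether it should be failed or re-emitted instead is a separate decision.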
storm-ui displays the lastError of each component, truncated to at most 200 characters.
doc
What is the use of OutputCollector class' reportError(Throwable) method?