This article takes a look at Storm's IEventLogger.
/**
 * EventLogger interface for logging the event info to a sink like log file or db for inspecting the events via UI for debugging.
 */
public interface IEventLogger {

    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that has event logging
     * enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        private long ts;
        private String component;
        private int task;
        private Object messageId;
        private List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
IEventLogger defines the log method, together with the nested EventInfo class that wraps the fields to be logged.
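To make the default line format concrete, here is a minimal stdlib-only sketch that mirrors the field order and "," separator of EventInfo.toString(). The class name EventLineSketch and its format method are hypothetical stand-ins written for this illustration; they are not part of Storm.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.List;

public class EventLineSketch {
    // Hypothetical stand-in for EventInfo.toString(): timestamp, component,
    // task, message id (empty when null), and values, joined by ",".
    static String format(long ts, String component, int task, Object messageId, List<Object> values) {
        return new Date(ts).toString() + "," + component + "," + task + ","
            + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.<Object>asList("hello", 42);
        // A tuple without a message id leaves the fourth field empty.
        System.out.println(format(0L, "spout1", 3, null, values));
        // A tuple with a message id fills it in.
        System.out.println(format(0L, "spout1", 3, "msg-7", values));
    }
}
```

The leading date string depends on the JVM's default time zone, so only the part after the first comma is stable across machines. Note also that the values themselves may contain commas, so the line is not safely splittable on ",".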
public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    private volatile boolean dirty = false;

    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                                                     StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }

    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            // builder options are not shown in this excerpt
            .build();

        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        // flush only when something was written since the last run
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    throw new RuntimeException(ex);
                }
            }
        };

        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }

    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();

        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

        Path path = Paths.get(workersArtifactRoot, "events.log");
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }
        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();
        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }

        closeFlushScheduler();
    }

    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
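FileBasedEventLogger is the default IEventLogger implementation. It starts a scheduled task that runs every FLUSH_INTERVAL_MILLIS (1000 ms) and flushes the buffered data to disk if the dirty flag is set.
By default, events are written to events.log under the workersArtifactRoot directory.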
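The dirty-flag flush pattern described above can be sketched with the standard library alone. The class DirtyFlushWriter and its demo method are hypothetical names for this illustration, not Storm code. Unlike the original, this sketch marks its methods synchronized so that a write cannot slip in between the flush and the reset of the flag.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DirtyFlushWriter implements AutoCloseable {
    private final BufferedWriter writer;
    private final ScheduledExecutorService scheduler;
    private volatile boolean dirty = false;

    DirtyFlushWriter(Path file, long flushIntervalMillis) throws IOException {
        writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
            StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        // Daemon thread so the timer never keeps the JVM alive.
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "flush-sketch");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(this::flushIfDirty,
            flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Appends one line; the data stays in the buffer until the next flush.
    synchronized void write(String line) throws IOException {
        writer.write(line);
        writer.newLine();
        dirty = true;
    }

    // Flushes only when something was written since the last flush.
    synchronized void flushIfDirty() {
        try {
            if (dirty) {
                writer.flush();
                dirty = false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public synchronized void close() throws IOException {
        scheduler.shutdownNow();
        writer.close(); // closing also flushes any remaining buffered data
    }

    // Returns "<lines on disk before flush>,<lines on disk after flush>".
    static String demo() {
        try {
            Path file = Files.createTempFile("events", ".log");
            // Long interval so the timer does not fire during the demo.
            try (DirtyFlushWriter w = new DirtyFlushWriter(file, 60_000)) {
                w.write("first event");
                int before = Files.readAllLines(file).size(); // still buffered
                w.flushIfDirty();
                int after = Files.readAllLines(file).size();
                return before + "," + after;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 0,1
    }
}
```

The trade-off is the usual one for buffered logging: a write is cheap, but up to one flush interval of events can be lost if the process dies before the timer runs.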

public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
    Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                               ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    if (numExecutors == null || numExecutors == 0) {
        // event logging is disabled
        return;
    }
    HashMap<String, Object> componentConf = new HashMap<>();
    componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
    componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
    Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
        eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);

    // declare the event logger stream on every spout and bolt
    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
    }
    topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}

public static List<String> eventLoggerBoltFields() {
    return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                         EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}

public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
    Set<String> allIds = new HashSet<String>();
    allIds.addAll(topology.get_bolts().keySet());
    allIds.addAll(topology.get_spouts().keySet());

    for (String id : allIds) {
        inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
    }
    return inputs;
}

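addEventLogger reads numExecutors from Config.TOPOLOGY_EVENTLOGGER_EXECUTORS and falls back to the value of Config.TOPOLOGY_WORKERS when that setting is null. The default is 0, which disables the event logger.
It then creates the EventLoggerBolt. Using fieldsGrouping("component-id") together with Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), it registers every spout and bolt as an input of this bolt, so that the bolt receives tuples from all of them. The tuple fields are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, and EventLoggerBolt.FIELD_VALUES. addEventLogger also declares an output stream named EVENTLOGGER_STREAM_ID on every spout and bolt, which is what lets the data flow to the EventLoggerBolt.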
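The fallback rule for the executor count can be sketched as a small function over a plain map. The class EventLoggerExecutorsSketch and its resolve method are hypothetical, and the two key strings are assumed to be the values behind Config.TOPOLOGY_EVENTLOGGER_EXECUTORS and Config.TOPOLOGY_WORKERS; check them against your Storm version.

```java
import java.util.HashMap;
import java.util.Map;

public class EventLoggerExecutorsSketch {
    // Assumed to match Config.TOPOLOGY_EVENTLOGGER_EXECUTORS and Config.TOPOLOGY_WORKERS.
    static final String EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
    static final String WORKERS = "topology.workers";

    // Returns the number of event logger executors; 0 means the event logger stays disabled.
    static int resolve(Map<String, Object> conf) {
        Object value = conf.get(EVENTLOGGER_EXECUTORS);
        if (value == null) {
            value = conf.get(WORKERS); // fall back to the worker count
        }
        return value == null ? 0 : ((Number) value).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKERS, 4);
        System.out.println(resolve(conf)); // prints 4: setting is null, worker count is used

        conf.put(EVENTLOGGER_EXECUTORS, 0);
        System.out.println(resolve(conf)); // prints 0: explicitly disabled

        conf.put(EVENTLOGGER_EXECUTORS, 2);
        System.out.println(resolve(conf)); // prints 2
    }
}
```

Because the shipped default for the event logger setting is 0 rather than null, the fallback to the worker count only applies when the setting has been explicitly cleared.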

public class EventLoggerBolt implements IBolt {

    /*
     The below field declarations are also used in common.clj to define the event logger output fields
     */
    public static final String FIELD_TS = "ts";
    public static final String FIELD_VALUES = "values";
    public static final String FIELD_COMPONENT_ID = "component-id";
    public static final String FIELD_MESSAGE_ID = "message-id";
    private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class);
    private List<IEventLogger> eventLoggers;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        LOG.info("EventLoggerBolt prepare called");

        eventLoggers = new ArrayList<>();
        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER);
        if (registerInfo != null && !registerInfo.isEmpty()) {
            initializeEventLoggers(topoConf, context, registerInfo);
        } else {
            initializeDefaultEventLogger(topoConf, context);
        }
    }

    @Override
    public void execute(Tuple input) {
        LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}", input.getSourceComponent(), input.getValues());

        Object msgId = input.getValueByField(FIELD_MESSAGE_ID);
        EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(),
                                            input.getSourceTask(), msgId, (List<Object>) input.getValueByField(FIELD_VALUES));

        // hand the event to every registered logger
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.log(eventInfo);
        }
    }

    @Override
    public void cleanup() {
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.close();
        }
    }

    private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context, List<Map<String, Object>> registerInfo) {
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS);
            Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS);

            IEventLogger eventLogger;
            try {
                eventLogger = (IEventLogger) Class.forName(className).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section "
                                           + Config.TOPOLOGY_EVENT_LOGGER_REGISTER + " with fully qualified name " + className, e);
            }

            eventLogger.prepare(topoConf, arguments, context);
            eventLoggers.add(eventLogger);
        }
    }

    private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) {
        FileBasedEventLogger eventLogger = new FileBasedEventLogger();
        eventLogger.prepare(topoConf, null, context);
        eventLoggers.add(eventLogger);
    }
}

In prepare, EventLoggerBolt reads Config.TOPOLOGY_EVENT_LOGGER_REGISTER from topoConf. If registerInfo is null or empty, it falls back to the default FileBasedEventLogger; otherwise it initializes the event loggers registered in registerInfo.
The execute method simply iterates over eventLoggers and calls log on each one.
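The register-or-default initialization can be sketched without Storm on the classpath. Everything below is a hypothetical stand-in: SimpleLogger plays the role of IEventLogger, DefaultLogger plays the role of FileBasedEventLogger, and the map keys "class" and "arguments" are assumed to be the values of TOPOLOGY_EVENT_LOGGER_CLASS and TOPOLOGY_EVENT_LOGGER_ARGUMENTS.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoggerRegistrySketch {
    // Hypothetical stand-in for IEventLogger, reduced to what this sketch needs.
    public interface SimpleLogger {
        void prepare(Map<String, Object> arguments);
        String name();
    }

    // Plays the role of the default FileBasedEventLogger.
    public static class DefaultLogger implements SimpleLogger {
        public void prepare(Map<String, Object> arguments) { }
        public String name() { return "default"; }
    }

    // A custom logger that reads one optional argument.
    public static class CustomLogger implements SimpleLogger {
        private String prefix = "";
        public void prepare(Map<String, Object> arguments) {
            if (arguments != null && arguments.get("prefix") != null) {
                prefix = arguments.get("prefix").toString();
            }
        }
        public String name() { return prefix + "custom"; }
    }

    // Instantiates every registered logger by class name, or the default when nothing is registered.
    static List<SimpleLogger> initialize(List<Map<String, Object>> registerInfo) {
        List<SimpleLogger> loggers = new ArrayList<>();
        if (registerInfo == null || registerInfo.isEmpty()) {
            SimpleLogger fallback = new DefaultLogger();
            fallback.prepare(null);
            loggers.add(fallback);
            return loggers;
        }
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get("class"); // assumed key
            @SuppressWarnings("unchecked")
            Map<String, Object> arguments = (Map<String, Object>) info.get("arguments"); // assumed key
            SimpleLogger logger;
            try {
                logger = (SimpleLogger) Class.forName(className).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Could not instantiate " + className, e);
            }
            logger.prepare(arguments);
            loggers.add(logger);
        }
        return loggers;
    }

    public static void main(String[] args) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("class", CustomLogger.class.getName());
        entry.put("arguments", Collections.<String, Object>singletonMap("prefix", "audit-"));
        System.out.println(initialize(Collections.singletonList(entry)).get(0).name()); // prints audit-custom
        System.out.println(initialize(null).get(0).name()); // prints default
    }
}
```

As in the original, the default logger is only created when the register list is empty; registering any custom logger replaces it rather than adding to it.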


To enable the event logger, set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0 (conf.setNumEventLoggers); the default is 0, which disables it. Once it is enabled, you can click debug on a spout or bolt and then open the events link to view, in the UI, the tuples captured while debugging was on.
With Config.TOPOLOGY_EVENTLOGGER_EXECUTORS greater than 0 and no Config.TOPOLOGY_EVENT_LOGGER_REGISTER of your own, FileBasedEventLogger is used by default. When debug is turned on for a spout or bolt, the EventInfo entries are written to events.log under the workersArtifactRoot directory.
If you do set Config.TOPOLOGY_EVENT_LOGGER_REGISTER (conf.registerEventLogger), EventLoggerBolt initializes the event loggers from that configuration, and the default FileBasedEventLogger is not initialized unless it is registered there as well.
In addEventLogger, StormCommon adds a stream declaration to every spout and bolt so that each one outputs to EVENTLOGGER_STREAM_ID. It also makes every spout and bolt an input of the EventLoggerBolt, through something like fieldsGrouping(componentId, Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), new Fields("component-id")). The tuples that reach the EventLoggerBolt carry the fields EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, and EventLoggerBolt.FIELD_VALUES.


