A Look at Storm's WindowedBoltExecutor


This article takes a close look at Storm's WindowedBoltExecutor.
WindowedBoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
/**
* An {@link IWindowedBolt} wrapper that does the windowing of tuples.
*/
public class WindowedBoltExecutor implements IRichBolt {
public static final String LATE_TUPLE_FIELD = "late_tuple";
private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
private final IWindowedBolt bolt;
// package level for unit tests
transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
private transient WindowedOutputCollector windowedOutputCollector;
private transient WindowLifecycleListener<Tuple> listener;
private transient WindowManager<Tuple> windowManager;
private transient int maxLagMs;
private TimestampExtractor timestampExtractor;
private transient String lateTupleStream;
private transient TriggerPolicy<Tuple, ?> triggerPolicy;
private transient EvictionPolicy<Tuple, ?> evictionPolicy;
private transient Duration windowLengthDuration;

public WindowedBoltExecutor(IWindowedBolt bolt) {
this.bolt = bolt;
timestampExtractor = bolt.getTimestampExtractor();
}

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<>(), false);
}

// NOTE: the queue has to be thread safe.
protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
Collection<Event<Tuple>> queue, boolean stateful) {
Objects.requireNonNull(topoConf);
Objects.requireNonNull(context);
Objects.requireNonNull(collector);
Objects.requireNonNull(queue);
this.windowedOutputCollector = new WindowedOutputCollector(collector);
bolt.prepare(topoConf, context, windowedOutputCollector);
this.listener = newWindowLifecycleListener();
this.windowManager = initWindowManager(listener, topoConf, context, queue, stateful);
start();
LOG.info("Initialized window manager {} ", windowManager);
}

@Override
public void execute(Tuple input) {
if (isTupleTs()) {
long ts = timestampExtractor.extractTimestamp(input);
if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
windowManager.add(input, ts);
} else {
if (lateTupleStream != null) {
windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
} else {
LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
}
windowedOutputCollector.ack(input);
}
} else {
windowManager.add(input);
}
}

@Override
public void cleanup() {
if (waterMarkEventGenerator != null) {
waterMarkEventGenerator.shutdown();
}
windowManager.shutdown();
bolt.cleanup();
}

// for unit tests
WindowManager<Tuple> getWindowManager() {
return windowManager;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
if (lateTupleStream != null) {
declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
}
bolt.declareOutputFields(declarer);
}

@Override
public Map<String, Object> getComponentConfiguration() {
return bolt.getComponentConfiguration();
}

//……
}

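WindowedBoltExecutor implements the IRichBolt interface. In prepare it initializes windowedOutputCollector, listener, and windowManager and calls bolt.prepare; in cleanup it shuts down waterMarkEventGenerator and windowManager and then calls bolt.cleanup. When TopologyBuilder.setBolt receives an IWindowedBolt, it wraps the user's implementation in a WindowedBoltExecutor and registers the wrapper instead.
declareOutputFields declares the late tuple stream if one is configured and then delegates to bolt.declareOutputFields(declarer); getComponentConfiguration likewise returns bolt.getComponentConfiguration().
execute mainly adds the tuple to windowManager. A late tuple that is not admitted to the window is acked immediately, after being emitted to the late tuple stream if one is configured.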
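The branch inside execute can be modelled without any Storm classes. The following is only a sketch: LateTupleRouter and its members are hypothetical names, tuples are reduced to their timestamps, and the "on time" test (timestamp not behind the last watermark) is an assumption about what waterMarkEventGenerator.track checks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Storm-free model of the branch in WindowedBoltExecutor.execute.
// LateTupleRouter and all of its members are hypothetical names.
class LateTupleRouter {
    final List<Long> windowed = new ArrayList<>();   // timestamps handed to the window manager
    final List<Long> lateStream = new ArrayList<>(); // timestamps emitted to the late tuple stream
    final List<Long> acked = new ArrayList<>();      // timestamps acked immediately
    private final boolean lateStreamConfigured;
    private long lastWatermarkTs = 0;

    LateTupleRouter(boolean lateStreamConfigured) {
        this.lateStreamConfigured = lateStreamConfigured;
    }

    // Stands in for the watermark generator advancing its last watermark.
    void advanceWatermark(long ts) {
        lastWatermarkTs = Math.max(lastWatermarkTs, ts);
    }

    void execute(long ts) {
        // Assumption: a tuple is on time when its timestamp is not behind the last watermark.
        if (ts >= lastWatermarkTs) {
            windowed.add(ts);
        } else {
            if (lateStreamConfigured) {
                lateStream.add(ts);
            }
            acked.add(ts); // acked whether or not a late stream exists
        }
    }

    public static void main(String[] args) {
        LateTupleRouter router = new LateTupleRouter(true);
        router.advanceWatermark(100);
        router.execute(150);
        router.execute(50);
        System.out.println("windowed=" + router.windowed
                + " late=" + router.lateStream + " acked=" + router.acked);
    }
}
```

Note that the late tuple is acked in both cases; the late stream only decides whether it is also re-emitted.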

WindowedOutputCollector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
/**
* Creates an {@link OutputCollector} wrapper that automatically anchors the tuples to inputTuples while emitting.
*/
private static class WindowedOutputCollector extends OutputCollector {
private List<Tuple> inputTuples;

WindowedOutputCollector(IOutputCollector delegate) {
super(delegate);
}

void setContext(List<Tuple> inputTuples) {
this.inputTuples = inputTuples;
}

@Override
public List<Integer> emit(String streamId, List<Object> tuple) {
return emit(streamId, inputTuples, tuple);
}

@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
emitDirect(taskId, streamId, inputTuples, tuple);
}
}
WindowedOutputCollector extends OutputCollector. It overrides emit and emitDirect so that emitted tuples are anchored to inputTuples by default.
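The auto-anchoring idea is a small decorator pattern. As a rough, Storm-free sketch (RecordingCollector and AutoAnchoringCollector are hypothetical stand-ins, and tuples are represented by plain strings), the wrapper remembers the current window's tuples and supplies them as anchors whenever the caller emits without any:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a collector; it only records what was emitted.
class RecordingCollector {
    final List<String> log = new ArrayList<>();

    void emit(String streamId, List<String> anchors, List<Object> values) {
        log.add(streamId + " anchors=" + anchors + " values=" + values);
    }
}

// Mirrors the idea of WindowedOutputCollector: un-anchored emits get the window's tuples as anchors.
class AutoAnchoringCollector extends RecordingCollector {
    private List<String> inputTuples = Collections.emptyList();

    // Called before the bolt runs, with the tuples of the window being processed.
    void setContext(List<String> inputTuples) {
        this.inputTuples = inputTuples;
    }

    void emit(String streamId, List<Object> values) {
        emit(streamId, inputTuples, values);
    }

    public static void main(String[] args) {
        AutoAnchoringCollector collector = new AutoAnchoringCollector();
        collector.setContext(Arrays.asList("t1", "t2"));
        collector.emit("default", Arrays.<Object>asList(3));
        System.out.println(collector.log);
    }
}
```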
WindowLifecycleListener
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java
/**
* A callback for expiry, activation of events tracked by the {@link WindowManager}
*
* @param <T> The type of Event in the window (e.g. Tuple).
*/
public interface WindowLifecycleListener<T> {
/**
* Called on expiry of events from the window due to {@link EvictionPolicy}
*
* @param events the expired events
*/
void onExpiry(List<T> events);

/**
* Called on activation of the window due to the {@link TriggerPolicy}
*
* @param events the list of current events in the window.
* @param newEvents the newly added events since last activation.
* @param expired the expired events since last activation.
* @param referenceTime the reference (event or processing) time that resulted in activation
*/
default void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Called on activation of the window due to the {@link TriggerPolicy}. This is typically invoked when the windows are persisted in
* state and is huge to be loaded entirely in memory.
*
* @param eventsIt a supplier of iterator over the list of current events in the window
* @param newEventsIt a supplier of iterator over the newly added events since the last ativation
* @param expiredIt a supplier of iterator over the expired events since the last activation
* @param referenceTime the reference (event or processing) time that resulted in activation
*/
default void onActivation(Supplier<Iterator<T>> eventsIt, Supplier<Iterator<T>> newEventsIt, Supplier<Iterator<T>> expiredIt,
Long referenceTime) {
throw new UnsupportedOperationException("Not implemented");
}
}

WindowLifecycleListener defines two kinds of callbacks: onExpiry and onActivation.
They are triggered by the EvictionPolicy and the TriggerPolicy respectively.

EvictionPolicy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
/**
* Eviction policy tracks events and decides whether an event should be evicted from the window or not.
*
* @param <T> the type of event that is tracked.
*/
public interface EvictionPolicy<T, S> {
/**
* Decides if an event should be expired from the window, processed in the current window or kept for later processing.
*
* @param event the input event
* @return the {@link org.apache.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
*/
Action evict(Event<T> event);

/**
* Tracks the event to later decide whether {@link EvictionPolicy#evict(Event)} should evict it or not.
*
* @param event the input event to be tracked
*/
void track(Event<T> event);

/**
* Returns the current context that is part of this eviction policy.
*
* @return the eviction context
*/
EvictionContext getContext();

/**
* Sets a context in the eviction policy that can be used while evicting the events. E.g. For TimeEvictionPolicy, this could be used to
* set the reference timestamp.
*
* @param context the eviction context
*/
void setContext(EvictionContext context);

/**
* Resets the eviction policy.
*/
void reset();

/**
* Return runtime state to be checkpointed by the framework for restoring the eviction policy in case of failures.
*
* @return the state
*/
S getState();

/**
* Restore the eviction policy from the state that was earlier checkpointed by the framework.
*
* @param state the state
*/
void restoreState(S state);

/**
* The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
*/
public enum Action {
/**
* expire the event and remove it from the queue.
*/
EXPIRE,
/**
* process the event in the current window of events.
*/
PROCESS,
/**
* don't include in the current window but keep the event in the queue for evaluating as a part of future windows.
*/
KEEP,
/**
* stop processing the queue, there cannot be anymore events satisfying the eviction policy.
*/
STOP
}
}

EvictionPolicy tracks events and decides whether an event should be evicted from the window.
It has four implementations: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, and WatermarkTimeEvictionPolicy.
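To make the four Action values concrete, here is a minimal sketch of how a watermark-aware, time-based policy might classify a single event. It is not the Storm implementation: the class name is hypothetical, and the exact boundary conditions (for example `>=` versus `>`) are assumptions chosen for illustration.

```java
// Hypothetical sketch of a time-based eviction decision with a watermark lag.
class WatermarkTimeEvictionSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    // referenceTime is the end timestamp of the window being evaluated.
    static Action evict(long eventTs, long referenceTime, long windowLengthMs, long lagMs) {
        long diff = referenceTime - eventTs;
        if (diff < -lagMs) {
            return Action.STOP;    // too far in the future; later events cannot match either
        }
        if (diff < 0) {
            return Action.KEEP;    // belongs to a future window, stays in the queue
        }
        if (diff >= windowLengthMs) {
            return Action.EXPIRE;  // older than the window, remove from the queue
        }
        return Action.PROCESS;     // inside the current window
    }

    public static void main(String[] args) {
        long[] samples = {85, 95, 102, 110};
        for (long ts : samples) {
            System.out.println(ts + " -> " + evict(ts, 100, 10, 5));
        }
    }
}
```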

TriggerPolicy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/TriggerPolicy.java
/**
* Triggers the window calculations based on the policy.
*
* @param <T> the type of the event that is tracked
*/
public interface TriggerPolicy<T, S> {
/**
* Tracks the event and could use this to invoke the trigger.
*
* @param event the input event
*/
void track(Event<T> event);

/**
* resets the trigger policy.
*/
void reset();

/**
* Starts the trigger policy. This can be used during recovery to start the triggers after recovery is complete.
*/
void start();

/**
* Any clean up could be handled here.
*/
void shutdown();

/**
* Return runtime state to be checkpointed by the framework for restoring the trigger policy in case of failures.
*
* @return the state
*/
S getState();

/**
* Restore the trigger policy from the state that was earlier checkpointed by the framework.
*
* @param state the state
*/
void restoreState(S state);
}

TriggerPolicy decides when the window computation fires.
It has four implementations: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, and WatermarkTimeTriggerPolicy.
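A count-based trigger is the simplest case to picture. The sketch below is an assumption-laden toy, not Storm's CountTriggerPolicy: the class name is hypothetical, the handler is a plain Runnable, and the counter is reset inline, whereas in Storm the reset is requested by the WindowManager after onTrigger.

```java
// Hypothetical count-based trigger: fires the handler once every `count` tracked events.
class CountTriggerSketch {
    private final int count;
    private final Runnable handler;
    private int current = 0;

    CountTriggerSketch(int count, Runnable handler) {
        this.count = count;
        this.handler = handler;
    }

    // Called once per non-watermark event.
    void track() {
        current++;
        if (current >= count) {
            handler.run();
            current = 0; // simplification: reset here instead of via a separate reset() call
        }
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3, () -> System.out.println("window fired"));
        for (int i = 0; i < 7; i++) {
            trigger.track();
        }
    }
}
```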

WindowedBoltExecutor.newWindowLifecycleListener
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
return new WindowLifecycleListener<Tuple>() {
@Override
public void onExpiry(List<Tuple> tuples) {
for (Tuple tuple : tuples) {
windowedOutputCollector.ack(tuple);
}
}

@Override
public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
windowedOutputCollector.setContext(tuples);
boltExecute(tuples, newTuples, expiredTuples, timestamp);
}

};
}

protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
}

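This creates an anonymous WindowLifecycleListener implementation.
onExpiry acks each expired tuple in turn. onActivation sets the window's tuples as the anchoring context and calls boltExecute, which builds a TupleWindowImpl and passes it to the bolt's execute method.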

WindowedBoltExecutor.initWindowManager
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

WindowManager<Tuple> manager = stateful ?
new StatefulWindowManager<>(lifecycleListener, queue)
: new WindowManager<>(lifecycleListener, queue);

Count windowLengthCount = null;
Duration slidingIntervalDuration = null;
Count slidingIntervalCount = null;
// window length
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
} else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
windowLengthDuration = new Duration(
((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
TimeUnit.MILLISECONDS);
}
// sliding interval
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
} else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
slidingIntervalDuration =
new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
} else {
// default is a sliding window of count 1
slidingIntervalCount = new Count(1);
}
// tuple ts
if (timestampExtractor != null) {
// late tuple stream
lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
if (lateTupleStream != null) {
if (!context.getThisStreams().contains(lateTupleStream)) {
throw new IllegalArgumentException(
"Stream for late tuples must be defined with the builder method withLateTupleStream");
}
}
// max lag
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
} else {
maxLagMs = DEFAULT_MAX_LAG_MS;
}
// watermark interval
int watermarkInterval;
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
} else {
watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
}
waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
maxLagMs, getComponentStreams(context));
} else {
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
}
}
// validate
validate(topoConf, windowLengthCount, windowLengthDuration,
slidingIntervalCount, slidingIntervalDuration);
evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
manager, evictionPolicy);
manager.setEvictionPolicy(evictionPolicy);
manager.setTriggerPolicy(triggerPolicy);
return manager;
}

private EvictionPolicy<Tuple, ?> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
if (windowLengthCount != null) {
if (isTupleTs()) {
return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
} else {
return new CountEvictionPolicy<>(windowLengthCount.value);
}
} else {
if (isTupleTs()) {
return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
} else {
return new TimeEvictionPolicy<>(windowLengthDuration.value);
}
}
}

private TriggerPolicy<Tuple, ?> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
WindowManager<Tuple> manager, EvictionPolicy<Tuple, ?> evictionPolicy) {
if (slidingIntervalCount != null) {
if (isTupleTs()) {
return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
} else {
return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
}
} else {
if (isTupleTs()) {
return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
} else {
return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
}
}
}

For WindowedBoltExecutor, stateful is false, so a plain WindowManager is created.
DEFAULT_MAX_LAG_MS is 0, meaning no lag, and DEFAULT_WATERMARK_EVENT_INTERVAL_MS is 1000, i.e. 1 second.
The EvictionPolicy and TriggerPolicy are chosen from the parameter types of windowLength and slidingInterval. For example, when a timestamp field is configured and both parameters are Durations, the result is a WatermarkTimeEvictionPolicy and a WatermarkTimeTriggerPolicy.
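The selection logic in getEvictionPolicy and getTriggerPolicy boils down to a two-by-two table per policy. The sketch below restates it with policy names as strings; PolicySelectionSketch and its boolean parameters are hypothetical and only mirror the branches shown above.

```java
// Hypothetical restatement of the policy selection as a lookup over two flags.
class PolicySelectionSketch {
    // countBasedLength: window length given as a Count (otherwise a Duration).
    // tupleTs: a timestamp extractor is configured, so watermark variants are used.
    static String evictionPolicy(boolean countBasedLength, boolean tupleTs) {
        if (countBasedLength) {
            return tupleTs ? "WatermarkCountEvictionPolicy" : "CountEvictionPolicy";
        }
        return tupleTs ? "WatermarkTimeEvictionPolicy" : "TimeEvictionPolicy";
    }

    // countBasedInterval: sliding interval given as a Count (otherwise a Duration).
    static String triggerPolicy(boolean countBasedInterval, boolean tupleTs) {
        if (countBasedInterval) {
            return tupleTs ? "WatermarkCountTriggerPolicy" : "CountTriggerPolicy";
        }
        return tupleTs ? "WatermarkTimeTriggerPolicy" : "TimeTriggerPolicy";
    }

    public static void main(String[] args) {
        System.out.println(evictionPolicy(false, true) + " / " + triggerPolicy(false, true));
        System.out.println(evictionPolicy(true, false) + " / " + triggerPolicy(true, false));
    }
}
```

Because length and interval are chosen independently, mixed combinations such as a Duration length with a Count interval are also possible.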

WindowManager
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
/**
* Tracks a window of events and fires {@link WindowLifecycleListener} callbacks on expiry of events or activation of the window due to
* {@link TriggerPolicy}.
*
* @param <T> the type of event in the window.
*/
public class WindowManager<T> implements TriggerHandler {

protected final Collection<Event<T>> queue;

private final AtomicInteger eventsSinceLastExpiry;

//……
/**
* Add an event into the window, with the given ts as the tracking ts.
*
* @param event the event to track
* @param ts the timestamp
*/
public void add(T event, long ts) {
add(new EventImpl<T>(event, ts));
}

/**
* Tracks a window event
*
* @param windowEvent the window event to track
*/
public void add(Event<T> windowEvent) {
// watermark events are not added to the queue.
if (!windowEvent.isWatermark()) {
queue.add(windowEvent);
} else {
LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
}
track(windowEvent);
compactWindow();
}

/**
* feed the event to the eviction and trigger policies for bookkeeping and optionally firing the trigger.
*/
private void track(Event<T> windowEvent) {
evictionPolicy.track(windowEvent);
triggerPolicy.track(windowEvent);
}

/**
* expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
*/
protected void compactWindow() {
if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
scanEvents(false);
}
}

/**
* Scan events in the queue, using the expiration policy to check if the event should be evicted or not.
*
* @param fullScan if set, will scan the entire queue; if not set, will stop as soon as an event not satisfying the expiration policy is
* found
* @return the list of events to be processed as a part of the current window
*/
private List<Event<T>> scanEvents(boolean fullScan) {
LOG.debug("Scan events, eviction policy {}", evictionPolicy);
List<T> eventsToExpire = new ArrayList<>();
List<Event<T>> eventsToProcess = new ArrayList<>();
try {
lock.lock();
Iterator<Event<T>> it = queue.iterator();
while (it.hasNext()) {
Event<T> windowEvent = it.next();
Action action = evictionPolicy.evict(windowEvent);
if (action == EXPIRE) {
eventsToExpire.add(windowEvent.get());
it.remove();
} else if (!fullScan || action == STOP) {
break;
} else if (action == PROCESS) {
eventsToProcess.add(windowEvent);
}
}
expiredEvents.addAll(eventsToExpire);
} finally {
lock.unlock();
}
eventsSinceLastExpiry.set(0);
LOG.debug("[{}] events expired from window.", eventsToExpire.size());
if (!eventsToExpire.isEmpty()) {
LOG.debug("invoking windowLifecycleListener.onExpiry");
windowLifecycleListener.onExpiry(eventsToExpire);
}
return eventsToProcess;
}

//……
}

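WindowedBoltExecutor.execute mainly adds the tuple to windowManager.
For an EventImpl, isWatermark returns false, so the event is added to the queue, and then track and compactWindow run.
track delegates to evictionPolicy and triggerPolicy. compactWindow calls scanEvents(false) once the number of events added since the last expiry reaches EXPIRE_EVENTS_THRESHOLD. When the scan is not a full scan, it stops at the first non-expired event. Afterwards, if eventsToExpire is not empty, windowLifecycleListener.onExpiry(eventsToExpire) is invoked.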
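The difference between a partial and a full scan matters when events arrive out of order. The following sketch reduces events to timestamps and uses a plain "older than the window" test in place of a real EvictionPolicy; ScanSketch and its parameters are hypothetical, and locking is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of scanEvents over a queue of event timestamps.
class ScanSketch {
    // Removes expired timestamps from the queue and returns them.
    // On a full scan, the surviving timestamps are also collected into toProcess.
    static List<Long> scan(List<Long> queue, long referenceTime, long windowLengthMs,
                           boolean fullScan, List<Long> toProcess) {
        List<Long> expired = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (referenceTime - ts >= windowLengthMs) {
                expired.add(ts);
                it.remove();
            } else if (!fullScan) {
                break; // partial scan: stop at the first event that is still live
            } else {
                toProcess.add(ts);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        List<Long> queue = new ArrayList<>(Arrays.asList(10L, 20L, 95L, 15L, 99L));
        List<Long> expired = scan(queue, 100, 50, false, new ArrayList<>());
        System.out.println("partial scan expired " + expired + ", queue now " + queue);
    }
}
```

With the queue 10, 20, 95, 15, 99, a partial scan stops at 95 and leaves the out-of-order 15 behind; only a full scan, as done in onTrigger, reaches it.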

WaterMarkEventGenerator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
/**
* Tracks tuples across input streams and periodically emits watermark events. Watermark event timestamp is the minimum of the latest tuple
* timestamps across all the input streams (minus the lag). Once a watermark event is emitted any tuple coming with an earlier timestamp can
* be considered as late events.
*/
public class WaterMarkEventGenerator<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
private final WindowManager<T> windowManager;
private final int eventTsLag;
private final Set<GlobalStreamId> inputStreams;
private final Map<GlobalStreamId, Long> streamToTs;
private final ScheduledExecutorService executorService;
private final int interval;
private ScheduledFuture<?> executorFuture;
private volatile long lastWaterMarkTs;

//……

public void start() {
this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
}

@Override
public void run() {
try {
long waterMarkTs = computeWaterMarkTs();
if (waterMarkTs > lastWaterMarkTs) {
this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
lastWaterMarkTs = waterMarkTs;
}
} catch (Throwable th) {
LOG.error("Failed while processing watermark event ", th);
throw th;
}
}
}

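WindowedBoltExecutor's start method calls WaterMarkEventGenerator.start.
start schedules the WaterMarkEventGenerator task to run every watermarkInterval milliseconds.
run computes the watermark, which is the minimum of the latest tuple timestamps across all input streams, minus the lag. When that value is greater than lastWaterMarkTs, run adds a WaterMarkEvent (whose isWatermark is true) to windowManager and updates lastWaterMarkTs.
windowManager.add(new WaterMarkEvent<>(waterMarkTs)) in turn runs evictionPolicy.track and triggerPolicy.track, followed by compactWindow.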
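The watermark rule described in the class comment can be sketched in a few lines. This is a simplified model, not the Storm class: WatermarkSketch is a hypothetical name, streams are identified by strings, no scheduler is involved, and two details are assumptions: that no watermark is produced until every input stream has delivered at least one tuple, and that a tuple counts as late only when its timestamp is behind the last watermark.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of watermark tracking across several input streams.
class WatermarkSketch {
    private final Map<String, Long> latestTsByStream = new HashMap<>();
    private final int streamCount;
    private final long lagMs;
    private long lastWatermarkTs = 0;

    WatermarkSketch(int streamCount, long lagMs) {
        this.streamCount = streamCount;
        this.lagMs = lagMs;
    }

    // Records the latest timestamp per stream; returns false when the tuple is late.
    boolean track(String stream, long ts) {
        latestTsByStream.merge(stream, ts, Long::max);
        return ts >= lastWatermarkTs;
    }

    // One periodic run: returns true when a new, larger watermark was produced.
    boolean tick() {
        if (latestTsByStream.size() < streamCount) {
            return false; // assumption: wait until every stream has delivered data
        }
        long candidate = Collections.min(latestTsByStream.values()) - lagMs;
        if (candidate > lastWatermarkTs) {
            lastWatermarkTs = candidate;
            return true;
        }
        return false;
    }

    long lastWatermark() {
        return lastWatermarkTs;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(2, 5);
        w.track("a", 100);
        w.track("b", 80);
        System.out.println("advanced=" + w.tick() + " watermark=" + w.lastWatermark());
    }
}
```

Taking the minimum across streams means the slowest stream holds the watermark back, which is what keeps tuples from a lagging stream from being classified as late too early.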

WatermarkTimeTriggerPolicy.track
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
@Override
public void track(Event<T> event) {
if (started && event.isWatermark()) {
handleWaterMarkEvent(event);
}
}

/**
* Invokes the trigger all pending windows up to the watermark timestamp. The end ts of the window is set in the eviction policy context
* so that the events falling within that window can be processed.
*/
private void handleWaterMarkEvent(Event<T> event) {
long watermarkTs = event.getTimestamp();
long windowEndTs = nextWindowEndTs;
LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
while (windowEndTs <= watermarkTs) {
long currentCount = windowManager.getEventCount(windowEndTs);
evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
if (handler.onTrigger()) {
windowEndTs += slidingIntervalMs;
} else {
/*
* No events were found in the previous window interval.
* Scan through the events in the queue to find the next
* window intervals based on event ts.
*/
long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
LOG.debug("Next aligned window end ts {}", ts);
if (ts == Long.MAX_VALUE) {
LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
break;
}
windowEndTs = ts;
}
}
nextWindowEndTs = windowEndTs;
}

/**
* Computes the next window by scanning the events in the window and finds the next aligned window between the startTs and endTs. Return
* the end ts of the next aligned window, i.e. the ts when the window should fire.
*
* @param startTs the start timestamp (excluding)
* @param endTs the end timestamp (including)
* @return the aligned window end ts for the next window or Long.MAX_VALUE if there are no more events to be processed.
*/
private long getNextAlignedWindowTs(long startTs, long endTs) {
long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
return nextTs;
}
return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
}
handleWaterMarkEvent calls handler.onTrigger(); the handler here is the WindowManager.
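The alignment arithmetic in getNextAlignedWindowTs rounds an event timestamp up to the next multiple of the sliding interval. As a worked example, here is the same formula as a standalone function (WindowAlignmentSketch is a hypothetical wrapper; the earliest event timestamp is passed in directly instead of being looked up in the window manager):

```java
// Standalone version of the rounding formula used by getNextAlignedWindowTs.
class WindowAlignmentSketch {
    static long nextAlignedWindowTs(long nextTs, long slidingIntervalMs) {
        // Long.MAX_VALUE signals "no more events"; exact multiples are already aligned.
        if (nextTs == Long.MAX_VALUE || nextTs % slidingIntervalMs == 0) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }

    public static void main(String[] args) {
        System.out.println(nextAlignedWindowTs(1234, 500));
        System.out.println(nextAlignedWindowTs(1000, 500));
    }
}
```

For an earliest event at 1234 ms and a 500 ms sliding interval, 1234 % 500 = 234, so the next window end is 1234 + (500 - 234) = 1500.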
WindowManager.onTrigger
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java
/**
* The callback invoked by the trigger policy.
*/
@Override
public boolean onTrigger() {
List<Event<T>> windowEvents = null;
List<T> expired = null;
try {
lock.lock();
/*
* scan the entire window to handle out of order events in
* the case of time based windows.
*/
windowEvents = scanEvents(true);
expired = new ArrayList<>(expiredEvents);
expiredEvents.clear();
} finally {
lock.unlock();
}
List<T> events = new ArrayList<>();
List<T> newEvents = new ArrayList<>();
for (Event<T> event : windowEvents) {
events.add(event.get());
if (!prevWindowEvents.contains(event)) {
newEvents.add(event.get());
}
}
prevWindowEvents.clear();
if (!events.isEmpty()) {
prevWindowEvents.addAll(windowEvents);
LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime());
} else {
LOG.debug("No events in the window, skipping onActivation");
}
triggerPolicy.reset();
return !events.isEmpty();
}

onTrigger computes three sets of data: events, expired, and newEvents.
When events is not empty, it invokes windowLifecycleListener.onActivation, which ends up calling the bolt's execute method.
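How newEvents is derived is easy to miss in the code above: it is the current window minus whatever was in the previous activation. A minimal sketch, assuming events are plain strings and using the hypothetical class name ActivationSketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the newEvents bookkeeping done in onTrigger.
class ActivationSketch {
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the events of the current window that were not part of the previous activation,
    // then remembers the current window for the next call.
    List<String> newEvents(List<String> windowEvents) {
        List<String> fresh = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                fresh.add(event);
            }
        }
        prevWindowEvents.clear();
        prevWindowEvents.addAll(windowEvents);
        return fresh;
    }

    public static void main(String[] args) {
        ActivationSketch sketch = new ActivationSketch();
        System.out.println(sketch.newEvents(Arrays.asList("e1", "e2")));
        System.out.println(sketch.newEvents(Arrays.asList("e2", "e3")));
    }
}
```

With a sliding window, consecutive activations overlap, so a bolt that only wants incremental work can look at the new and expired lists instead of reprocessing the whole window.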

Summary

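WindowedBoltExecutor implements IRichBolt, so it is itself a bolt. In setBolt, TopologyBuilder wraps the user's IWindowedBolt implementation in a WindowedBoltExecutor. The wrapper's execute method calls windowManager.add for tuples that belong in a window and acks late tuples right away; the wrapped bolt's own execute runs only when a window fires.
WindowLifecycleListener has two callbacks: onExpiry, triggered by the EvictionPolicy, and onActivation, triggered by the TriggerPolicy.
windowLength and slidingInterval can each be given as a Duration or a Count, so EvictionPolicy and TriggerPolicy come in both flavors; with the watermark variants added, each policy has four implementations. For EvictionPolicy these are CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, and WatermarkTimeEvictionPolicy; for TriggerPolicy they are CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, and WatermarkTimeTriggerPolicy.
Besides storing the tuple, windowManager.add calls track on both policies (evictionPolicy and triggerPolicy) and then compactWindow. WatermarkTimeEvictionPolicy.track currently does nothing, while WatermarkTimeTriggerPolicy.track reacts to a WaterMarkEvent by firing the window: it calls WindowManager.onTrigger, which selects the window's events and invokes windowLifecycleListener.onActivation, which finally calls the windowed bolt's execute method.
Both WindowManager.onTrigger and add (through compactWindow) call scanEvents; the former does a full scan, the latter does not. scanEvents calls evictionPolicy.evict to decide whether a tuple should be evicted and then invokes windowLifecycleListener.onExpiry, which acks the evicted tuples. In other words, tuples are acked automatically when they expire. Since in principle every tuple eventually expires and is acked at that point, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS must be greater than windowLength + slidingInterval; otherwise tuples can time out before they are acked.
When WindowedBoltExecutor starts, it starts the WaterMarkEventGenerator, which is created only when a timestamp extractor is configured. The generator registers a scheduled task that computes the watermark every watermarkInterval milliseconds (the minimum of the latest tuple timestamps across all input streams, minus the lag). When the value exceeds lastWaterMarkTs, it adds a WaterMarkEvent (whose isWatermark is true) to windowManager and updates lastWaterMarkTs. With the watermark-based policies, this is what drives WindowManager.onTrigger and therefore windowLifecycleListener.onActivation.
As for acking: in WindowedBoltExecutor.execute, a late tuple that does not enter the window queue is emitted to the late tuple stream if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured (otherwise it is only logged) and is acked immediately in either case; tuples in the window are acked automatically when they expire. WindowedBoltExecutor uses WindowedOutputCollector, which extends OutputCollector and anchors emitted tuples to the window's input tuples.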
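The timeout constraint from the summary is simple arithmetic, but it is easy to get wrong because the timeout is configured in seconds while time-based windows are configured in milliseconds. A small sanity check could look like the sketch below; MessageTimeoutSketch and timeoutIsSafe are hypothetical helpers, not part of Storm, and they only apply to time-based windows.

```java
// Hypothetical helper: checks that the message timeout exceeds window length + sliding interval.
class MessageTimeoutSketch {
    static boolean timeoutIsSafe(long messageTimeoutSecs, long windowLengthMs, long slidingIntervalMs) {
        return messageTimeoutSecs * 1000L > windowLengthMs + slidingIntervalMs;
    }

    public static void main(String[] args) {
        // 30 s timeout with a 20 s window sliding every 5 s
        System.out.println(timeoutIsSafe(30, 20_000, 5_000));
        // 30 s timeout with a 25 s window sliding every 5 s leaves no headroom
        System.out.println(timeoutIsSafe(30, 25_000, 5_000));
    }
}
```

In practice some extra margin on top of the sum is advisable, since tuples are only acked when a scan actually expires them.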

doc
Windowing Support in Core Storm
