A look at Storm's window trigger



This post looks at how Storm's window trigger works.
WindowTridentProcessor.prepare
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
    this.topologyContext = context;
    List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
    if (parents.size() != 1) {
        throw new RuntimeException("Aggregation related operation can only have one parent");
    }

    Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

    this.tridentContext = tridentContext;
    collector = new FreshCollector(tridentContext);
    projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

    windowStore = windowStoreFactory.create(stormConf);
    windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
    windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

    tridentWindowManager = storeTuplesInStore ?
            new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
            : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

    tridentWindowManager.prepare();
}
The last step here is the call to tridentWindowManager.prepare().
AbstractTridentWindowManager.prepare
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                    Aggregator aggregator, BatchOutputCollector delegateCollector) {
    this.windowTaskId = windowTaskId;
    this.windowStore = windowStore;
    this.aggregator = aggregator;
    this.delegateCollector = delegateCollector;

    windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

    windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

    WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
    EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
    windowManager.setEvictionPolicy(evictionPolicy);
    triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
    windowManager.setTriggerPolicy(triggerPolicy);
}

public void prepare() {
    preInitialize();

    initialize();

    postInitialize();
}

private void postInitialize() {
    // start trigger once the initialization is done.
    triggerPolicy.start();
}
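The AbstractTridentWindowManager constructor calls windowStrategy.getTriggerPolicy to obtain the triggerPolicy. prepare() then runs preInitialize(), initialize() and postInitialize() in that order, and postInitialize() calls triggerPolicy.start().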
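The ordering matters: the trigger is wired in the constructor but only started once initialization has finished. A minimal sketch of that sequencing follows. LifecycleSketch and its string log are hypothetical stand-ins for illustration, not Storm classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractTridentWindowManager; not the Storm class.
class LifecycleSketch {
    // Records the order in which the lifecycle steps run.
    final List<String> calls = new ArrayList<>();
    private final Runnable triggerStart;

    LifecycleSketch() {
        // As in the real constructor, the trigger is created here but not started.
        this.triggerStart = () -> calls.add("triggerPolicy.start");
    }

    void prepare() {
        calls.add("preInitialize");
        calls.add("initialize");
        postInitialize();
    }

    private void postInitialize() {
        // Start the trigger only after initialization is done.
        triggerStart.run();
    }

    public static void main(String[] args) {
        LifecycleSketch manager = new LifecycleSketch();
        manager.prepare();
        System.out.println(manager.calls); // [preInitialize, initialize, triggerPolicy.start]
    }
}
```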
SlidingDurationWindowStrategy.getTriggerPolicy
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
/**
 * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
 *
 * @param triggerHandler
 * @param evictionPolicy
 * @return
 */
@Override
public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
    return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
}
Taking SlidingDurationWindowStrategy as an example, the policy created here is a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength() and whose triggerHandler is the WindowManager.
TimeTriggerPolicy.start
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java
public void start() {
    executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
}

private Runnable newTriggerTask() {
    return new Runnable() {
        @Override
        public void run() {
            // do not process current timestamp since tuples might arrive while the trigger is executing
            long now = System.currentTimeMillis() - 1;
            try {
                /*
                 * set the current timestamp as the reference time for the eviction policy
                 * to evict the events
                 */
                if (evictionPolicy != null) {
                    evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                }
                handler.onTrigger();
            } catch (Throwable th) {
                LOG.error("handler.onTrigger failed ", th);
                /*
                 * propagate it so that task gets canceled and the exception
                 * can be retrieved from executorFuture.get()
                 */
                throw th;
            }
        }
    };
}
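start() schedules a recurring task that fires once every duration (here windowConfig.getSlidingLength()), with the first run also delayed by one duration. The task's run() method sets the eviction context and then calls handler.onTrigger(), that is, WindowManager.onTrigger().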
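The scheduling pattern can be tried in isolation with the JDK alone. The sketch below is not Storm code: FixedRateTriggerSketch, its Handler interface and firesAtLeast are hypothetical names, and only ScheduledExecutorService.scheduleAtFixedRate is the same call the excerpt uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FixedRateTriggerSketch {
    // Hypothetical handler interface standing in for Storm's TriggerHandler.
    interface Handler {
        boolean onTrigger();
    }

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long durationMs;
    private final Handler handler;
    private ScheduledFuture<?> future;

    FixedRateTriggerSketch(long durationMs, Handler handler) {
        this.durationMs = durationMs;
        this.handler = handler;
    }

    void start() {
        // First run after one full duration, then once per duration.
        future = executor.scheduleAtFixedRate(handler::onTrigger, durationMs, durationMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        if (future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    // Returns true if the trigger fired at least `times` times before the timeout.
    static boolean firesAtLeast(int times, long durationMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(times);
        FixedRateTriggerSketch trigger = new FixedRateTriggerSketch(durationMs, () -> {
            latch.countDown();
            return true;
        });
        trigger.start();
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            trigger.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firesAtLeast(3, 50, 5000));
    }
}
```

With scheduleAtFixedRate, an exception thrown by one run suppresses all later runs, which is why the Storm code logs the failure and rethrows it: the task stops and the error can be retrieved from executorFuture.get().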
WindowManager.onTrigger
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java
/**
 * The callback invoked by the trigger policy.
 */
@Override
public boolean onTrigger() {
    List<Event<T>> windowEvents = null;
    List<T> expired = null;
    try {
        lock.lock();
        /*
         * scan the entire window to handle out of order events in
         * the case of time based windows.
         */
        windowEvents = scanEvents(true);
        expired = new ArrayList<>(expiredEvents);
        expiredEvents.clear();
    } finally {
        lock.unlock();
    }
    List<T> events = new ArrayList<>();
    List<T> newEvents = new ArrayList<>();
    for (Event<T> event : windowEvents) {
        events.add(event.get());
        if (!prevWindowEvents.contains(event)) {
            newEvents.add(event.get());
        }
    }
    prevWindowEvents.clear();
    if (!events.isEmpty()) {
        prevWindowEvents.addAll(windowEvents);
        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
        windowLifecycleListener.onActivation(events, newEvents, expired);
    } else {
        LOG.debug("No events in the window, skipping onActivation");
    }
    triggerPolicy.reset();
    return !events.isEmpty();
}

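onTrigger scans the window, works out which events are new relative to the previous activation, and, if the window is not empty, calls windowLifecycleListener.onActivation(events, newEvents, expired). The windowLifecycleListener here is the TridentWindowLifeCycleListener defined in AbstractTridentWindowManager.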
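The newEvents bookkeeping is easy to miss in the full method, so here is a reduced sketch of just that part. It assumes events can be represented as unique strings, whereas the real code compares Event<T> objects. WindowDiffSketch is a hypothetical class, not part of Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reduction of the prevWindowEvents logic in WindowManager.onTrigger.
class WindowDiffSketch {
    // Events that were in the window at the previous activation.
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns the events in the current window that were not in the previous one.
    List<String> onTrigger(List<String> windowEvents) {
        List<String> newEvents = new ArrayList<>();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event);
            }
        }
        // Remember the current window for the next trigger.
        prevWindowEvents.clear();
        if (!windowEvents.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
        }
        return newEvents;
    }

    public static void main(String[] args) {
        WindowDiffSketch window = new WindowDiffSketch();
        System.out.println(window.onTrigger(Arrays.asList("a", "b", "c"))); // [a, b, c]
        System.out.println(window.onTrigger(Arrays.asList("b", "c", "d"))); // [d]
    }
}
```

On the second trigger the window has slid by one event, so only "d" is reported as new even though the full window still holds three events.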
TridentWindowLifeCycleListener.onActivation
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
/**
 * Listener to reeive any activation/expiry of windowing events and take further action on them.
 */
class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

    @Override
    public void onExpiry(List<T> expiredEvents) {
        LOG.debug("onExpiry is invoked");
        onTuplesExpired(expiredEvents);
    }

    @Override
    public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
        LOG.debug("onActivation is invoked with events size: [{}]", events.size());
        // trigger occurred, create an aggregation and keep them in store
        int currentTriggerId = triggerId.incrementAndGet();
        execAggregatorAndStoreResult(currentTriggerId, events);
    }
}

private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
    List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

    // run aggregator to compute the result
    AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
    Object state = aggregator.init(currentTriggerId, collector);
    for (TridentTuple resultTuple : resultTuples) {
        aggregator.aggregate(state, resultTuple, collector);
    }
    aggregator.complete(state, collector);

    List<List<Object>> resultantAggregatedValue = collector.values;

    ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
            new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
    windowStore.putAll(entries);

    pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
}

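TridentWindowLifeCycleListener.onActivation increments the trigger id and delegates to execAggregatorAndStoreResult.
execAggregatorAndStoreResult calls the aggregator's init, aggregate and complete methods in that order, then writes the next trigger count and the aggregated result to the windowStore.
Finally it adds a TriggerResult to pendingTriggers.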
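The init, aggregate, complete sequence can be sketched with a cut-down interface. SimpleAggregator below is a hypothetical simplification: Trident's real Aggregator works with TridentTuple and TridentCollector, while this version uses plain integers and a list so that it runs without Storm on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AggregatorOrderSketch {
    // Hypothetical, simplified counterpart of Trident's Aggregator interface.
    interface SimpleAggregator<S> {
        S init(int triggerId);
        void aggregate(S state, int value);
        void complete(S state, List<Integer> out);
    }

    // Sums all values in the window and emits a single result.
    static class SumAggregator implements SimpleAggregator<int[]> {
        public int[] init(int triggerId) {
            return new int[1];
        }

        public void aggregate(int[] state, int value) {
            state[0] += value;
        }

        public void complete(int[] state, List<Integer> out) {
            out.add(state[0]);
        }
    }

    // Same call order as execAggregatorAndStoreResult: init once, aggregate per event, complete once.
    static <S> List<Integer> execAggregator(SimpleAggregator<S> aggregator, int triggerId, List<Integer> events) {
        List<Integer> collected = new ArrayList<>();
        S state = aggregator.init(triggerId);
        for (int event : events) {
            aggregator.aggregate(state, event);
        }
        aggregator.complete(state, collected);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(execAggregator(new SumAggregator(), 1, Arrays.asList(1, 2, 3))); // [6]
    }
}
```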

Summary

TimeTriggerPolicy.start registers a scheduled trigger task. Taking SlidingDurationWindowStrategy as an example, its scheduling interval is windowConfig.getSlidingLength().
The trigger task periodically invokes WindowManager.onTrigger, which calls back into windowLifecycleListener.onActivation when the window is not empty.
AbstractTridentWindowManager supplies the TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the aggregator: it calls init first, then aggregate for each of the resultTuples, and finally complete, which makes the calling order of the Aggregator interface methods easy to see.

doc
Windowing Support in Core Storm
