A look at FreshCollector in Storm's Trident windowing


This post looks at FreshCollector as used by Storm's Trident windowing support.
Example
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))
        .window(windowConfig, windowsStoreFactory, new Fields("user", "score"), new UserCountAggregator(), new Fields("aggData"))
        .parallelismHint(1)
        .each(new Fields("aggData"), new PrintEachFunc(), new Fields());
WindowTridentProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
public class WindowTridentProcessor implements TridentProcessor {

    private FreshCollector collector;

    //……

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if (triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
}

WindowTridentProcessor creates the FreshCollector in prepare.
In finishBatch it calls FreshCollector.emit to pass the window's aggregated results downstream.
The data structure passed along is a ConsList, an AbstractList implementation made up of a first element of type Object plus a List<Object> named _elems.
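
To make the ConsList shape concrete, here is a minimal sketch of a list view that prepends one element to an existing list without copying it. SimpleConsList and its field names are hypothetical stand-ins written for this post, not Storm's actual class; the sketch only assumes the behaviour described above (a first element followed by the wrapped elements).

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Storm's ConsList: a read-only view that
// prepends one element to an existing list without copying it.
final class SimpleConsList extends AbstractList<Object> {
    private final Object first;
    private final List<Object> elems;

    SimpleConsList(Object first, List<Object> elems) {
        this.first = first;
        this.elems = elems;
    }

    @Override
    public Object get(int index) {
        // Index 0 is the prepended element; everything else is shifted by one.
        return index == 0 ? first : elems.get(index - 1);
    }

    @Override
    public int size() {
        return elems.size() + 1;
    }

    public static void main(String[] args) {
        // "trigger-info" plays the role of the TriggerInfo object here.
        List<Object> aggregated = Arrays.<Object>asList("alice", 42);
        List<Object> emitted = new SimpleConsList("trigger-info", aggregated);
        System.out.println(emitted); // prints [trigger-info, alice, 42]
    }
}
```

Seen this way, the list handed to collector.emit is flat: the trigger info occupies position 0 and the aggregated values follow it.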

FreshCollector
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/FreshCollector.java
public class FreshCollector implements TridentCollector {
    FreshOutputFactory _factory;
    TridentContext _triContext;
    ProcessorContext context;

    public FreshCollector(TridentContext context) {
        _triContext = context;
        _factory = new FreshOutputFactory(context.getSelfOutputFields());
    }

    public void setContext(ProcessorContext pc) {
        this.context = pc;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create(values);
        for (TupleReceiver r : _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    }

    public Factory getOutputFactory() {
        return _factory;
    }
}

FreshCollector's constructor builds a FreshOutputFactory from the context's selfOutputFields (the first field is always _task_info; the remaining fields are the functionFields the user declared in the window method).
The emit method first uses FreshOutputFactory to build a TridentTupleView from the output fields, then gets the TupleReceivers and calls each one's execute method to pass the TridentTupleView along.
The TupleReceivers here include ProjectedProcessor and PartitionPersistProcessor.
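
The emit-then-fan-out pattern can be sketched without any Storm dependency. Receiver and Collector below are hypothetical stand-ins for TupleReceiver and FreshCollector, a plain List<Object> stands in for the tuple, and the stream id "s1" is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FanOutSketch {
    // Hypothetical stand-in for TupleReceiver.
    interface Receiver {
        void execute(String streamId, List<Object> tuple);
    }

    // Hypothetical stand-in for FreshCollector: every emitted tuple is
    // handed to each registered receiver in turn.
    static final class Collector {
        private final String outStreamId;
        private final List<Receiver> receivers;

        Collector(String outStreamId, List<Receiver> receivers) {
            this.outStreamId = outStreamId;
            this.receivers = receivers;
        }

        void emit(List<Object> values) {
            for (Receiver r : receivers) {
                r.execute(outStreamId, values);
            }
        }
    }

    // Emits one tuple to receiverCount receivers and returns what each one saw.
    static List<String> run(int receiverCount) {
        List<String> log = new ArrayList<>();
        List<Receiver> receivers = new ArrayList<>();
        for (int i = 0; i < receiverCount; i++) {
            final int id = i;
            receivers.add((streamId, tuple) -> log.add("r" + id + "@" + streamId + ":" + tuple));
        }
        new Collector("s1", receivers).emit(Arrays.<Object>asList("info", 7));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // one log entry per receiver
        System.out.println(run(0)); // no receivers, so nothing is delivered
    }
}
```

With zero receivers the loop body never runs, which is why an emit at the end of a processing chain simply goes nowhere.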

TridentTupleView.FreshOutputFactory
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java
public static class FreshOutputFactory implements Factory {
    Map<String, ValuePointer> _fieldIndex;
    ValuePointer[] _index;

    public FreshOutputFactory(Fields selfFields) {
        _fieldIndex = new HashMap<>();
        for (int i = 0; i < selfFields.size(); i++) {
            String field = selfFields.get(i);
            _fieldIndex.put(field, new ValuePointer(0, i, field));
        }
        _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
    }

    public TridentTuple create(List<Object> selfVals) {
        return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
    }

    @Override
    public Map<String, ValuePointer> getFieldIndex() {
        return _fieldIndex;
    }

    @Override
    public int numDelegates() {
        return 1;
    }

    @Override
    public List<String> getOutputFields() {
        return indexToFieldsList(_index);
    }
}

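FreshOutputFactory is a static nested class of TridentTupleView; its constructor mainly computes _index and _fieldIndex.
_fieldIndex is a map whose key is the field name and whose value is a ValuePointer recording the delegateIndex (always 0 here), the index, and the field name. The first field is _task_info, with index 0; the following fields are the functionFields the user declared in the window method.
The create method builds a TridentTupleView; that constructor takes an IPersistentVector as its first argument, _index as its second, and _fieldIndex as its third.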
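
A small sketch may help show what the field index buys: a field name resolves to a (delegateIndex, index) pair, which is then used to pick a value out of a list of value lists. Pointer, buildFieldIndex and getValueByField are hypothetical names for this illustration, and the two-step lookup is an assumption about how such pointers are typically resolved, not a copy of TridentTupleView.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FieldIndexSketch {
    // Hypothetical stand-in for ValuePointer.
    static final class Pointer {
        final int delegateIndex; // which value list holds the field
        final int index;         // position inside that value list
        final String field;

        Pointer(int delegateIndex, int index, String field) {
            this.delegateIndex = delegateIndex;
            this.index = index;
            this.field = field;
        }
    }

    // Mirrors the FreshOutputFactory constructor: every field lives in
    // delegate 0, at the position it has in the declared field order.
    static Map<String, Pointer> buildFieldIndex(List<String> fields) {
        Map<String, Pointer> fieldIndex = new HashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            fieldIndex.put(fields.get(i), new Pointer(0, i, fields.get(i)));
        }
        return fieldIndex;
    }

    // Assumed lookup: choose the delegate list, then the slot within it.
    static Object getValueByField(List<List<Object>> delegates,
                                  Map<String, Pointer> fieldIndex,
                                  String field) {
        Pointer p = fieldIndex.get(field);
        return delegates.get(p.delegateIndex).get(p.index);
    }

    public static void main(String[] args) {
        Map<String, Pointer> idx = buildFieldIndex(Arrays.asList("_task_info", "aggData"));
        List<List<Object>> delegates =
                Arrays.asList(Arrays.<Object>asList("trigger-info", 42));
        System.out.println(getValueByField(delegates, idx, "aggData")); // prints 42
    }
}
```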

ValuePointer
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/ValuePointer.java
public class ValuePointer {
    public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
        Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
        for (ValuePointer ptr : pointers) {
            ret.put(ptr.field, ptr);
        }
        return ret;
    }

    public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
        if (fieldsOrder.size() != pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        ValuePointer[] ret = new ValuePointer[pointers.size()];
        for (int i = 0; i < fieldsOrder.size(); i++) {
            ret[i] = pointers.get(fieldsOrder.get(i));
        }
        return ret;
    }

    public int delegateIndex;
    protected int index;
    protected String field;

    public ValuePointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}
buildIndex returns the ValuePointers as an array ordered by the given fields, which here are the selfOutputFields.
ProjectedProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/ProjectedProcessor.java
public class ProjectedProcessor implements TridentProcessor {
    Fields _projectFields;
    ProjectionFactory _factory;
    TridentContext _context;

    public ProjectedProcessor(Fields projectFields) {
        _projectFields = projectFields;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        if (tridentContext.getParentTupleFactories().size() != 1) {
            throw new RuntimeException("Projection processor can only have one parent");
        }
        _context = tridentContext;
        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        TridentTuple toEmit = _factory.create(tuple);
        for (TupleReceiver r : _context.getReceivers()) {
            r.execute(processorContext, _context.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _factory;
    }
}

ProjectedProcessor creates a ProjectionFactory in prepare. Its _projectFields are the functionFields declared in the window method. It also takes the first parent Factory via tridentContext.getParentTupleFactories().get(0); since the tuples come from FreshCollector, that factory is a TridentTupleView.FreshOutputFactory.
In execute it first calls ProjectionFactory.create to project the incoming TridentTupleView; toEmit is a TridentTupleView restricted to the functionFields declared in the window method.
execute then calls execute on each of _context.getReceivers(), passing toEmit along. These receivers are whatever processors follow the window operation, for example EachProcessor.

TridentTupleView.ProjectionFactory
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java
public static class ProjectionFactory implements Factory {
    Map<String, ValuePointer> _fieldIndex;
    ValuePointer[] _index;
    Factory _parent;

    public ProjectionFactory(Factory parent, Fields projectFields) {
        _parent = parent;
        if (projectFields == null) projectFields = new Fields();
        Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
        _fieldIndex = new HashMap<>();
        for (String f : projectFields) {
            _fieldIndex.put(f, parentFieldIndex.get(f));
        }
        _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
    }

    public TridentTuple create(TridentTuple parent) {
        if (_index.length == 0) return EMPTY_TUPLE;
        else return new TridentTupleView(((TridentTupleView) parent)._delegates, _index, _fieldIndex);
    }

    @Override
    public Map<String, ValuePointer> getFieldIndex() {
        return _fieldIndex;
    }

    @Override
    public int numDelegates() {
        return _parent.numDelegates();
    }

    @Override
    public List<String> getOutputFields() {
        return indexToFieldsList(_index);
    }
}
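ProjectionFactory is a static nested class of TridentTupleView. Its constructor builds _index and _fieldIndex from projectFields, reusing the parent's ValuePointers, so that create can build a TridentTupleView exposing only the required fields over the parent's _delegates.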
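
The point worth noticing is that projection copies no values: it only narrows the field index while the underlying value lists stay shared. The sketch below illustrates this with hypothetical stand-ins (Pointer, project, read, parentIndex) and plain Java lists in place of the _delegates vector.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ProjectionSketch {
    // Hypothetical stand-in for ValuePointer: where a field's value lives.
    static final class Pointer {
        final int delegateIndex;
        final int index;

        Pointer(int delegateIndex, int index) {
            this.delegateIndex = delegateIndex;
            this.index = index;
        }
    }

    // Keep only the requested fields, reusing the parent's pointers unchanged.
    static Map<String, Pointer> project(Map<String, Pointer> parentIndex,
                                        List<String> projectFields) {
        Map<String, Pointer> projected = new HashMap<>();
        for (String f : projectFields) {
            projected.put(f, parentIndex.get(f));
        }
        return projected;
    }

    // Resolve a field against the shared value lists.
    static Object read(List<List<Object>> delegates,
                       Map<String, Pointer> index, String field) {
        Pointer p = index.get(field);
        return delegates.get(p.delegateIndex).get(p.index);
    }

    // Field index as a fresh output factory would build it for the window output.
    static Map<String, Pointer> parentIndex() {
        Map<String, Pointer> m = new HashMap<>();
        m.put("_task_info", new Pointer(0, 0));
        m.put("aggData", new Pointer(0, 1));
        return m;
    }

    public static void main(String[] args) {
        List<List<Object>> delegates =
                Arrays.asList(Arrays.<Object>asList("trigger-info", 42));
        Map<String, Pointer> projected = project(parentIndex(), Arrays.asList("aggData"));
        // _task_info is no longer addressable, but aggData still points at slot 1.
        System.out.println(projected.containsKey("_task_info")); // prints false
        System.out.println(read(delegates, projected, "aggData")); // prints 42
    }
}
```

In this model the projected view still reads aggData from slot 1 of the original value list, so dropping _task_info costs only a smaller map.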
EachProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java
public class EachProcessor implements TridentProcessor {
    Function _function;
    TridentContext _context;
    AppendCollector _collector;
    Fields _inputFields;
    ProjectionFactory _projection;

    public EachProcessor(Fields inputFields, Function function) {
        _function = function;
        _inputFields = inputFields;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup() {
        _function.cleanup();
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _collector.getOutputFactory();
    }
}

EachProcessor's execute method first sets the _collector's context to the processorContext and the current tuple, then calls _function.execute.
It calls _projection.create(tuple) to extract the fields, based on the inputFields declared for _function.
The collector handed to _function is an AppendCollector.

AppendCollector
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java
public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;

    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }

    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for (TupleReceiver r : _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    }

    public Factory getOutputFactory() {
        return _factory;
    }
}
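AppendCollector creates an OperationOutputFactory in its constructor. Its emit method uses that factory to build the output tuple from the current input tuple plus the emitted values, then calls execute on each of _triContext.getReceivers(). If no operation follows the each, AppendCollector's _triContext.getReceivers() is empty.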
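
The "append" in AppendCollector can be pictured as adding one more value list next to the ones the input tuple already carries. The sketch below is written under that assumption; OperationOutputFactory's source is not shown in this post, so append and flatten are hypothetical helpers and not Storm's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AppendSketch {
    // Assumed model of an appending output factory: the input tuple's value
    // lists are kept as they are and the function's output is added as one
    // more list. The parent list itself is never modified.
    static List<List<Object>> append(List<List<Object>> parentDelegates,
                                     List<Object> newValues) {
        List<List<Object>> out = new ArrayList<>(parentDelegates);
        out.add(newValues);
        return out;
    }

    // Flattened view of all values, in delegate order.
    static List<Object> flatten(List<List<Object>> delegates) {
        List<Object> all = new ArrayList<>();
        for (List<Object> d : delegates) {
            all.addAll(d);
        }
        return all;
    }

    public static void main(String[] args) {
        List<List<Object>> input =
                Arrays.asList(Arrays.<Object>asList("trigger-info", 42));
        List<List<Object>> output = append(input, Arrays.<Object>asList("printed"));
        System.out.println(flatten(output)); // prints [trigger-info, 42, printed]
        System.out.println(input.size());    // prints 1
    }
}
```

In the example topology the each call declares new Fields() as its output, so a function there would have nothing to append.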
Summary

WindowTridentProcessor uses a FreshCollector. In finishBatch it pulls the pendingTriggers produced by the window from the TridentWindowManager (removing each entry from pendingTriggers as it is read); these hold the data accumulated by the window. It then emits that data through the FreshCollector: the first value is always a TriggerInfo, and the remaining values are the aggregated result for that trigger.
FreshCollector's emit method first uses TridentTupleView.FreshOutputFactory to build a TridentTupleView from the selfOutputFields (the first field is always _task_info; the remaining fields are the functionFields the user declared in the window method), then calls execute on each of _triContext.getReceivers().
Among the downstream receivers is a ProjectedProcessor, which re-extracts a TridentTupleView containing only the functionFields declared in the window method. Its execute method works much like FreshCollector.emit: it builds a TridentTupleView with the required fields, then calls execute on each of its context's receivers (for example EachProcessor.execute).
EachProcessor uses an AppendCollector, whose emit method is also similar to FreshCollector's: it builds a TridentTupleView through its factory, then calls execute on each of _triContext.getReceivers().
FreshCollector.emit, ProjectedProcessor.execute and AppendCollector.emit all follow the same pattern: use a Factory to build a TridentTupleView with the required fields, then call execute on each receiver of the context. Once a context has no receivers, the tuple stops propagating.

doc
Windowing Support in Core Storm
