乐趣区

[case44]聊聊storm trident的operations


本文主要研究一下 storm trident 的 operations
function filter projection
Function
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java
public interface Function extends EachOperation {
/**
* Performs the function logic on an individual tuple and emits 0 or more tuples.
*
* @param tuple The incoming tuple
* @param collector A collector instance that can be used to emit tuples
*/
void execute(TridentTuple tuple, TridentCollector collector);
}
Function 定义了 execute 方法,它发射的字段会追加到 input tuple 中
Filter
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Filter.java
public interface Filter extends EachOperation {

/**
* Determines if a tuple should be filtered out of a stream
*
* @param tuple the tuple being evaluated
* @return `false` to drop the tuple, `true` to keep the tuple
*/
boolean isKeep(TridentTuple tuple);
}
Filter 提供一个 isKeep 方法,用来决定该 tuple 是否输出
projection
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
*
* For example, if you had a Stream `mystream` containing the fields `[“a”, “b”, “c”,”d”]`, calling”
*
* “`java
* mystream.project(new Fields(“b”, “d”))
* “`
*
* would produce a stream containing only the fields `[“b”, “d”]`.
*
*
* @param keepFields The fields in the Stream to keep
* @return
*/
public Stream project(Fields keepFields) {
projectionValidation(keepFields);
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
}
这里使用了 ProjectedProcessor 来进行 projection 操作
repartitioning operations
partition
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* @param partitioner
* @return
*/
public Stream partition(CustomStreamGrouping partitioner) {
return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
}
这里使用了 CustomStreamGrouping
partitionBy
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* @param fields
* @return
*/
public Stream partitionBy(Fields fields) {
projectionValidation(fields);
return partition(Grouping.fields(fields.toList()));
}
这里使用 Grouping.fields
identityPartition
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* @return
*/
public Stream identityPartition() {
return partition(new IdentityGrouping());
}
这里使用 IdentityGrouping
shuffle
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* Use random round robin algorithm to evenly redistribute tuples across all target partitions
*
* @return
*/
public Stream shuffle() {
return partition(Grouping.shuffle(new NullStruct()));
}
这里使用 Grouping.shuffle
localOrShuffle
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
* for local tasks.
*
* @return
*/
public Stream localOrShuffle() {
return partition(Grouping.local_or_shuffle(new NullStruct()));
}
这里使用 Grouping.local_or_shuffle
global
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
* @return
*/
public Stream global() {
// use this instead of storm’s built in one so that we can specify a singleemitbatchtopartition
// without knowledge of storm’s internals
return partition(new GlobalGrouping());
}
这里使用 GlobalGrouping
batchGlobal
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
* partitions.
*
* @return
*/
public Stream batchGlobal() {
// the first field is the batch id
return partition(new IndexHashGrouping(0));
}
这里使用 IndexHashGrouping,是对整个 batch 维度的 repartition
broadcast
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Repartitioning Operation
*
* Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do
* a stateQuery on every partition of data.
*
* @return
*/
public Stream broadcast() {
return partition(Grouping.all(new NullStruct()));
}
这里使用 Grouping.all
groupBy
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
* ## Grouping Operation
*
* @param fields
* @return
*/
public GroupedStream groupBy(Fields fields) {
projectionValidation(fields);
return new GroupedStream(this, fields);
}
这里返回的是 GroupedStream
aggregators
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
//partition aggregate
public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}

public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}

public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
}

public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
return partitionAggregate(null, agg, functionFields);
}

public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
}

//aggregate
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}

public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}

public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
}

//persistent aggregate
public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
}

public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(spec, null, agg, functionFields);
}

public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}

public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
// replaces normal aggregation here with a global grouping because it needs to be consistent across batches
return new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
.aggregate(inputFields, agg, functionFields)
.chainEnd()
.partitionPersist(spec, functionFields, new CombinerAggStateUpdater(agg), functionFields);
}

public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
}

public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
return persistentAggregate(spec, null, agg, functionFields);
}

public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}

public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
projectionValidation(inputFields);
return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
}

trident 的 aggregators 主要分为三类,分别是 partitionAggregate、aggregate、persistentAggregate;aggregator 操作会改变输出
partitionAggregate 其作用的粒度为每个 partition,而非整个 batch
aggregrate 操作作用的粒度为 batch,对每个 batch,它先使用 global 操作将该 batch 的 tuple 从所有 partition 合并到一个 partition,最后再对 batch 进行 aggregation 操作;这里提供了三类参数,分别是 Aggregator、CombinerAggregator、ReducerAggregator;调用 stream.aggregrate 方法时,相当于一次 global aggregation,此时使用 Aggregator 或 ReducerAggregator 时,stream 会先将 tuple 划分到一个 partition,然后再进行 aggregate 操作;而使用 CombinerAggregator 时,trident 会进行优化,先对每个 partition 进行局部的 aggregate 操作,然后再划分到一个 partition,最后再进行 aggregate 操作,因而相对 Aggregator 或 ReducerAggregator 可以节省网络传输耗时
persistentAggregate 操作会对 stream 上所有 batch 的 tuple 进行 aggretation,然后将结果存储在 state 中

Aggregator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Aggregator.java
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T val, TridentTuple tuple, TridentCollector collector);
void complete(T val, TridentCollector collector);
}

Aggregator 首先会调用 init 进行初始化,然后通过参数传递给 aggregate 以及 complete 方法
对于 batch partition 中的每个 tuple 执行一次 aggregate;当 batch partition 中的 tuple 执行完 aggregate 之后执行 complete 方法
假设自定义 Aggregator 为累加操作,那么对于 [4]、[7]、[8] 这批 tuple,init 为 0,对于[4],val=0,0+4=4;对于[7],val=4,4+7=11;对于[8],val=11,11+8=19;然后 batch 结束,val=19,此时执行 complete,可以使用 collector 发射数据

CombinerAggregator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/CombinerAggregator.java
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}

CombinerAggregator 每收到一个 tuple,就调用 init 获取当前 tuple 的值,调用 combine 操作使用前一个 combine 的结果 (没有的话取 zero 的值) 与 init 取得的值进行新的 combine 操作,如果该 partition 中没有 tuple,则返回 zero 方法的值
假设 combine 为累加操作,zero 返回 0,那么对于 [4]、[7]、[8] 这批 tuple,init 值分别是 4、7、8,对于[4],没有前一个 combine 结果,于是 val1=0,val2=4,combine 结果为 4;对于[7],val1=4,val2=7,combine 结果为 11;对于[8],val1 为 11,val2 为 8,combine 结果为 19
CombinerAggregator 操作的网络开销相对较低,因此性能比其他两类 aggratator 好

ReducerAggregator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/ReducerAggregator.java
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator 在对一批 tuple 进行计算时,先调用一次 init 获取初始值,然后再执行 reduce 操作,curr 值为前一次 reduce 操作的值,没有的话,就是 init 值
假设 reduce 为累加操作,init 返回 0,那么对于 [4]、[7]、[8] 这批 tuple,对于[4],init 为 0,然后 curr=0,先是 0 +4=4;对于[7],curr 为 4,就是 4 +7=11;对于[8],curr 为 11,最后就是 11+8=19

topology stream operations
join
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);
}

public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
return join(streams, joinFields, outFields, JoinType.INNER);
}

public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);
}

public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
return join(streams, joinFields, outFields, repeat(streams.size(), type));
}

public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);

}

public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
}

public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
}

public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
return join(streams, joinFields, outFields, JoinType.INNER, mode);
}

public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
}

public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
}

public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);

}

public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
switch (mode) {
case COMPACT:
return multiReduce(strippedInputFields(streams, joinFields),
groupedStreams(streams, joinFields),
new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
outFields);

case PRESERVE:
return multiReduce(strippedInputFields(streams, joinFields),
groupedStreams(streams, joinFields),
new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
outFields);

default:
throw new IllegalArgumentException(“Unsupported out-fields mode: ” + mode);
}
}
可以看到 join 最后调用了 multiReduce,对于 COMPACT 类型使用的 GroupedMultiReducer 是 JoinerMultiReducer,对于 PRESERVE 类型使用的 GroupedMultiReducer 是 PreservingFieldsOrderJoinerMultiReducer
merge
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public Stream merge(Fields outputFields, Stream… streams) {
return merge(outputFields, Arrays.asList(streams));
}

public Stream merge(Stream… streams) {
return merge(Arrays.asList(streams));
}

public Stream merge(List<Stream> streams) {
return merge(streams.get(0).getOutputFields(), streams);
}

public Stream merge(Fields outputFields, List<Stream> streams) {
return multiReduce(streams, new IdentityMultiReducer(), outputFields);
}
可以看到 merge 最后是调用了 multiReduce,使用的 MultiReducer 是 IdentityMultiReducer
multiReduce
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(s1, s2), function, outputFields);
}

public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
}

public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(s1, s2), function, outputFields);
}

public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
}

public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
}

public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
}

public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
List<Fields> fullInputFields = new ArrayList<>();
List<Stream> streams = new ArrayList<>();
List<Fields> fullGroupFields = new ArrayList<>();
for(int i=0; i<groupedStreams.size(); i++) {
GroupedStream gs = groupedStreams.get(i);
Fields groupFields = gs.getGroupFields();
fullGroupFields.add(groupFields);
streams.add(gs.toStream().partitionBy(groupFields));
fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));

}
return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
}

public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
List<String> names = new ArrayList<>();
for(Stream s: streams) {
if(s._name!=null) {
names.add(s._name);
}
}
Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, “-“), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
return addSourcedNode(streams, n);
}
multiReduce 方法有个 MultiReducer 参数,join 与 merge 虽然都调用了 multiReduce,但是他们传的 MultiReducer 值不一样
小结

trident 的操作主要有几类,一类是基本的 function、filter、projection 操作;一类是 repartitioning 操作,主要是一些 grouping;一类是 aggregate 操作,包括 aggregate、partitionAggregate、persistentAggregate;一类是在 topology 对 stream 的 join、merge 操作
function 的话,若有 emit 字段会追加到原始的 tuple 上;filter 用于过滤 tuple;projection 用于提取字段
repartitioning 操作有 Grouping.local_or_shuffle、Grouping.shuffle、Grouping.all、GlobalGrouping、CustomStreamGrouping、IdentityGrouping、IndexHashGrouping 等;partition 操作可以理解为将输入的 tuple 分配到 task 上,也可以理解为是对 stream 进行 grouping
aggregate 操作的话,普通的 aggregate 操作有 3 类接口,分别是 Aggregator、CombinerAggregator、ReducerAggregator,其中 Aggregator 是最为通用的,它继承了 Operation 接口,而且在方法参数里头可以使用到 collector,这是 CombinerAggregator 与 ReducerAggregator 所没有的;而 CombinerAggregator 与 Aggregator 及 ReducerAggregator 不同的是,调用 stream.aggregrate 方法时,trident 会优先在 partition 进行局部聚合,然后再归一到一个 partition 做最后聚合,相对来说比较节省网络传输耗时,但是如果将 CombinerAggregator 与非 CombinerAggregator 的进行 chaining 的话,就享受不到这个优化;partitionAggregate 主要是在 partition 维度上进行操作;而 persistentAggregate 则是在整个 stream 的维度上对所有 batch 的 tuple 进行操作,结果持久化在 state 上
对于 stream 的 join 及 merge 操作,其最后都是依赖 multiReduce 来实现,只是传递的 MultiReducer 值不一样;join 的话 join 的话需要字段来进行匹配(字段名可以不一样),可以选择 JoinType,是 INNER 还是 OUTER,不过 join 是对于 spout 的 small batch 来进行 join 的;merge 的话,就是纯粹的几个 stream 进行 tuple 的归总。

doc

Trident API Overview
Trident API 综述
JStorm Trident 入门精华一页纸
Everything You Need to Know about Apache Storm
batch and partition – differences

退出移动版