A look at the aggregation operations of Flink KeyedStream



This article takes a look at the aggregation operations of Flink's KeyedStream.
Example
@Test
public void testMax() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    WordCount[] data = new WordCount[]{new WordCount(1, "Hello", 1), new WordCount(1, "World", 3), new WordCount(2, "Hello", 1)};
    env.fromElements(data)
        .keyBy("word")
        .max("frequency")
        .addSink(new SinkFunction<WordCount>() {
            @Override
            public void invoke(WordCount value, Context context) throws Exception {
                LOGGER.info("value:{}", value);
            }
        });
    env.execute("testMax");
}
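Here the stream is first keyed by the word field, and KeyedStream's max method then tracks the maximum frequency per key. Note that, as the reduce method listed later in this article shows, max only overwrites the frequency field of the element already held for that key; to get the whole WordCount that carries the largest frequency, use maxBy instead.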
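To make the behaviour of the example concrete, here is a minimal plain-Java sketch (no Flink dependency) of what a keyed rolling max would produce for the three input elements. It rests on two assumptions that are not stated in the article: that the keyed reduce operator emits one updated result per incoming element, and that the first constructor argument of WordCount is a field called id. The class RollingMaxSketch and that field name are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (no Flink) of keyBy("word").max("frequency"). */
class RollingMaxSketch {

    /** Hypothetical stand-in for the article's WordCount POJO; the field name "id" is assumed. */
    static final class WordCount {
        int id;
        String word;
        int frequency;

        WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount(" + id + ", " + word + ", " + frequency + ")";
        }
    }

    /** Keeps one aggregate per key and records one result per input element. */
    static List<String> rollingMax(List<WordCount> input) {
        Map<String, WordCount> statePerKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (WordCount next : input) {
            WordCount current = statePerKey.get(next.word);
            if (current == null) {
                // First element of a key: store a copy and emit it unchanged.
                current = new WordCount(next.id, next.word, next.frequency);
                statePerKey.put(next.word, current);
            } else if (next.frequency >= current.frequency) {
                // max() only overwrites the aggregated field of the stored element.
                current.frequency = next.frequency;
            }
            emitted.add(current.toString());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<WordCount> data = Arrays.asList(
                new WordCount(1, "Hello", 1),
                new WordCount(1, "World", 3),
                new WordCount(2, "Hello", 1));
        rollingMax(data).forEach(System.out::println);
    }
}
```

Under these assumptions the third result still carries id 1, because the stored element is kept and only its frequency is overwritten. The sketch processes elements sequentially; a real job may interleave keys differently.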
KeyedStream.aggregate
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}

public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}

public SingleOutputStreamOperator<T> max(int positionToMax) {
return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
getExecutionConfig()));
}

public SingleOutputStreamOperator<T> max(String field) {
return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
false, getExecutionConfig()));
}

public SingleOutputStreamOperator<T> min(int positionToMin) {
return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
getExecutionConfig()));
}

public SingleOutputStreamOperator<T> min(String field) {
return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
false, getExecutionConfig()));
}

public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}

public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}

public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
getExecutionConfig()));
}

public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
first, getExecutionConfig()));
}

public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}

public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true);
}

public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
getExecutionConfig()));
}

public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
first, getExecutionConfig()));
}

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
clean(aggregate), getType().createSerializer(getExecutionConfig()));
return transform("Keyed Aggregation", getType(), operator);
}

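KeyedStream's aggregate method is protected; sum, max, min, maxBy and minBy all delegate to it. sum passes a SumAggregator, while the other four pass a ComparableAggregator and differ only in the AggregationType (and, for maxBy and minBy, the first flag) they hand to it.
sum, max and min each have two overloads, one taking an int position and one taking a String field name. maxBy and minBy have four, because each of those two variants also exists with an extra boolean parameter.
That extra boolean parameter of maxBy and minBy (first) decides which element is returned when several elements compare as equal: the first one seen if it is true, the last one otherwise. The overloads without the boolean default it to true.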
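The effect of the first flag is easiest to see on a tie. The following is a small plain-Java sketch, not Flink code: the Event class and the maxByScore helper are hypothetical names introduced only for illustration, and the fold simply imitates a rolling maxBy over one key.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of how the `first` flag of maxBy/minBy resolves ties. */
class FirstFlagSketch {

    /** Hypothetical event type: `label` tells elements apart, `score` is the compared field. */
    static final class Event {
        final String label;
        final int score;

        Event(String label, int score) {
            this.label = label;
            this.score = score;
        }
    }

    /** Folds the events the way a rolling maxBy would and returns the winner's label. */
    static String maxByScore(List<Event> events, boolean first) {
        Event best = events.get(0);
        for (Event next : events.subList(1, events.size())) {
            int cmp = Integer.compare(best.score, next.score);
            if (cmp < 0) {
                best = next;   // a strictly larger score always wins
            } else if (cmp == 0 && !first) {
                best = next;   // on a tie, first == false prefers the later element
            }
        }
        return best.label;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 3), new Event("b", 7), new Event("c", 7));
        System.out.println(maxByScore(events, true));
        System.out.println(maxByScore(events, false));
    }
}
```

With "b" and "c" tied on the highest score, first = true keeps "b" (seen earlier) and first = false switches to "c".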

ComparableAggregator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {

private static final long serialVersionUID = 1L;

private Comparator comparator;
private boolean byAggregate;
private boolean first;
private final FieldAccessor<T, Object> fieldAccessor;

private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
this.comparator = Comparator.getForAggregation(aggregationType);
this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
this.first = first;
this.fieldAccessor = fieldAccessor;
}

public ComparableAggregator(int positionToAggregate,
TypeInformation<T> typeInfo,
AggregationType aggregationType,
ExecutionConfig config) {
this(positionToAggregate, typeInfo, aggregationType, false, config);
}

public ComparableAggregator(int positionToAggregate,
TypeInformation<T> typeInfo,
AggregationType aggregationType,
boolean first,
ExecutionConfig config) {
this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
}

public ComparableAggregator(String field,
TypeInformation<T> typeInfo,
AggregationType aggregationType,
boolean first,
ExecutionConfig config) {
this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
}

@SuppressWarnings("unchecked")
@Override
public T reduce(T value1, T value2) throws Exception {
Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
Object o2 = fieldAccessor.get(value2);

int c = comparator.isExtremal(o1, o2);

if (byAggregate) {
// if they are the same we choose based on whether we want to first or last
// element with the min/max.
if (c == 0) {
return first ? value1 : value2;
}

return c == 1 ? value1 : value2;

} else {
if (c == 0) {
value1 = fieldAccessor.set(value1, o2);
}
return value1;
}
}
}
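ComparableAggregator extends AggregationFunction, which in turn implements the ReduceFunction interface. Its reduce method first compares the aggregated field of the two elements through the Comparator and then branches on byAggregate. If byAggregate is true (MAXBY or MINBY) and the comparison result is 0, it returns value1 when first is true and value2 otherwise; if the result is non-zero, it returns value1 when the result is 1 (value1 is the more extreme one) and value2 otherwise. If byAggregate is false (MAX or MIN) and the comparison result is 0, meaning value1's field is not strictly more extreme than value2's, the FieldAccessor writes value2's field value into value1; in either case value1 is what gets returned.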
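The practical consequence of the two branches is that max returns the first element with only the aggregated field replaced, while maxBy returns one of the two elements as a whole. A minimal sketch of that contrast, mirroring the branches of the listing above in plain Java; the Item class is hypothetical and direct field access stands in for the FieldAccessor:

```java
/** Plain-Java sketch contrasting the MAX and MAXBY branches of reduce. */
class ReduceModesSketch {

    /** Hypothetical record: `tag` is untouched payload, `value` is the aggregated field. */
    static final class Item {
        String tag;
        int value;

        Item(String tag, int value) {
            this.tag = tag;
            this.value = value;
        }

        @Override
        public String toString() {
            return tag + ":" + value;
        }
    }

    /** Mirrors the non-"by" branch for MAX: only the field moves, value1 is returned. */
    static Item max(Item value1, Item value2) {
        int c = value1.value > value2.value ? 1 : 0;   // like MaxComparator: 1 or 0 only
        if (c == 0) {
            value1.value = value2.value;               // stands in for fieldAccessor.set(value1, o2)
        }
        return value1;
    }

    /** Mirrors the "by" branch for MAXBY: one of the two elements is returned whole. */
    static Item maxBy(Item value1, Item value2, boolean first) {
        int c = Integer.signum(Integer.compare(value1.value, value2.value)); // 1, 0 or -1
        if (c == 0) {
            return first ? value1 : value2;
        }
        return c == 1 ? value1 : value2;
    }

    public static void main(String[] args) {
        System.out.println(max(new Item("old", 1), new Item("new", 5)));
        System.out.println(maxBy(new Item("old", 1), new Item("new", 5), true));
    }
}
```

For the same pair of inputs, max yields old:5 (the old element with the new field value) whereas maxBy yields new:5 (the new element itself).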
AggregationFunction
@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
private static final long serialVersionUID = 1L;

/**
* Aggregation types that can be used on a windowed stream or keyed stream.
*/
public enum AggregationType {
SUM, MIN, MAX, MINBY, MAXBY,
}
}
AggregationFunction declares that it implements ReduceFunction and defines the five AggregationType values: SUM, MIN, MAX, MINBY and MAXBY.
Comparator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
@Internal
public abstract class Comparator implements Serializable {

private static final long serialVersionUID = 1L;

public abstract <R> int isExtremal(Comparable<R> o1, R o2);

public static Comparator getForAggregation(AggregationType type) {
switch (type) {
case MAX:
return new MaxComparator();
case MIN:
return new MinComparator();
case MINBY:
return new MinByComparator();
case MAXBY:
return new MaxByComparator();
default:
throw new IllegalArgumentException("Unsupported aggregation type.");
}
}

private static class MaxComparator extends Comparator {

private static final long serialVersionUID = 1L;

@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
return o1.compareTo(o2) > 0 ? 1 : 0;
}

}

private static class MaxByComparator extends Comparator {

private static final long serialVersionUID = 1L;

@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
int c = o1.compareTo(o2);
if (c > 0) {
return 1;
}
if (c == 0) {
return 0;
} else {
return -1;
}
}

}

private static class MinByComparator extends Comparator {

private static final long serialVersionUID = 1L;

@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
int c = o1.compareTo(o2);
if (c < 0) {
return 1;
}
if (c == 0) {
return 0;
} else {
return -1;
}
}

}

private static class MinComparator extends Comparator {

private static final long serialVersionUID = 1L;

@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
return o1.compareTo(o2) < 0 ? 1 : 0;
}

}
}

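Comparator implements Serializable, declares the abstract isExtremal method, and provides the getForAggregation factory method, which creates the Comparator matching a given AggregationType. SUM has no matching Comparator and falls into the default branch, which throws IllegalArgumentException.
Comparator defines four nested subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, each of which implements isExtremal.
MaxComparator relies directly on the compareTo method of the Comparable interface but returns only 0 or 1: 1 when o1.compareTo(o2) is greater than 0, and 0 otherwise. MaxByComparator also starts from compareTo but has three possible results: 1 when o1 is greater, 0 when the two are equal, and -1 when o1 is smaller. MinComparator and MinByComparator mirror these two with the comparison reversed.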
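The return values of the four comparators can be tabulated with a few lines of plain Java. This is a sketch that restates the isExtremal bodies from the listing above for int arguments; the class and method names are made up for illustration and are not part of Flink.

```java
/** Plain-Java restatement of the four isExtremal variants, specialised to int. */
class IsExtremalSketch {

    /** Like MaxComparator: 1 only if o1 is strictly greater, otherwise 0. */
    static int max(int o1, int o2) {
        return Integer.compare(o1, o2) > 0 ? 1 : 0;
    }

    /** Like MinComparator: 1 only if o1 is strictly smaller, otherwise 0. */
    static int min(int o1, int o2) {
        return Integer.compare(o1, o2) < 0 ? 1 : 0;
    }

    /** Like MaxByComparator: 1, 0 or -1 for greater, equal, smaller. */
    static int maxBy(int o1, int o2) {
        return Integer.signum(Integer.compare(o1, o2));
    }

    /** Like MinByComparator: 1, 0 or -1 for smaller, equal, greater. */
    static int minBy(int o1, int o2) {
        return -Integer.signum(Integer.compare(o1, o2));
    }

    public static void main(String[] args) {
        int[][] pairs = {{2, 1}, {1, 1}, {1, 2}};
        System.out.println("o1 o2 | max min maxBy minBy");
        for (int[] p : pairs) {
            System.out.println(String.format("%2d %2d | %3d %3d %5d %5d",
                    p[0], p[1],
                    max(p[0], p[1]), min(p[0], p[1]),
                    maxBy(p[0], p[1]), minBy(p[0], p[1])));
        }
    }
}
```

The two-valued variants cannot tell "equal" from "less extreme", which is fine for max and min because both cases lead to the same action (copy the field from the second element). The three-valued variants need the distinction so that a tie can be resolved by the first flag.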

Summary

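The aggregation operations of KeyedStream are sum, max, min, maxBy and minBy. Internally they all call the protected aggregate method; sum passes a SumAggregator, and the others pass a ComparableAggregator that differs only in its AggregationType.
ComparableAggregator extends AggregationFunction, which implements the ReduceFunction interface. Its reduce method compares the two elements through the Comparator and then branches on byAggregate. For byAggregate operations, a comparison result of 0 returns the first-seen element if first is true and the last-seen one otherwise, and a non-zero result returns whichever element is more extreme. For the other operations, a comparison result of 0 causes value2's field value to be written into value1 through the FieldAccessor, and value1 is returned in every case.
Comparator defines four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, each implementing isExtremal. MaxComparator returns 1 for "greater" and 0 for "less than or equal", whereas MaxByComparator is finer grained and returns 1 for "greater", 0 for "equal" and -1 for "less". This difference shows up in ComparableAggregator's reduce method: maxBy and minBy take an extra first (boolean) parameter that is used specifically to choose which element to return when the comparison result is 0, while the non-byAggregate operations always return value1, updating its field through the FieldAccessor beforehand whenever value1's field is not strictly more extreme than value2's.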

doc
DataStream Transformations
