Preface
This article examines the window coGroup operation on Flink's DataStream.
Example
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new CoGroupFunction() {...});
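This shows the basic usage of the window coGroup operation on a DataStream. In the version examined here, where and equalTo are declared with KeySelector parameters (see the CoGroupedStreams and Where sources below), so the 0 and 1 in this snippet stand in for key selectors rather than literal arguments.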
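To make the window in the snippet concrete: a tumbling window of 3 seconds with no offset places each element in the interval whose start is the element's timestamp rounded down to a multiple of the window size. The sketch below is plain Java rather than Flink code; the class and method names are hypothetical, and it assumes non-negative millisecond timestamps and an offset of zero.

```java
// Hypothetical helper, not part of Flink: models how a tumbling window of a
// fixed size (offset 0) buckets non-negative event timestamps.
final class TumblingWindowSketch {

    // Start (inclusive) of the window containing the timestamp, in milliseconds.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // End (exclusive) of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 3000L; // Time.seconds(3) expressed in milliseconds
        for (long ts : new long[] {0L, 2999L, 3000L, 4500L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Elements from both inputs that share a key and fall into the same interval are the ones handed together to a single coGroup call.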
DataStream.coGroup
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public
public class DataStream<T> {
//……
public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
return new CoGroupedStreams<>(this, otherStream);
}
//……
}
DataStream's coGroup operation creates a CoGroupedStreams.
CoGroupedStreams
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public
public class CoGroupedStreams<T1, T2> {
private final DataStream<T1> input1;
private final DataStream<T2> input2;
public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
this.input1 = requireNonNull(input1);
this.input2 = requireNonNull(input2);
}
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
Preconditions.checkNotNull(keySelector);
final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
return where(keySelector, keyType);
}
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
Preconditions.checkNotNull(keySelector);
Preconditions.checkNotNull(keyType);
return new Where<>(input1.clean(keySelector), keyType);
}
//…….
}
CoGroupedStreams provides the where operation, which specifies the KeySelector for input1; it creates and returns a Where object.
Where
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public
public class Where<KEY> {
private final KeySelector<T1, KEY> keySelector1;
private final TypeInformation<KEY> keyType;
Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
this.keyType = keyType;
}
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
Preconditions.checkNotNull(keySelector);
final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
return equalTo(keySelector, otherKey);
}
public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
Preconditions.checkNotNull(keySelector);
Preconditions.checkNotNull(keyType);
if (!keyType.equals(this.keyType)) {
throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
"first key = " + this.keyType + " , second key = " + keyType);
}
return new EqualTo(input2.clean(keySelector));
}
//……
}
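The Where object provides the equalTo operation, which specifies the KeySelector for input2 and checks that its key type equals the key type of the first input (throwing IllegalArgumentException otherwise); it creates and returns an EqualTo object.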
EqualTo
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public
public class EqualTo {
private final KeySelector<T2, KEY> keySelector2;
EqualTo(KeySelector<T2, KEY> keySelector2) {
this.keySelector2 = requireNonNull(keySelector2);
}
@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
}
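The EqualTo object provides the window operation, which creates and returns a WithWindow object; trigger, evictor, and allowedLateness are initially null.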
WithWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public
public static class WithWindow<T1, T2, KEY, W extends Window> {
private final DataStream<T1> input1;
private final DataStream<T2> input2;
private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;
private final TypeInformation<KEY> keyType;
private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
private final Time allowedLateness;
private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
protected WithWindow(DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
Time allowedLateness) {
this.input1 = input1;
this.input2 = input2;
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
this.keyType = keyType;
this.windowAssigner = windowAssigner;
this.trigger = trigger;
this.evictor = evictor;
this.allowedLateness = allowedLateness;
}
@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, newTrigger, evictor, allowedLateness);
}
@PublicEvolving
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, newEvictor, allowedLateness);
}
@PublicEvolving
public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, evictor, newLateness);
}
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
function,
input1.getType(),
input2.getType(),
"CoGroup",
false);
return apply(function, resultType);
}
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) {
return (SingleOutputStreamOperator<T>) apply(function);
}
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
.map(new Input1Tagger<T1, T2>())
.setParallelism(input1.getParallelism())
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
.setParallelism(input2.getParallelism())
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
// we explicitly create the keyed stream to manually pass the key type information in
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);
if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}
return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}
@VisibleForTesting
Time getAllowedLateness() {
return allowedLateness;
}
@VisibleForTesting
WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
return windowedStream;
}
}
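WithWindow holds the windowAssigner, trigger, evictor, and allowedLateness settings; the trigger, evictor, and allowedLateness methods each return a new WithWindow rather than modifying the current one. It provides the apply operation (the with operation is marked as deprecated).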
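The way trigger, evictor, and allowedLateness each build a fresh WithWindow can be modeled in a few lines. This is a simplified stand-in, not Flink code: the class name WindowConfigSketch is hypothetical, and the settings are represented as plain strings and a Long purely for illustration.

```java
// Hypothetical, simplified stand-in for WithWindow (not Flink code): settings
// are plain values here, and each "setter" returns a new instance.
final class WindowConfigSketch {
    final String assigner;
    final String trigger;          // null: not set
    final String evictor;          // null: not set
    final Long allowedLatenessMs;  // null: not set

    WindowConfigSketch(String assigner, String trigger, String evictor, Long allowedLatenessMs) {
        this.assigner = assigner;
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Copies every field and replaces only the trigger.
    WindowConfigSketch trigger(String newTrigger) {
        return new WindowConfigSketch(assigner, newTrigger, evictor, allowedLatenessMs);
    }

    // Copies every field and replaces only the evictor.
    WindowConfigSketch evictor(String newEvictor) {
        return new WindowConfigSketch(assigner, trigger, newEvictor, allowedLatenessMs);
    }

    // Copies every field and replaces only the allowed lateness.
    WindowConfigSketch allowedLateness(long newLatenessMs) {
        return new WindowConfigSketch(assigner, trigger, evictor, newLatenessMs);
    }

    public static void main(String[] args) {
        WindowConfigSketch base = new WindowConfigSketch("tumbling-3s", null, null, null);
        WindowConfigSketch tuned = base.trigger("count-10").allowedLateness(500L);
        System.out.println(base.trigger);   // the receiver keeps its old value
        System.out.println(tuned.trigger + " " + tuned.allowedLatenessMs);
    }
}
```

One practical consequence of this style is that the calls have to be chained or their results kept; calling a setter and discarding the returned object leaves the original configuration as it was.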
The apply operation takes a CoGroupFunction. Internally it first creates a UnionKeySelector from the two keySelectors, then maps the two input streams with Input1Tagger and Input2Tagger respectively into streams of TaggedUnion objects, and calls taggedInput1.union(taggedInput2) to obtain unionStream. It then uses the UnionKeySelector to turn unionStream into a KeyedStream, applies the window operation to that KeyedStream, and passes along the configured windowAssigner, trigger, evictor, and allowedLateness. Finally it wraps the user-defined CoGroupFunction in a CoGroupWindowFunction and calls windowedStream.apply.
The WindowedStream built inside apply therefore has TaggedUnion as its element type. The KeyedStream it is built on uses UnionKeySelector as its KeySelector and is based on a DataStream of TaggedUnion obtained from taggedInput1.union(taggedInput2); taggedInput1 and taggedInput2 in turn come from map operations on the original input streams, with Input1Tagger and Input2Tagger as the MapFunctions.
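The steps described above (tag, union, key, group, split, call the user function) can be modeled end to end on in-memory lists. This is a rough sketch under stated assumptions, not Flink's implementation: all names are hypothetical, the whole input is treated as a single window, and the user function returns a list instead of writing to a Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of the apply pipeline; one implicit window.
final class CoGroupPipelineSketch {

    // Stand-in for TaggedUnion: exactly one of the two fields is non-null.
    static final class Tagged<A, B> {
        final A one;
        final B two;
        Tagged(A one, B two) { this.one = one; this.two = two; }
    }

    // Stand-in for CoGroupFunction that returns its output as a list.
    interface CoGroup<A, B, R> {
        List<R> coGroup(List<A> first, List<B> second);
    }

    static <A, B, K, R> Map<K, List<R>> coGroup(List<A> input1, List<B> input2,
            Function<A, K> key1, Function<B, K> key2, CoGroup<A, B, R> fn) {
        // Tag both inputs so they share one element type, then union them.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : input1) { union.add(new Tagged<A, B>(a, null)); }
        for (B b : input2) { union.add(new Tagged<A, B>(null, b)); }

        // Key the union, picking the selector that matches the tag.
        Map<K, List<Tagged<A, B>>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K k = (t.one != null) ? key1.apply(t.one) : key2.apply(t.two);
            byKey.computeIfAbsent(k, x -> new ArrayList<>()).add(t);
        }

        // Per key: split back into two lists and call the user function.
        Map<K, List<R>> out = new LinkedHashMap<>();
        for (Map.Entry<K, List<Tagged<A, B>>> e : byKey.entrySet()) {
            List<A> ones = new ArrayList<>();
            List<B> twos = new ArrayList<>();
            for (Tagged<A, B> t : e.getValue()) {
                if (t.one != null) { ones.add(t.one); } else { twos.add(t.two); }
            }
            out.put(e.getKey(), fn.coGroup(ones, twos));
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, String> key1 = s -> s.substring(0, 1);
        Function<Integer, String> key2 = i -> i < 15 ? "a" : "c";
        CoGroup<String, Integer, String> sizes =
                (first, second) -> Collections.singletonList(first.size() + "/" + second.size());
        System.out.println(coGroup(Arrays.asList("a1", "b1", "a2"),
                Arrays.asList(10, 20), key1, key2, sizes));
    }
}
```

Note that a key present in only one input still reaches the user function, with the other list empty; that is the property that distinguishes coGroup from a plain join.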
CoGroupFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
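CoGroupFunction extends Function and Serializable. It defines the coGroup method, which receives two Iterable collections of elements, one per input, plus a Collector for the output.
Input1Tagger and Input2Tagger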
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;
@Override
public TaggedUnion<T1, T2> map(T1 value) throws Exception {
return TaggedUnion.one(value);
}
}
private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;
@Override
public TaggedUnion<T1, T2> map(T2 value) throws Exception {
return TaggedUnion.two(value);
}
}
Input1Tagger and Input2Tagger implement MapFunction; their map methods return a TaggedUnion, created with TaggedUnion.one and TaggedUnion.two respectively.
TaggedUnion
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Internal
public static class TaggedUnion<T1, T2> {
private final T1 one;
private final T2 two;
private TaggedUnion(T1 one, T2 two) {
this.one = one;
this.two = two;
}
public boolean isOne() {
return one != null;
}
public boolean isTwo() {
return two != null;
}
public T1 getOne() {
return one;
}
public T2 getTwo() {
return two;
}
public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
return new TaggedUnion<>(one, null);
}
public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
return new TaggedUnion<>(null, two);
}
}
TaggedUnion has two fields, one and two, and provides two static factory methods, one and two. In a TaggedUnion object either one is null or two is null; the two never hold values at the same time.
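A standalone model of this class makes the tagging behaviour easy to try out. The sketch below mirrors the structure of the listing above in plain Java; the name TaggedSketch is hypothetical and the class is not a substitute for Flink's internal TaggedUnion.

```java
// Hypothetical plain-Java model of TaggedUnion: the "tag" is not stored
// separately but derived from which of the two fields is non-null.
final class TaggedSketch<A, B> {
    private final A one;
    private final B two;

    private TaggedSketch(A one, B two) {
        this.one = one;
        this.two = two;
    }

    // Factory for an element coming from the first input.
    static <A, B> TaggedSketch<A, B> one(A value) {
        return new TaggedSketch<>(value, null);
    }

    // Factory for an element coming from the second input.
    static <A, B> TaggedSketch<A, B> two(B value) {
        return new TaggedSketch<>(null, value);
    }

    boolean isOne() { return one != null; }
    boolean isTwo() { return two != null; }
    A getOne() { return one; }
    B getTwo() { return two; }

    public static void main(String[] args) {
        TaggedSketch<String, Integer> left = TaggedSketch.one("order-1");
        TaggedSketch<String, Integer> right = TaggedSketch.two(42);
        System.out.println(left.isOne() + " " + left.isTwo());
        System.out.println(right.isOne() + " " + right.isTwo());
    }
}
```

Because the tag is inferred from a null check, this representation relies on the wrapped elements themselves being non-null: in the sketch, a value created with one(null) reports false for both isOne and isTwo.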
UnionKeySelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
private static final long serialVersionUID = 1L;
private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;
public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2) {
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
}
@Override
public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
if (value.isOne()) {
return keySelector1.getKey(value.getOne());
} else {
return keySelector2.getKey(value.getTwo());
}
}
}
UnionKeySelector holds two KeySelector fields. Its getKey method dispatches on the TaggedUnion: if the value is a one, it returns keySelector1.getKey(value.getOne()); otherwise it returns keySelector2.getKey(value.getTwo()).
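The dispatch in getKey amounts to combining two per-input key extractors into one extractor over the unified type. A minimal sketch, assuming java.util.function.Function as a stand-in for KeySelector and a hypothetical Tagged class in place of TaggedUnion:

```java
import java.util.function.Function;

// Hypothetical model of UnionKeySelector built on java.util.function.Function.
final class UnionKeySketch {

    // Stand-in for TaggedUnion: exactly one of the two fields is non-null.
    static final class Tagged<A, B> {
        final A one;
        final B two;
        Tagged(A one, B two) { this.one = one; this.two = two; }
    }

    // Builds a single key extractor for tagged values from the two
    // per-input extractors; both must produce the same key type K.
    static <A, B, K> Function<Tagged<A, B>, K> unionKey(Function<A, K> key1,
                                                        Function<B, K> key2) {
        return t -> (t.one != null) ? key1.apply(t.one) : key2.apply(t.two);
    }

    public static void main(String[] args) {
        Function<String, Integer> byLength = s -> s.length();
        Function<Integer, Integer> identity = i -> i;
        Function<Tagged<String, Integer>, Integer> key = unionKey(byLength, identity);
        System.out.println(key.apply(new Tagged<String, Integer>("abc", null)));
        System.out.println(key.apply(new Tagged<String, Integer>(null, 7)));
    }
}
```

The shared type parameter K plays the same role as the key type equality check in equalTo: elements from both inputs can only land in the same group if their keys are comparable values of one type.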
DataStream.union
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public
public class DataStream<T> {
//……
@SafeVarargs
public final DataStream<T> union(DataStream<T>... streams) {
List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
unionedTransforms.add(this.transformation);
for (DataStream<T> newStream : streams) {
if (!getType().equals(newStream.getType())) {
throw new IllegalArgumentException("Cannot union streams of different types: "
+ getType() + " and " + newStream.getType());
}
unionedTransforms.add(newStream.getTransformation());
}
return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
//……
}
DataStream's union operation creates a new DataStream from a UnionTransformation. Note that union requires all streams to have the same element type. This is why WithWindow's apply maps the two input streams with Input1Tagger and Input2Tagger into TaggedUnion objects: doing so unifies the element types of the two streams.
CoGroupWindowFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
private static final long serialVersionUID = 1L;
public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
super(userFunction);
}
@Override
public void apply(KEY key,
W window,
Iterable<TaggedUnion<T1, T2>> values,
Collector<T> out) throws Exception {
List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();
for (TaggedUnion<T1, T2> val: values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
wrappedFunction.coGroup(oneValues, twoValues, out);
}
}
CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods to manage wrappedFunction) and implements the WindowFunction interface. Its apply method takes apart the Iterable of TaggedUnion values, collecting them into oneValues and twoValues, and then calls the coGroup method of the user-defined CoGroupFunction.
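The split performed in apply can be isolated as follows. This is a plain-Java sketch with hypothetical names; a List stands in for Flink's Collector, and the Tagged and CoGroup types are simplified stand-ins for TaggedUnion and CoGroupFunction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of CoGroupWindowFunction.apply for one key and window.
final class CoGroupSplitSketch {

    // Stand-in for TaggedUnion: exactly one of the two fields is non-null.
    static final class Tagged<A, B> {
        final A one;
        final B two;
        Tagged(A one, B two) { this.one = one; this.two = two; }
    }

    // Stand-in for CoGroupFunction; a List replaces the Collector.
    interface CoGroup<A, B, R> {
        void coGroup(Iterable<A> first, Iterable<B> second, List<R> out);
    }

    // Separates the mixed window contents by tag, then delegates once.
    static <A, B, R> void apply(Iterable<Tagged<A, B>> values,
                                CoGroup<A, B, R> fn, List<R> out) {
        List<A> oneValues = new ArrayList<>();
        List<B> twoValues = new ArrayList<>();
        for (Tagged<A, B> v : values) {
            if (v.one != null) {
                oneValues.add(v.one);
            } else {
                twoValues.add(v.two);
            }
        }
        fn.coGroup(oneValues, twoValues, out);
    }

    public static void main(String[] args) {
        List<Tagged<String, Integer>> window = new ArrayList<>();
        window.add(new Tagged<String, Integer>("x", null));
        window.add(new Tagged<String, Integer>(null, 1));
        window.add(new Tagged<String, Integer>("y", null));
        List<String> out = new ArrayList<>();
        CoGroup<String, Integer, String> fn = (first, second, o) -> o.add(first + "|" + second);
        apply(window, fn, out);
        System.out.println(out);
    }
}
```

As in the listing above, both lists are fully built before the user function runs, so every element of the window for that key is held in memory at once.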
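Summary
DataStream provides the coGroup method for performing a window coGroup operation; it returns a CoGroupedStreams. CoGroupedStreams mainly provides the where operation, which builds a Where object; Where mainly provides the equalTo operation, which builds an EqualTo object; EqualTo provides the window operation, which builds a WithWindow object. WithWindow holds the windowAssigner, trigger, evictor, and allowedLateness settings and provides the apply operation.
The apply operation of CoGroupedStreams' WithWindow takes a CoGroupFunction. Internally it first creates a UnionKeySelector from the two keySelectors, then maps the two input streams with Input1Tagger and Input2Tagger respectively into streams of TaggedUnion objects, and calls taggedInput1.union(taggedInput2) to obtain unionStream. It then uses the UnionKeySelector to turn unionStream into a KeyedStream, applies the window operation to that KeyedStream, and passes along the configured windowAssigner, trigger, evictor, and allowedLateness. Finally it wraps the user-defined CoGroupFunction in a CoGroupWindowFunction and calls windowedStream.apply.
The apply operation of CoGroupedStreams' WithWindow uses DataStream's union operation to merge the two streams and then converts the result into a KeyedStream. The two key classes here are TaggedUnion and UnionKeySelector. TaggedUnion has two fields, one and two, and provides two static factory methods, one and two; in a TaggedUnion object either one is null or two is null, and the two never hold values at the same time. UnionKeySelector holds two KeySelector fields; its getKey method dispatches on the TaggedUnion: if the value is a one, it returns keySelector1.getKey(value.getOne()), otherwise keySelector2.getKey(value.getTwo()). (TaggedUnion unifies the element types of the two streams so that the union operation can be applied.)
CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods to manage wrappedFunction) and implements the WindowFunction interface. Its apply method takes apart the Iterable of TaggedUnion values, collecting them into oneValues and twoValues, and then calls the coGroup method of the user-defined CoGroupFunction.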
CoGroupFunction extends Function and defines the coGroup method, which receives two Iterable collections of elements. The apply method of JoinedStreams' WithWindow wraps a JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for JoinFunction, FlatJoinCoGroupFunction for FlatJoinFunction) and then calls the apply method of CoGroupedStreams' WithWindow. JoinCoGroupFunction and FlatJoinCoGroupFunction extend WrappingFunction and implement the coGroup method by iterating over the first collection and, for each of its elements, iterating over the second collection, calling the join method of the JoinFunction or FlatJoinFunction on each pair. Because nothing is emitted when either collection is empty, the result is an inner join; with the coGroup operation, users can supply their own CoGroupFunction to implement an outer join.
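The difference between the two join flavours comes down to what the body of coGroup does with an empty side. The sketch below shows both bodies as plain static methods operating on the two collections for one key and window; the names are hypothetical, element types are fixed to String and Integer for brevity, and unmatched left elements are paired with the literal text "null" purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical coGroup bodies for one key and one window.
final class JoinViaCoGroupSketch {

    // Inner join: nested loops; if either side is empty nothing is emitted.
    static List<String> innerJoin(List<String> first, List<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String left : first) {
            for (Integer right : second) {
                out.add(left + "," + right);
            }
        }
        return out;
    }

    // Left outer join: a left element without a partner is still emitted.
    static List<String> leftOuterJoin(List<String> first, List<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String left : first) {
            if (second.isEmpty()) {
                out.add(left + ",null");
            } else {
                for (Integer right : second) {
                    out.add(left + "," + right);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lefts = Arrays.asList("a", "b");
        List<Integer> none = Collections.emptyList();
        System.out.println(innerJoin(lefts, none));
        System.out.println(leftOuterJoin(lefts, none));
        System.out.println(leftOuterJoin(lefts, Arrays.asList(1)));
    }
}
```

When both sides are non-empty the two methods produce the same pairs; they differ only for keys that appear in the first input alone.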