A Look at the join Operation on Flink DataStream

This article looks at how the join operation on Flink's DataStream is implemented.
Example
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
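The chain starts with join, which pairs the stream with another stream and returns a JoinedStreams. Calling where on the JoinedStreams builds a Where object that holds the first key selector. Where offers equalTo, which builds an EqualTo; EqualTo offers window, which builds a WithWindow. WithWindow carries the windowAssigner, can additionally be configured with a trigger, an evictor, and an allowedLateness, and provides the apply operation.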
DataStream.join
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public
public class DataStream<T> {
//……

/**
* Creates a join operation. See {@link JoinedStreams} for an example of how the keys
* and window can be specified.
*/
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
return new JoinedStreams<>(this, otherStream);
}

//……
}
DataStream provides the join method, which starts a join operation and returns a JoinedStreams.
JoinedStreams
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public
public class JoinedStreams<T1, T2> {

/** The first input stream. */
private final DataStream<T1> input1;

/** The second input stream. */
private final DataStream<T2> input2;

public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
this.input1 = requireNonNull(input1);
this.input2 = requireNonNull(input2);
}

public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
requireNonNull(keySelector);
final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
return where(keySelector, keyType);
}

public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
requireNonNull(keySelector);
requireNonNull(keyType);
return new Where<>(input1.clean(keySelector), keyType);
}

//……
}
JoinedStreams mainly provides the where operation, which builds a Where object.
Where
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public
public class Where<KEY> {

private final KeySelector<T1, KEY> keySelector1;
private final TypeInformation<KEY> keyType;

Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
this.keyType = keyType;
}

public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
requireNonNull(keySelector);
final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
return equalTo(keySelector, otherKey);
}

public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
requireNonNull(keySelector);
requireNonNull(keyType);

if (!keyType.equals(this.keyType)) {
throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
"first key = " + this.keyType + " , second key = " + keyType);
}

return new EqualTo(input2.clean(keySelector));
}

//……

}
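The Where object mainly provides the equalTo operation, which builds an EqualTo object. equalTo also checks that the key type of the second input equals that of the first and throws an IllegalArgumentException if it does not.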
EqualTo
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public
public class EqualTo {

private final KeySelector<T2, KEY> keySelector2;

EqualTo(KeySelector<T2, KEY> keySelector2) {
this.keySelector2 = requireNonNull(keySelector2);
}

/**
* Specifies the window on which the join operation works.
*/
@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
}
The EqualTo object provides the window operation, which builds a WithWindow object.
WithWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public
public static class WithWindow<T1, T2, KEY, W extends Window> {

private final DataStream<T1> input1;
private final DataStream<T2> input2;

private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;
private final TypeInformation<KEY> keyType;

private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

private final Time allowedLateness;

private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

@PublicEvolving
protected WithWindow(DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
Time allowedLateness) {

this.input1 = requireNonNull(input1);
this.input2 = requireNonNull(input2);

this.keySelector1 = requireNonNull(keySelector1);
this.keySelector2 = requireNonNull(keySelector2);
this.keyType = requireNonNull(keyType);

this.windowAssigner = requireNonNull(windowAssigner);

this.trigger = trigger;
this.evictor = evictor;

this.allowedLateness = allowedLateness;
}

@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, newTrigger, evictor, allowedLateness);
}

@PublicEvolving
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, newEvictor, allowedLateness);
}

@PublicEvolving
public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, evictor, newLateness);
}

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
function,
JoinFunction.class,
0,
1,
2,
TypeExtractor.NO_INDEX,
input1.getType(),
input2.getType(),
"Join",
false);

return apply(function, resultType);
}

@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
return (SingleOutputStreamOperator<T>) apply(function);
}

public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);

coGroupedWindowedStream = input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);

return coGroupedWindowedStream
.apply(new FlatJoinCoGroupFunction<>(function), resultType);
}

@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}

public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
function,
FlatJoinFunction.class,
0,
1,
2,
new int[]{2, 0},
input1.getType(),
input2.getType(),
"Join",
false);

return apply(function, resultType);
}

@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
return (SingleOutputStreamOperator<T>) apply(function);
}

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);

coGroupedWindowedStream = input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);

return coGroupedWindowedStream
.apply(new JoinCoGroupFunction<>(function), resultType);
}

@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}

@VisibleForTesting
Time getAllowedLateness() {
return allowedLateness;
}

@VisibleForTesting
CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
return coGroupedWindowedStream;
}
}

WithWindow carries the windowAssigner and can be configured with a trigger, an evictor, and an allowedLateness. Each of these setters returns a new WithWindow rather than modifying the current one. It provides the apply operation (the with operation is marked as deprecated).
apply accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream.coGroup to create a CoGroupedStreams, passes its own where and equalTo key selectors, windowAssigner, trigger, evictor, and allowedLateness on to it, and finally calls apply on the resulting CoGroupedStreams.WithWindow.
The apply method of CoGroupedStreams.WithWindow takes a different parameter than the apply method of JoinedStreams.WithWindow: it accepts a CoGroupFunction. JoinedStreams.WithWindow.apply therefore wraps the JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) before passing it to CoGroupedStreams.WithWindow.apply.
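One consequence of the WithWindow source above is easy to miss: trigger, evictor, and allowedLateness each construct a new WithWindow, so a call whose return value is dropped has no effect. The following is a minimal plain-Java sketch of that pattern. WindowSpecSketch and its String fields are hypothetical stand-ins, not Flink classes.

```java
import java.util.Objects;

/** Hypothetical stand-in for JoinedStreams.WithWindow: every setter returns a new instance. */
final class WindowSpecSketch {
    final String windowAssigner;   // required, like the real windowAssigner
    final String trigger;          // optional, null means "not set"
    final Long allowedLatenessMs;  // optional, null means "not set"

    WindowSpecSketch(String windowAssigner, String trigger, Long allowedLatenessMs) {
        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.trigger = trigger;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Returns a copy with the trigger replaced; this instance is left unchanged. */
    WindowSpecSketch trigger(String newTrigger) {
        return new WindowSpecSketch(windowAssigner, newTrigger, allowedLatenessMs);
    }

    /** Returns a copy with the allowed lateness replaced; this instance is left unchanged. */
    WindowSpecSketch allowedLateness(long newLatenessMs) {
        return new WindowSpecSketch(windowAssigner, trigger, newLatenessMs);
    }

    public static void main(String[] args) {
        WindowSpecSketch base = new WindowSpecSketch("tumbling-5s", null, null);
        base.trigger("count-10");  // return value dropped, so base is unchanged
        WindowSpecSketch configured = base.trigger("count-10").allowedLateness(1000L);
        System.out.println(base.trigger);        // null
        System.out.println(configured.trigger);  // count-10
    }
}
```

In a real job this means the configuration calls should stay chained in front of apply, as in the example at the top of the article.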

JoinFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java
@Public
@FunctionalInterface
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

/**
* The join method, called once per joined pair of elements.
*
* @param first The element from first input.
* @param second The element from second input.
* @return The resulting element.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
OUT join(IN1 first, IN2 second) throws Exception;
}
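JoinFunction extends Function and Serializable and defines the join method, which is called once per joined pair of elements. A join has inner-join semantics by default; for an outer join, use a CoGroupFunction instead.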
FlatJoinFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java
@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

/**
* The join method, called once per joined pair of elements.
*
* @param first The element from first input.
* @param second The element from second input.
* @param out The collector used to return zero, one, or more elements.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
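FlatJoinFunction also extends Function and Serializable and defines a join method with inner-join semantics by default; for an outer join, use a CoGroupFunction. Unlike JoinFunction.join, its join method takes an additional Collector parameter, through which it can emit zero, one, or more records per pair, hence the Flat in the name.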
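The difference between the two signatures can be illustrated without Flink on the classpath. In the sketch below, PairJoin and FlatPairJoin are hypothetical interfaces that only mirror the shapes of JoinFunction.join and FlatJoinFunction.join, and java.util.function.Consumer stands in for Flink's Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FlatJoinSketch {
    /** Hypothetical stand-in for JoinFunction: exactly one result per pair. */
    interface PairJoin<A, B, R> {
        R join(A first, B second);
    }

    /** Hypothetical stand-in for FlatJoinFunction: zero or more results per pair. */
    interface FlatPairJoin<A, B, R> {
        void join(A first, B second, Consumer<R> out);
    }

    static List<String> demo() {
        // Always produces one result for the pair.
        PairJoin<String, Integer, String> one = (name, qty) -> name + "=" + qty;
        // Produces qty results for the pair, which may be none at all.
        FlatPairJoin<String, Integer, String> many = (name, qty, out) -> {
            for (int i = 0; i < qty; i++) {
                out.accept(name + "#" + i);
            }
        };

        List<String> results = new ArrayList<>();
        results.add(one.join("apple", 2));
        many.join("pear", 0, results::add);  // emits nothing
        many.join("plum", 2, results::add);  // emits two records
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // [apple=2, plum#0, plum#1]
    }
}
```

A flat variant is therefore the natural choice when a pair should sometimes be filtered out or expanded into several output records.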
CoGroupedStreams
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public
public class CoGroupedStreams<T1, T2> {
//……

@Public
public static class WithWindow<T1, T2, KEY, W extends Window> {
private final DataStream<T1> input1;
private final DataStream<T2> input2;

private final KeySelector<T1, KEY> keySelector1;
private final KeySelector<T2, KEY> keySelector2;

private final TypeInformation<KEY> keyType;

private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

private final Time allowedLateness;

private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

protected WithWindow(DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
Time allowedLateness) {
this.input1 = input1;
this.input2 = input2;

this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
this.keyType = keyType;

this.windowAssigner = windowAssigner;
this.trigger = trigger;
this.evictor = evictor;

this.allowedLateness = allowedLateness;
}

@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, newTrigger, evictor, allowedLateness);
}

@PublicEvolving
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, newEvictor, allowedLateness);
}

@PublicEvolving
public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
windowAssigner, trigger, evictor, newLateness);
}

public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
function,
input1.getType(),
input2.getType(),
"CoGroup",
false);

return apply(function, resultType);
}

public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);

UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
.map(new Input1Tagger<T1, T2>())
.setParallelism(input1.getParallelism())
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
.setParallelism(input2.getParallelism())
.returns(unionType);

DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

// we explicitly create the keyed stream to manually pass the key type information in
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);

if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}

return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}

//……

}

//……
}
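The overall class structure of CoGroupedStreams closely resembles that of JoinedStreams: CoGroupedStreams provides where to build a Where object, Where provides equalTo to build an EqualTo object, EqualTo provides window to build a WithWindow object, and WithWindow carries the windowAssigner, trigger, evictor, and allowedLateness and provides apply. One difference is that apply on CoGroupedStreams.WithWindow accepts a CoGroupFunction, whereas apply on JoinedStreams.WithWindow accepts a JoinFunction or FlatJoinFunction. Its apply implementation tags the elements of both inputs as TaggedUnion values, unions the two tagged streams, keys the union with a UnionKeySelector, and then applies the window to the resulting KeyedStream.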
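The tag, union, and key-by steps inside CoGroupedStreams.WithWindow.apply can be sketched in plain Java. This is only a model under stated assumptions: Tagged is a hypothetical stand-in for TaggedUnion, lists stand in for streams, and windowing is left out entirely, so each key forms a single group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class TaggedUnionSketch {
    /** Hypothetical stand-in for TaggedUnion<T1, T2>: exactly one side is non-null. */
    static final class Tagged<A, B> {
        final A one;
        final B two;
        private Tagged(A one, B two) { this.one = one; this.two = two; }
        static <A, B> Tagged<A, B> one(A value) { return new Tagged<>(value, null); }
        static <A, B> Tagged<A, B> two(B value) { return new Tagged<>(null, value); }
        boolean isOne() { return one != null; }
    }

    /** Tags both inputs, unions them, and groups the union by the key of whichever side is set. */
    static <A, B, K> Map<K, List<Tagged<A, B>>> unionAndGroup(
            List<A> in1, List<B> in2, Function<A, K> key1, Function<B, K> key2) {
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : in1) union.add(Tagged.<A, B>one(a));  // role of Input1Tagger
        for (B b : in2) union.add(Tagged.<A, B>two(b));  // role of Input2Tagger
        Map<K, List<Tagged<A, B>>> groups = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            // Role of UnionKeySelector: pick the selector that matches the tag.
            K key = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    /** Reports, per key, how many elements came from each input as "key:n1/n2". */
    static List<String> demo() {
        Function<String, Character> firstChar = s -> s.charAt(0);
        Map<Character, List<Tagged<String, String>>> groups = unionAndGroup(
                Arrays.asList("a1", "b1", "a2"), Arrays.asList("a9", "c9"), firstChar, firstChar);
        List<String> summary = new ArrayList<>();
        for (Map.Entry<Character, List<Tagged<String, String>>> e : groups.entrySet()) {
            long ones = e.getValue().stream().filter(Tagged::isOne).count();
            summary.add(e.getKey() + ":" + ones + "/" + (e.getValue().size() - ones));
        }
        return summary;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // [a:2/1, b:1/0, c:0/1]
    }
}
```

Keys b and c each have elements from only one input. Splitting a group back into its two sides is what makes it possible to hand a CoGroupFunction two Iterables, one of which may be empty.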
CoGroupFunction
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

/**
* This method must be implemented to provide a user implementation of a
* coGroup. It is called for each pair of element groups where the elements share the
* same key.
*
* @param first The records from the first input.
* @param second The records from the second.
* @param out A collector to return elements.
*
* @throws Exception The function may throw Exceptions, which will cause the program to cancel,
* and may trigger the recovery logic.
*/
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
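CoGroupFunction extends Function and Serializable and defines the coGroup method, which can be used to implement an outer join. Its parameters are Iterables holding the records of each input that share a key, whereas the join methods of JoinFunction and FlatJoinFunction take single elements.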
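Because coGroup sees both groups as a whole, it can react to an empty side, which a per-pair join function never gets the chance to do. The sketch below shows one way a left outer join might look. GroupJoin is a hypothetical interface that only mirrors the shape of CoGroupFunction.coGroup, and Consumer stands in for Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

final class OuterJoinSketch {
    /** Hypothetical stand-in mirroring the shape of CoGroupFunction.coGroup. */
    interface GroupJoin<A, B, R> {
        void coGroup(Iterable<A> first, Iterable<B> second, Consumer<R> out);
    }

    /** Left outer join: every left element is emitted, padded with "null" when nothing matches. */
    static final GroupJoin<String, Integer, String> LEFT_OUTER = (first, second, out) -> {
        for (String left : first) {
            boolean matched = false;
            for (Integer right : second) {
                matched = true;
                out.accept(left + "," + right);
            }
            if (!matched) {
                out.accept(left + ",null");
            }
        }
    };

    /** Runs the left outer join over one pair of groups and collects the output. */
    static List<String> run(List<String> first, List<Integer> second) {
        List<String> results = new ArrayList<>();
        LEFT_OUTER.coGroup(first, second, results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("x", "y"), Arrays.asList(1, 2)));          // [x,1, x,2, y,1, y,2]
        System.out.println(run(Arrays.asList("x"), Collections.<Integer>emptyList()));  // [x,null]
    }
}
```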
WrappingFunction
flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

private static final long serialVersionUID = 1L;

protected T wrappedFunction;

protected WrappingFunction(T wrappedFunction) {
this.wrappedFunction = wrappedFunction;
}

@Override
public void open(Configuration parameters) throws Exception {
FunctionUtils.openFunction(this.wrappedFunction, parameters);
}

@Override
public void close() throws Exception {
FunctionUtils.closeFunction(this.wrappedFunction);
}

@Override
public void setRuntimeContext(RuntimeContext t) {
super.setRuntimeContext(t);

FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
}

public T getWrappedFunction () {
return this.wrappedFunction;
}
}
WrappingFunction extends AbstractRichFunction and overrides the open, close, and setRuntimeContext methods of its parent so that these lifecycle calls are forwarded to the wrappedFunction.
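The point of this delegation is that a user function wrapped inside another function still receives its lifecycle calls. The following is a rough plain-Java sketch of the idea. Lifecycle, RecordingJoin, and Wrapper are hypothetical names, and the instanceof check is an assumption about how FunctionUtils treats functions that have no lifecycle methods.

```java
import java.util.ArrayList;
import java.util.List;

final class WrappingSketch {
    /** Hypothetical lifecycle interface standing in for a rich function's open and close. */
    interface Lifecycle {
        void open();
        void close();
    }

    /** A user function that records its calls so the delegation is visible. */
    static final class RecordingJoin implements Lifecycle {
        final List<String> calls = new ArrayList<>();
        @Override public void open() { calls.add("open"); }
        @Override public void close() { calls.add("close"); }
        String join(String a, String b) { calls.add("join"); return a + b; }
    }

    /** Wrapper in the spirit of WrappingFunction: lifecycle calls are forwarded to the wrapped function. */
    static class Wrapper<T> implements Lifecycle {
        protected final T wrapped;
        Wrapper(T wrapped) { this.wrapped = wrapped; }

        @Override public void open() {
            // Assumption: only functions that have a lifecycle are forwarded to.
            if (wrapped instanceof Lifecycle) ((Lifecycle) wrapped).open();
        }

        @Override public void close() {
            if (wrapped instanceof Lifecycle) ((Lifecycle) wrapped).close();
        }
    }

    static List<String> demo() {
        RecordingJoin user = new RecordingJoin();
        Wrapper<RecordingJoin> wrapper = new Wrapper<>(user);
        wrapper.open();
        wrapper.wrapped.join("a", "b");
        wrapper.close();
        return user.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // [open, join, close]
    }
}
```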
JoinCoGroupFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
/**
* CoGroup function that does a nested-loop join to get the join result.
*/
private static class JoinCoGroupFunction<T1, T2, T>
extends WrappingFunction<JoinFunction<T1, T2, T>>
implements CoGroupFunction<T1, T2, T> {
private static final long serialVersionUID = 1L;

public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
super(wrappedFunction);
}

@Override
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
for (T1 val1: first) {
for (T2 val2: second) {
out.collect(wrappedFunction.join(val1, val2));
}
}
}
}

JoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface. It iterates over the first collection and, for each of its elements, over the second collection, calls wrappedFunction.join on every pair, and emits the result.
JoinedStreams defines JoinCoGroupFunction as a private static class. JoinedStreams.WithWindow.apply uses it to wrap the JoinFunction so that it can call CoGroupedStreams.WithWindow.apply.
JoinFunction.join takes two single elements, whereas JoinCoGroupFunction.coGroup takes two Iterables.
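The nested loop also explains where the inner-join semantics come from: if either group is empty, the loop body never runs and nothing is emitted. The sketch below models this in plain Java, assuming a BiFunction in place of JoinFunction and a returned list in place of the Collector. joinGroup is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

final class NestedLoopJoinSketch {
    /** Applies joinFn to every (first, second) pair, in the same loop order as JoinCoGroupFunction. */
    static <A, B, R> List<R> joinGroup(Iterable<A> first, Iterable<B> second, BiFunction<A, B, R> joinFn) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(joinFn.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, String> pair = (s, i) -> s + i;
        // Two elements on each side give 2 x 2 = 4 results.
        System.out.println(joinGroup(Arrays.asList("a", "b"), Arrays.asList(1, 2), pair));  // [a1, a2, b1, b2]
        // An empty side gives no results at all, which is the inner-join behaviour.
        System.out.println(joinGroup(Arrays.asList("a", "b"), Collections.<Integer>emptyList(), pair));  // []
    }
}
```

The output size per key and window is the product of the two group sizes, which is worth keeping in mind for keys with many elements on both sides.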

FlatJoinCoGroupFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
/**
* CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
*/
private static class FlatJoinCoGroupFunction<T1, T2, T>
extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
implements CoGroupFunction<T1, T2, T> {
private static final long serialVersionUID = 1L;

public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
super(wrappedFunction);
}

@Override
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
for (T1 val1: first) {
for (T2 val2: second) {
wrappedFunction.join(val1, val2, out);
}
}
}
}

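FlatJoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface. It iterates over the first collection and, for each of its elements, over the second collection, and calls wrappedFunction.join on every pair, passing along the Collector so that the wrapped function emits the results itself.
JoinedStreams defines FlatJoinCoGroupFunction as a private static class. JoinedStreams.WithWindow.apply uses it to wrap the FlatJoinFunction so that it can call CoGroupedStreams.WithWindow.apply.
FlatJoinFunction.join takes two single elements plus a Collector, whereas FlatJoinCoGroupFunction.coGroup takes two Iterables plus a Collector.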

Summary

DataStream provides the join method, which starts a join operation and returns a JoinedStreams. JoinedStreams provides where to build a Where object; Where provides equalTo to build an EqualTo object; EqualTo provides window to build a WithWindow object; WithWindow carries the windowAssigner, can be configured with a trigger, an evictor, and an allowedLateness, and provides the apply operation.
apply accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream.coGroup to create a CoGroupedStreams, passes its own where and equalTo key selectors, windowAssigner, trigger, evictor, and allowedLateness on to it, and finally calls apply on the resulting CoGroupedStreams.WithWindow. JoinFunction and FlatJoinFunction both extend Function and Serializable and define a join method with inner-join semantics by default; for an outer join, use a CoGroupFunction. The two differ in that FlatJoinFunction.join takes an additional Collector parameter, through which it can emit zero, one, or more records, hence the Flat in the name.
The apply method of CoGroupedStreams.WithWindow accepts a CoGroupFunction, unlike the apply method of JoinedStreams.WithWindow. JoinedStreams.WithWindow.apply therefore wraps the JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) and passes it to CoGroupedStreams.WithWindow.apply. JoinCoGroupFunction and FlatJoinCoGroupFunction both extend WrappingFunction (which extends AbstractRichFunction and overrides open, close, and setRuntimeContext to manage the wrappedFunction) and both implement the coGroup method of the CoGroupFunction interface. The only difference is that the former wraps a JoinFunction and collects its return value, while the latter wraps a FlatJoinFunction and therefore passes the out Collector through to its join method.
