A look at the intervalJoin operation of Flink KeyedStream



This article takes a look at the intervalJoin operation of Flink's KeyedStream.
Example
DataStream<Integer> orangeStream = …
DataStream<Integer> greenStream = …

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // emit the joined pair as "left,right"
            out.collect(left + "," + right);
        }
    });
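To make the bounds in the example concrete, here is a minimal plain-Java sketch with no Flink dependency (the class IntervalJoinSemantics and its methods are hypothetical). It evaluates the condition that between(Time.milliseconds(-2), Time.milliseconds(1)) expresses: an orange element with timestamp t is paired with every green element of the same key whose timestamp lies in [t - 2, t + 1], both ends inclusive by default. The timestamps are made-up sample values assumed to share one key; watermarks and state cleanup are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mimicking the interval condition of an inclusive
// between(lowerBound, upperBound); it is not part of the Flink API.
final class IntervalJoinSemantics {

    // True if rightTs falls inside [leftTs + lower, leftTs + upper].
    static boolean matches(long leftTs, long rightTs, long lower, long upper) {
        return rightTs >= leftTs + lower && rightTs <= leftTs + upper;
    }

    // Returns "leftTs,rightTs" for every matching pair (all elements assumed to share one key).
    static List<String> join(long[] left, long[] right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (matches(l, r, lower, upper)) {
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 5};
        long[] green = {-3, -2, 0, 1, 2, 6};
        System.out.println(join(orange, green, -2, 1)); // prints [0,-2, 0,0, 0,1, 5,6]
    }
}
```

The orange element at 0 picks up the green elements at -2, 0 and 1, while the one at 5 only reaches the green element at 6.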
KeyedStream.intervalJoin
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //……

    @PublicEvolving
    public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
        return new IntervalJoin<>(this, otherStream);
    }

    //……
}
KeyedStream's intervalJoin creates and returns an IntervalJoin.
IntervalJoin
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@PublicEvolving
public static class IntervalJoin<T1, T2, KEY> {

    private final KeyedStream<T1, KEY> streamOne;
    private final KeyedStream<T2, KEY> streamTwo;

    IntervalJoin(
            KeyedStream<T1, KEY> streamOne,
            KeyedStream<T2, KEY> streamTwo
    ) {
        this.streamOne = checkNotNull(streamOne);
        this.streamTwo = checkNotNull(streamTwo);
    }

    @PublicEvolving
    public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

        TimeCharacteristic timeCharacteristic =
                streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();

        if (timeCharacteristic != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
        }

        checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
        checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

        return new IntervalJoined<>(
                streamOne,
                streamTwo,
                lowerBound.toMilliseconds(),
                upperBound.toMilliseconds(),
                true,
                true
        );
    }
}
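IntervalJoin provides the between operation, which sets the interval's lowerBound and upperBound. Note that between immediately throws UnsupportedTimeCharacteristicException if the time characteristic is anything other than TimeCharacteristic.EventTime. It then checks that both bounds are non-null and creates and returns an IntervalJoined, with both bounds inclusive (the two trailing true arguments).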
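The validation performed by between can be mimicked without Flink as shown below. This is a sketch under assumptions: TimeMode is a hypothetical stand-in for TimeCharacteristic, UnsupportedOperationException replaces UnsupportedTimeCharacteristicException, and the bounds are passed as millisecond Longs rather than Time objects. Note that between itself does not check lowerBound <= upperBound; that check happens later, in the IntervalJoinOperator constructor.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's TimeCharacteristic enum.
enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

// Hypothetical, Flink-free mirror of the checks in IntervalJoin.between.
final class BetweenCheck {

    static long[] between(TimeMode mode, Long lowerMs, Long upperMs) {
        // Only event time is accepted; anything else fails when the job graph is built.
        if (mode != TimeMode.EVENT_TIME) {
            throw new UnsupportedOperationException(
                    "Time-bounded stream joins are only supported in event time");
        }
        // Both bounds are mandatory.
        Objects.requireNonNull(lowerMs, "A lower bound needs to be provided for a time-bounded join");
        Objects.requireNonNull(upperMs, "An upper bound needs to be provided for a time-bounded join");
        // No ordering check here: lower > upper is only rejected by the operator constructor.
        return new long[] {lowerMs, upperMs};
    }
}
```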
IntervalJoined
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@PublicEvolving
public static class IntervalJoined<IN1, IN2, KEY> {

    private final KeyedStream<IN1, KEY> left;
    private final KeyedStream<IN2, KEY> right;

    private final long lowerBound;
    private final long upperBound;

    private final KeySelector<IN1, KEY> keySelector1;
    private final KeySelector<IN2, KEY> keySelector2;

    private boolean lowerBoundInclusive;
    private boolean upperBoundInclusive;

    public IntervalJoined(
            KeyedStream<IN1, KEY> left,
            KeyedStream<IN2, KEY> right,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {

        this.left = checkNotNull(left);
        this.right = checkNotNull(right);

        this.lowerBound = lowerBound;
        this.upperBound = upperBound;

        this.lowerBoundInclusive = lowerBoundInclusive;
        this.upperBoundInclusive = upperBoundInclusive;

        this.keySelector1 = left.getKeySelector();
        this.keySelector2 = right.getKeySelector();
    }

    @PublicEvolving
    public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
        this.upperBoundInclusive = false;
        return this;
    }

    @PublicEvolving
    public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
        this.lowerBoundInclusive = false;
        return this;
    }

    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
        Preconditions.checkNotNull(processJoinFunction);

        final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
                processJoinFunction,
                ProcessJoinFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                left.getType(),
                right.getType(),
                Utils.getCallLocationName(),
                true
        );

        return process(processJoinFunction, outputType);
    }

    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {
        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);

        final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        left.getType().createSerializer(left.getExecutionConfig()),
                        right.getType().createSerializer(right.getExecutionConfig()),
                        cleanedUdf
                );

        return left
                .connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
    }
}
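IntervalJoined treats lowerBound and upperBound as inclusive by default; lowerBoundExclusive and upperBoundExclusive switch either bound to exclusive individually. IntervalJoined provides the process operation, which takes a ProcessJoinFunction (the single-argument overload derives the output type with TypeExtractor.getBinaryOperatorReturnType and delegates to the two-argument one). process creates an IntervalJoinOperator and then executes left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator), returning a SingleOutputStreamOperator (in this example, left is orangeStream and right is greenStream).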
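The inclusive/exclusive flags only take effect once they reach IntervalJoinOperator, whose constructor (listed further down) folds them into the bounds by shifting an exclusive bound one millisecond inward, so that later comparisons can always be inclusive. A hypothetical Flink-free helper reproducing that arithmetic, including the constructor's lowerBound <= upperBound check:

```java
// Hypothetical helper reproducing the bound normalization in the
// IntervalJoinOperator constructor; not part of the Flink API.
final class BoundNormalization {

    static long[] normalize(long lower, long upper, boolean lowerInclusive, boolean upperInclusive) {
        // The ordering check runs on the raw bounds, before any shifting.
        if (lower > upper) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        // Exclusive bounds move one millisecond inward so the operator can use <= / >= later.
        long effectiveLower = lowerInclusive ? lower : lower + 1L;
        long effectiveUpper = upperInclusive ? upper : upper - 1L;
        return new long[] {effectiveLower, effectiveUpper};
    }

    public static void main(String[] args) {
        // between(-2, 1) followed by lowerBoundExclusive() and upperBoundExclusive()
        long[] b = normalize(-2, 1, false, false);
        System.out.println(b[0] + " .. " + b[1]); // prints -1 .. 0
    }
}
```

Because the check runs before the shift, between(0, 0) followed by lowerBoundExclusive() passes validation but yields the empty range 1 .. 0, so no pair can ever match.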
ProcessJoinFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;

    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

    public abstract class Context {

        public abstract long getLeftTimestamp();

        public abstract long getRightTimestamp();

        public abstract long getTimestamp();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
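ProcessJoinFunction extends AbstractRichFunction and declares the abstract processElement method. It also defines its own Context class with four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp, and output (which emits a value to the side output identified by an OutputTag).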
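The values these Context methods return follow from IntervalJoinOperator.collect, listed further down: the left and right timestamps are passed through unchanged, and the joined result is stamped with the larger of the two. A small hypothetical model of just that part, using no Flink types:

```java
// Hypothetical, Flink-free model of the timestamps exposed by ProcessJoinFunction.Context.
final class JoinContextModel {
    private final long leftTimestamp;
    private final long rightTimestamp;

    JoinContextModel(long leftTimestamp, long rightTimestamp) {
        this.leftTimestamp = leftTimestamp;
        this.rightTimestamp = rightTimestamp;
    }

    long getLeftTimestamp() {
        return leftTimestamp;
    }

    long getRightTimestamp() {
        return rightTimestamp;
    }

    // Mirrors resultTimestamp = Math.max(leftTimestamp, rightTimestamp) in collect.
    long getTimestamp() {
        return Math.max(leftTimestamp, rightTimestamp);
    }
}
```

For a left element at 3 joined with a right element at 5, getTimestamp() is 5, which is also the timestamp the TimestampedCollector attaches to the output record.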
IntervalJoinOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

    private static final long serialVersionUID = -5380774605111543454L;

    private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

    private final long lowerBound;
    private final long upperBound;

    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;

    private transient InternalTimerService<String> internalTimerService;

    public IntervalJoinOperator(
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive,
            TypeSerializer<T1> leftTypeSerializer,
            TypeSerializer<T2> rightTypeSerializer,
            ProcessJoinFunction<T1, T2, OUT> udf) {

        super(Preconditions.checkNotNull(udf));

        Preconditions.checkArgument(lowerBound <= upperBound,
                "lowerBound <= upperBound must be fulfilled");

        // Move buffer by +1 / -1 depending on inclusiveness in order not needing
        // to check for inclusiveness later on
        this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
        this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
    }

    @Override
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction);
        internalTimerService =
                getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
                LEFT_BUFFER,
                LongSerializer.INSTANCE,
                new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
        ));

        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
                RIGHT_BUFFER,
                LongSerializer.INSTANCE,
                new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
        ));
    }

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
        // do nothing.
    }

    //……
}

IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces.
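IntervalJoinOperator overrides the open and initializeState methods of AbstractUdfStreamOperator (defined by StreamOperator). In open it obtains an InternalTimerService, passing this as the Triggerable argument, i.e. the operator's own Triggerable implementation. In initializeState it creates two MapStates, leftBuffer and rightBuffer, each mapping a timestamp to the list of elements received with that timestamp.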
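Conceptually, each of the two MapStates holds, per key, a map from timestamp to the list of elements that arrived with that timestamp, which is what lets the operator probe and delete whole timestamp buckets. The in-memory sketch below models one side of that per-key state. It assumes that addToBuffer (elided from the listing above) appends the element to the bucket for its timestamp, creating the bucket if needed; the BufferEntry wrapper and all serializers are left out, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for one key's leftBuffer or rightBuffer:
// timestamp -> all elements of that side that arrived with this timestamp.
final class TimestampBuffer<T> {
    private final Map<Long, List<T>> buckets = new HashMap<>();

    // Assumed behaviour of addToBuffer: append to the timestamp's bucket, creating it if absent.
    void add(T value, long timestamp) {
        buckets.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(value);
    }

    // Returns the bucket for a timestamp, or an empty list if there is none.
    List<T> get(long timestamp) {
        return buckets.getOrDefault(timestamp, new ArrayList<>());
    }

    // Corresponds to leftBuffer.remove(timestamp) in onEventTime: drops the whole bucket.
    void remove(long timestamp) {
        buckets.remove(timestamp);
    }

    int bucketCount() {
        return buckets.size();
    }
}
```

Keying by timestamp means cleanup is a single remove per expired timestamp rather than a scan over individual elements.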
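IntervalJoinOperator implements the processElement1 and processElement2 methods of TwoInputStreamOperator (the interface's other methods are implemented in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator). Both delegate to processElement and differ only in the relativeLowerBound, relativeUpperBound and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed: processElement1 passes (lowerBound, upperBound), while processElement2 passes the mirrored bounds (-upperBound, -lowerBound).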
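Why processElement2 passes (-upperBound, -lowerBound): the condition rightTs ∈ [leftTs + lowerBound, leftTs + upperBound] is equivalent to leftTs ∈ [rightTs - upperBound, rightTs - lowerBound], so the same pairs are found whichever side arrives first. The hypothetical class below checks that equivalence by brute force over a small range of timestamps, using the same comparison that processElement performs.

```java
// Hypothetical check that processElement1's bounds (lower, upper) and
// processElement2's mirrored bounds (-upper, -lower) select the same pairs.
final class MirroredBounds {

    // Condition evaluated in processElement: otherTs in [ourTs + relLower, ourTs + relUpper].
    static boolean inRange(long ourTs, long otherTs, long relLower, long relUpper) {
        return otherTs >= ourTs + relLower && otherTs <= ourTs + relUpper;
    }

    // The left element probes the right buffer.
    static boolean fromLeft(long leftTs, long rightTs, long lower, long upper) {
        return inRange(leftTs, rightTs, lower, upper);
    }

    // The right element probes the left buffer with mirrored bounds.
    static boolean fromRight(long leftTs, long rightTs, long lower, long upper) {
        return inRange(rightTs, leftTs, -upper, -lower);
    }

    public static void main(String[] args) {
        long lower = -2, upper = 1;
        boolean consistent = true;
        for (long l = -5; l <= 5; l++) {
            for (long r = -5; r <= 5; r++) {
                consistent &= fromLeft(l, r, lower, upper) == fromRight(l, r, lower, upper);
            }
        }
        System.out.println(consistent); // prints true
    }
}
```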
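processElement implements the time-matching logic of intervalJoin. It first rejects records whose timestamp is Long.MIN_VALUE (records without a meaningful timestamp) with a FlinkException. It then obtains currentWatermark from internalTimerService and checks whether the element is late, i.e. whether its timestamp is below the current watermark; late elements are dropped silently, otherwise processing continues. Next it adds the element's value to ourBuffer (leftBuffer for processElement1, rightBuffer for processElement2). It then iterates over every timestamp bucket in otherBuffer and checks whether ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound holds. Buckets that fail the check are skipped; for each element in a matching bucket it calls collect, which stamps the result with the larger of the two timestamps and invokes userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction. Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that will clean up this element.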
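The steps of processElement can be traced with a single-key, Flink-free simulation. This is a sketch, not the operator itself: the class is hypothetical, the watermark is a plain field standing in for internalTimerService.currentWatermark(), timers are only recorded in a list instead of being registered, values are Strings, the BufferEntry wrapper is omitted, and the bounds are assumed to be already normalized to inclusive values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-key simulation of IntervalJoinOperator.processElement.
final class ProcessElementSketch {
    final long lowerBound; // assumed already normalized (inclusive)
    final long upperBound;
    long currentWatermark = Long.MIN_VALUE; // stand-in for the timer service's watermark

    final Map<Long, List<String>> leftBuffer = new HashMap<>();
    final Map<Long, List<String>> rightBuffer = new HashMap<>();
    final List<String> output = new ArrayList<>();
    final List<Long> registeredCleanupTimes = new ArrayList<>();

    ProcessElementSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long ts) {
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private boolean isLate(long ts) {
        return currentWatermark != Long.MIN_VALUE && ts < currentWatermark;
    }

    private void process(String value, long ourTs, Map<Long, List<String>> ourBuffer,
                         Map<Long, List<String>> otherBuffer, long relLower, long relUpper, boolean isLeft) {
        if (isLate(ourTs)) {
            return; // late elements are neither buffered nor joined
        }
        ourBuffer.computeIfAbsent(ourTs, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {
            long ts = bucket.getKey();
            if (ts < ourTs + relLower || ts > ourTs + relUpper) {
                continue; // bucket outside the interval
            }
            for (String other : bucket.getValue()) {
                // output is always "left,right", regardless of which side arrived last
                output.add(isLeft ? value + "," + other : other + "," + value);
            }
        }
        registeredCleanupTimes.add(relUpper > 0L ? ourTs + relUpper : ourTs);
    }
}
```

With bounds (-2, 1), a left element at 0 followed by right elements at 1 and 5 yields the single pair "o0,g1" and records cleanup times 1, 3 and 7; once the watermark is 4, a left element at 3 is dropped as late without being buffered.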
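IntervalJoinOperator implements the onEventTime and onProcessingTime methods of the Triggerable interface. onProcessingTime does nothing, while onEventTime uses the timer's namespace (CLEANUP_LEFT or CLEANUP_RIGHT) and timestamp to work out which timestamp bucket to remove from leftBuffer or rightBuffer.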
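The arithmetic in onEventTime inverts the cleanupTime computation in processElement. A left element registers its timer at ts + upperBound (or ts when upperBound <= 0), a right element at ts - lowerBound (or ts when lowerBound >= 0), and onEventTime recovers ts from the timer timestamp so it can remove exactly that bucket. The hypothetical class below restates both formulas from the listing and checks the round trip by brute force.

```java
// Hypothetical check that onEventTime maps each cleanup timer back to the
// timestamp bucket of the element that registered it.
final class CleanupRoundTrip {

    // processElement: cleanupTime = relativeUpperBound > 0 ? ts + relativeUpperBound : ts
    static long cleanupTime(long ts, long relativeUpperBound) {
        return relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
    }

    // onEventTime, CLEANUP_NAMESPACE_LEFT branch
    static long leftKeyToRemove(long timerTs, long upperBound) {
        return upperBound <= 0L ? timerTs : timerTs - upperBound;
    }

    // onEventTime, CLEANUP_NAMESPACE_RIGHT branch
    static long rightKeyToRemove(long timerTs, long lowerBound) {
        return lowerBound <= 0L ? timerTs + lowerBound : timerTs;
    }

    // Left elements use relativeUpperBound = upperBound, right elements use -lowerBound.
    static boolean roundTrips(long ts, long lowerBound, long upperBound) {
        long leftTimer = cleanupTime(ts, upperBound);
        long rightTimer = cleanupTime(ts, -lowerBound);
        return leftKeyToRemove(leftTimer, upperBound) == ts
                && rightKeyToRemove(rightTimer, lowerBound) == ts;
    }
}
```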

Summary

Flink's intervalJoin requires KeyedStreams and only works with TimeCharacteristic.EventTime. KeyedStream's intervalJoin creates and returns an IntervalJoin; IntervalJoin provides the between operation, which sets the interval's lowerBound and upperBound and creates and returns an IntervalJoined.
IntervalJoined provides the process operation, which takes a ProcessJoinFunction. process creates an IntervalJoinOperator and then executes left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator), returning a SingleOutputStreamOperator.
IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces. It overrides the open and initializeState methods of AbstractUdfStreamOperator (defined by StreamOperator): open obtains an InternalTimerService with this as the Triggerable argument, i.e. the operator's own Triggerable implementation, and initializeState creates the two MapStates leftBuffer and rightBuffer. It implements processElement1 and processElement2 from TwoInputStreamOperator; both delegate to processElement, differing only in the relativeLowerBound, relativeUpperBound and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed.
IntervalJoinOperator's processElement implements the time-matching logic of intervalJoin. It first checks whether the element is late and returns immediately if so; otherwise it adds the element to its own buffer and then iterates over every bucket in otherBuffer, checking whether ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound holds. Buckets that fail the check are skipped; for matching ones it calls collect, which invokes userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction. Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that will clean up the element.
IntervalJoinOperator implements the onEventTime and onProcessingTime methods of the Triggerable interface; onProcessingTime does nothing, while onEventTime removes elements from leftBuffer or rightBuffer based on the timer's timestamp.

doc
Interval Join
