共计 8335 个字符,预计需要花费 21 分钟才能阅读完成。
在数据库中的动态表上做 OLAP 剖析时,两表 join 是十分常见的操作。同理,在流式解决作业中,有时也须要在两条流上做 join 以取得更丰盛的信息。Flink DataStream API 为用户提供了 3 个算子来实现双流 join,别离是:
- join()
- coGroup()
- intervalJoin()
本文举例说明它们的应用办法,顺便聊聊比拟非凡的 interval join 的原理。
筹备数据
从 Kafka 别离接入点击流和订单流,并转化为 POJO。
DataStream<String> clickSourceStream = env .addSource(new FlinkKafkaConsumer011<>( “ods_analytics_access_log”, new SimpleStringSchema(), kafkaProps ).setStartFromLatest()); DataStream<String> orderSourceStream = env .addSource(new FlinkKafkaConsumer011<>( “ods_ms_order_done”, new SimpleStringSchema(), kafkaProps ).setStartFromLatest()); DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class)); DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));
–
join()
join() 算子提供的语义为 ”Window join”,即依照指定字段和(滚动 / 滑动 / 会话)窗口进行 inner join,反对解决工夫和事件工夫两种工夫特色。以下示例以 10 秒滚动窗口,将两个流通过商品 ID 关联,获得订单流中的售价相干字段。
clickRecordStream .join(orderRecordStream) .where(record -> record.getMerchandiseId()) .equalTo(record -> record.getMerchandiseId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {@Override public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {return StringUtils.join(Arrays.asList( accessRecord.getMerchandiseId(), orderRecord.getPrice(), orderRecord.getCouponMoney(), orderRecord.getRebateAmount()), ‘t’); } }) .print().setParallelism(1);
简略易用。
coGroup()
只有 inner join 必定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。它的调用形式相似于 join() 算子,也须要开窗,然而 CoGroupFunction 比 JoinFunction 更加灵便,能够依照用户指定的逻辑匹配左流和 / 或右流的数据并输入。
以下的例子就实现了点击流 left join 订单流的性能,是很奢侈的 nested loop join 思维(二重循环)。
clickRecordStream .coGroup(orderRecordStream) .where(record -> record.getMerchandiseId()) .equalTo(record -> record.getMerchandiseId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {@Override public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {for (AnalyticsAccessLogRecord accessRecord : accessRecords) {boolean isMatched = false; for (OrderDoneLogRecord orderRecord : orderRecords) {// 右流中有对应的记录 collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice())); isMatched = true; } if (!isMatched) {// 右流中没有对应的记录 collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null)); } } } }) .print().setParallelism(1);
intervalJoin()
join() 和 coGroup() 都是基于窗口做关联的。然而在某些状况下,两条流的数据步调未必统一。例如,订单流的数据有可能在点击流的购买动作产生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了 ”Interval join” 的语义,依照指定字段以及右流绝对左流偏移的工夫区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
interval join 也是 inner join,尽管不须要开窗,然而须要用户指定偏移区间的上下界,并且只反对事件工夫。
示例代码如下。留神在运行之前,须要别离在两个流上利用 assignTimestampsAndWatermarks() 办法获取事件工夫戳和水印。
clickRecordStream .keyBy(record -> record.getMerchandiseId()) .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId())) .between(Time.seconds(-30), Time.seconds(30)) .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {@Override public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {collector.collect(StringUtils.join(Arrays.asList( accessRecord.getMerchandiseId(), orderRecord.getPrice(), orderRecord.getCouponMoney(), orderRecord.getRebateAmount()), ‘t’)); } }) .print().setParallelism(1);
由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且须要调用 between() 办法指定偏移区间的上下界。如果想令上下界是开区间,能够调用 upperBoundExclusive()/lowerBoundExclusive() 办法。
interval join 的实现原理
以下是 KeyedStream.process(ProcessJoinFunction) 办法调用的重载办法的逻辑。
public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) {Preconditions.checkNotNull(processJoinFunction); Preconditions.checkNotNull(outputType); final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction); final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator<>(lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, left.getType().createSerializer(left.getExecutionConfig()), right.getType().createSerializer(right.getExecutionConfig()), cleanedUdf ); return left .connect(right) .keyBy(keySelector1, keySelector2) .transform(“Interval Join”, outputType, operator); }
可见是先对两条流执行 connect() 和 keyBy() 操作,而后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 别离缓存左流和右流的数据。
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer; private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer; @Override public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context); this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( LEFT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer)) )); this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( RIGHT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)) )); }
其中 Long 示意事件工夫戳,List> 示意该时刻到来的数据记录。当左流和右流有数据达到时,会别离调用 processElement1() 和 processElement2() 办法,它们都调用了 processElement() 办法,代码如下。
@Override public void processElement1(StreamRecord<T1> record) throws Exception {processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); } @Override public void processElement2(StreamRecord<T2> record) throws Exception {processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); } @SuppressWarnings(“unchecked”) private <THIS, OTHER> void processElement(final StreamRecord<THIS> record, final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception {final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) {throw new FlinkException(“Long.MIN_VALUE timestamp: Elements used in ” + “interval stream joins need to have timestamps meaningful timestamps.”); } if (isLate(ourTimestamp)) {return;} addToBuffer(ourBuffer, ourValue, ourTimestamp); for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {final long timestamp = bucket.getKey(); if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) {continue;} for (BufferEntry<OTHER> entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } }
这段代码的思路是:
- 获得以后流 StreamRecord 的工夫戳,调用 isLate() 办法判断它是否是早退数据(即工夫戳小于以后水印值),如是则抛弃。
- 调用 addToBuffer() 办法,将工夫戳和数据一起插入以后流对应的 MapState。
- 遍历另外一个流的 MapState,如果数据满足前述的工夫区间条件,则调用 collect() 办法将该条数据投递给用户定义的 ProcessJoinFunction 进行解决。collect() 办法的代码如下,留神后果对应的工夫戳是左右流工夫戳里较大的那个。
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); collector.setAbsoluteTimestamp(resultTimestamp); context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp); userFunction.processElement(left, right, context, collector); }
- 调用 TimerService.registerEventTimeTimer() 注册工夫戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,避免数据沉积。留神左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 办法中。
@Override public void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp(); String namespace = timer.getNamespace(); logger.trace(“onEventTime @ {}”, timerTimestamp); switch (namespace) {case CLEANUP_NAMESPACE_LEFT: { long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp – upperBound; logger.trace(“Removing from left buffer @ {}”, timestamp); leftBuffer.remove(timestamp); break; } case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp; logger.trace(“Removing from right buffer @ {}”, timestamp); rightBuffer.remove(timestamp); break; } default: throw new RuntimeException(“Invalid namespace ” + namespace); } }
原文链接
本文为阿里云原创内容,未经容许不得转载。