本系列每篇文章都比拟短小,不定期更新,从一些理论的 case 登程抛砖引玉,进步小伙伴的姿♂势程度。本文介绍在满足原有需要、实现原有逻辑的场景下,在 Flink 中应用 union 代替 cogroup(或者join) ,简化工作逻辑,晋升工作性能的办法,浏览时长大略一分钟,话不多说,间接进入注释!
## 需要场景剖析

需要场景

需要诱诱诱来了。。。数据产品妹妹想要统计单个短视频粒度的点赞,播放,评论,分享,举报五类实时指标,并且汇总成 photo_id、1 分钟工夫粒度的实时视频生产宽表(即宽表字段至多为:photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp)产出至实时大屏。
问题在于对同一个视频,五类视频消费行为的触发机制以及上报工夫是不同,也就决定了对实时处理来说五类行为日志对应着五个不同的数据源。sql boy 们天然就想到了 join 操作将五类消费行为日志合并,可是实时 join(cogroup) 真的那么完满咩~,下文细谈。

source 输出以及特点

首先咱们剖析下需要中的 source 特点:

  • photo_id 粒度 play(播放)、like(点赞)、comment(评论)、share(分享)、negative(举报)明细数据,用户播放(点赞、评论...)n 次,客户端服务端就会上传 n 条播放(点赞、评论...)日志至数据源
  • 五类视频消费行为日志的 source schema 都为:photo_id + timestamp + 其余维度
    ### sink 输入以及特点

sink 特点如下:

  • photo_id 粒度 play(播放)、like(点赞)、comment(评论)、share(分享)、negative(举报)1 分钟级别窗口聚合数据
  • 实时视频生产宽表 sink schema 为:photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp

source、sink 样例数据

source 数据:

photo_idtimestampuser_id阐明
12020/10/3 11:30:333播放
12020/10/3 11:30:334播放
12020/10/3 11:30:335播放
12020/10/3 11:30:334点赞
22020/10/3 11:30:335点赞
12020/10/3 11:30:335评论

sink 数据:

photo_idtimestampplay_cntlike_cntcomment_cnt
12020/10/3 11:30:00311
22020/10/3 11:30:00010

咱们曾经对数据源输出和输入有了残缺的剖析,那就瞧瞧有什么计划能够实现上述需要吧。

实现计划

  • 计划1:本大节 cogroup 计划间接生产原始日志数据,对五类不同的视频消费行为日志应用 cogroup 或者 join 进行窗口聚合计算
  • 计划2:对五类不同的视频消费行为日志别离独自聚合计算出分钟粒度指标数据,上游再对聚合好的指标数据依照 photo_id 进行合并
  • 计划3:本大节 union 计划既然数据源 schema 雷同,间接对五类不同的视频消费行为日志做 union 操作,在后续的窗口函数中对五类指标进行聚合计算。后文介绍 union 计划的设计过程

咱们先上 cogroup 计划的示例代码。

cogroup

cogroup 实现示例如下,示例代码间接应用了解决工夫(也可替换为事件工夫~),因而对数据源的工夫戳做了简化(间接干掉):

public class Cogroup { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Long -> photo_id 播放一次  DataStream<Long> play = SourceFactory.getDataStream(xxx);  // Long -> photo_id 点赞一次  DataStream<Long> like = SourceFactory.getDataStream(xxx);  // Long -> photo_id 评论一次  DataStream<Long> comment = SourceFactory.getDataStream(xxx);  // Long -> photo_id 分享一次  DataStream<Long> share = SourceFactory.getDataStream(xxx);  // Long -> photo_id 举报一次  DataStream<Long> negative = SourceFactory.getDataStream(xxx); // Tuple3<Long, Long, Long> -> photo_id + play_cnt + like_cnt 播放和点赞的数据合并  DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play .coGroup(like) .where(KeySelectorFactory.get(Function.identity())) .equalTo(KeySelectorFactory.get(Function.identity())) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(xxx1); // Tuple4<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt 播放、点赞、评论的数据合并  DataStream<Tuple4<Long, Long, Long, Long, Long>> playAndLikeAndComment = playAndLikeCnt .coGroup(comment) .where(KeySelectorFactory.get(playAndLikeModel -> playAndLikeModel.f0)) .equalTo(KeySelectorFactory.get(Function.identity())) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(xxx2); // Tuple5<Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt 播放、点赞、评论、分享的数据合并  DataStream<Tuple5<Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = playAndLikeAndComment .coGroup(share) .where(KeySelectorFactory.get(playAndLikeAndCommentModel -> playAndLikeAndCommentModel.f0)) .equalTo(KeySelectorFactory.get(Function.identity())) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(xxx2); // Tuple7<Long, Long, Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp 播放、点赞、评论、分享、举报的数据合并 // 同上~  DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = ***; env.execute(); }}

粗犷一想,下面这样一搞不就完结了么,事件没那么简略,咱们来做一个具体点的剖析。

上述实现可能会存在的问题点

  • 从 flink 生产到 play 数据源的一条数据到最终产出这条数据被聚合后的数据,整个过程的数据提早 > 3 分钟...
  • 如果数据源继续减少(比方增加其余视频生产操作数据源),则整个工作算子变多,数据链路更长,工作稳定性会变差,产出数据提早也会随着窗口计算变多,提早更久
数据产品妹妹:????,小哥哥好棒,既然问题点都剖析进去了,技术小哥哥就帮人家解决一下嘛~

头文字 ∩ 技术小哥哥:搞。

头文字 ∩ 技术小哥哥:既然可能因为过多的窗口导致数据产出提早,job 不稳固,那有没有什么办法缩小窗口数量呢,思路转换一下。咱们间接以整个 job 中只蕴含一个窗口算子操作为基点,逆推一下,则有以下数据链路。

逆推链路

1 - 5 为逆推的整条链路。

  • 1.五类指标的数据都在单个窗口中计算
  • 2.五类指标的窗口 model 雷同
  • 3.keyby 中的 key 统一(photo_id)
  • 4.五类指标的数据源都为 photo_id 粒度,并且五类数据源的 model 都必须雷同,并且能够做合并
  • 5.union 算子能够对五类数据源做合并!!!

话不多说间接上 union 计划代码。

union

public class Union { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Tuple2<Long, String> -> photo_id + "PLAY"标签  DataStream<Tuple2<Long, String>> play = SourceFactory.getDataStream(xxx);  // Tuple2<Long, String> -> photo_id + "LIKE"标签  DataStream<Tuple2<Long, String>> like = SourceFactory.getDataStream(xxx);  // Tuple2<Long, String> -> photo_id + "COMMENT"标签  DataStream<Tuple2<Long, String>> comment = SourceFactory.getDataStream(xxx);  // Tuple2<Long, String> -> photo_id + "SHARE"标签  DataStream<Tuple2<Long, String>> share = SourceFactory.getDataStream(xxx);  // Tuple2<Long, String> -> photo_id + "NEGATIVE"标签  DataStream<Tuple2<Long, String>> negative = SourceFactory.getDataStream(xxx); // Tuple5<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + window_start_timestamp  DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play .union(like) .union(comment) .union(share) .union(negative) .keyBy(KeySelectorFactory.get(i -> i.f0)) .timeWindow(Time.seconds(60)) .process(xxx); env.execute(); }}

能够发现,无论上游数据源怎么进行变动,上述 union 计划中始终能够放弃只有一个窗口算子解决和计算数据,则能够解决之前列举的数据提早以及 flink 工作算子过多的问题。
在数据源的 schema 雷同(或者不同但通过解决之后能够 format 成雷同格局)的状况下,或者解决逻辑雷同的话,能够应用 union 进行逻辑简化。

总结

本文首先介绍了咱们的需要场景,第二局部剖析了应用 cogroup(案例代码)是如何解决此需要场景,再剖析了此实现计划可能会存在一些问题,并引出了 union 解决方案的逆推和设计思路。
在第三局部针对此场景应用 union 代替 cogroup 进行了肯定水平上的优化。如果针对此场景,大佬们有更好的优化计划的话,期待留言喔。