Flink's window functions comprise tumbling windows, sliding windows, session windows, and OVER windows.
Tumbling windows

A tumbling window (TUMBLE) assigns each element to a window of a specified size. A tumbling window has a fixed size, and windows do not overlap. For example, with a 5-minute tumbling window, an unbounded stream is partitioned by time into the windows [0:00, 0:05), [0:05, 0:10), [0:10, 0:15), and so on. The figure below shows a 30-second tumbling window. Window auxiliary functions extract a window's start or end time, and the window's time attribute can be used for aggregation in a downstream (cascading) window.
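The assignment rule above can be sketched in a few lines: a tumbling window's start is the event timestamp rounded down to a multiple of the window size. This is a minimal illustration of the idea (the helper names are mine, not Flink's API):

```java
import java.util.concurrent.TimeUnit;

public class TumbleAssignment {
    // Sketch of how a tumbling window is derived from an event timestamp:
    // round the timestamp down to a multiple of the window size.
    // Illustrative helper, not Flink's actual API.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = TimeUnit.SECONDS.toMillis(30); // a 30-second tumbling window
        long ts = 95_000L;                         // event at 00:01:35
        long start = windowStart(ts, size);
        long end = start + size;                   // exclusive upper bound
        System.out.println(start + " " + end);     // window [90000, 120000)
    }
}
```

Because the start is computed purely from the timestamp and the size, every element with a timestamp in [90000, 120000) lands in the same window, and adjacent windows never overlap.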
| Window auxiliary function | Return type | Description |
|---|---|---|
| TUMBLE_START(time-attr, size-interval) | TIMESTAMP | Returns the start time of the window (boundary included). For the window [00:10, 00:15), returns 00:10. |
| TUMBLE_END(time-attr, size-interval) | TIMESTAMP | Returns the end time of the window (the boundary value itself). For the window [00:00, 00:15), returns 00:15. |
| TUMBLE_ROWTIME(time-attr, size-interval) | TIMESTAMP(rowtime-attr) | Returns the end time of the window, excluding the boundary. For the window [00:00, 00:15), returns 00:14:59.999. The value is a rowtime attribute, so further time-based operations (e.g. cascading windows) can be applied to it; it can only be used on event-time windows. |
| TUMBLE_PROCTIME(time-attr, size-interval) | TIMESTAMP(proctime-attr) | Returns the end time of the window, excluding the boundary. For the window [00:00, 00:15), returns 00:14:59.999. The value is a proctime attribute, so further time-based operations (e.g. cascading windows) can be applied to it; it can only be used on processing-time windows. |

TUMBLE window example
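The only difference between TUMBLE_END and TUMBLE_ROWTIME is the boundary: the rowtime value is the last millisecond still inside the window (end minus 1 ms), which is why it remains a valid time attribute for a cascading window. A minimal sketch of that relationship (helper names are illustrative, not Flink's API):

```java
public class WindowBounds {
    // For a window [start, start + size), TUMBLE_END reports the exclusive
    // boundary itself, while TUMBLE_ROWTIME reports the last timestamp that
    // still belongs to the window (end - 1 ms).
    static long windowEnd(long startMs, long sizeMs) {
        return startMs + sizeMs;
    }

    static long windowRowtime(long startMs, long sizeMs) {
        return windowEnd(startMs, sizeMs) - 1; // 00:15:00.000 -> 00:14:59.999
    }

    public static void main(String[] args) {
        long start = 0L;             // 00:00
        long size = 15 * 60 * 1000L; // a 15-minute window
        System.out.println(windowEnd(start, size));     // 900000  (00:15:00.000)
        System.out.println(windowRowtime(start, size)); // 899999  (00:14:59.999)
    }
}
```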
```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class TumbleWindowExample {
    public static void main(String[] args) throws Exception {
        // 1. Set up the environment
        EnvironmentSettings mySetting = EnvironmentSettings
                .newInstance()
                // .useOldPlanner()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, mySetting);

        // Sample data
        DataStream<Tuple3<Long, String, Integer>> log = env.fromCollection(Arrays.asList(
                // time 14:53:00
                new Tuple3<>(1572591180_000L, "xiao_ming", 300),
                // time 14:53:09
                new Tuple3<>(1572591189_000L, "zhang_san", 303),
                // time 14:53:12
                new Tuple3<>(1572591192_000L, "xiao_li", 204),
                // time 14:53:21
                new Tuple3<>(1572591201_000L, "li_si", 208)
        ));

        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime =
                log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
                        return element.f0;
                    }
                });

        // Convert to a Table; "t.rowtime" exposes the event time as a rowtime attribute
        Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

        Table result = tEnv.sqlQuery("SELECT "
                + "TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start, "
                + "TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, "
                + "SUM(v) FROM " + logT
                + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");

        TypeInformation<Tuple3<Timestamp, Timestamp, Integer>> tpinf =
                new TypeHint<Tuple3<Timestamp, Timestamp, Integer>>() {}.getTypeInfo();
        tEnv.toAppendStream(result, tpinf).print();

        env.execute();
    }
}
```

The SQL aggregates the stream per 10-second window. Output:

(2019-11-01 06:53:00.0,2019-11-01 06:53:10.0,603)
(2019-11-01 06:53:20.0,2019-11-01 06:53:30.0,208)
(2019-11-01 06:53:10.0,2019-11-01 06:53:20.0,204)
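The output above can be re-derived by hand with the same bucketing rule the window uses: round each event timestamp down to a multiple of 10 seconds and sum the values per bucket. A small self-contained check (plain Java, no Flink):

```java
import java.util.Map;
import java.util.TreeMap;

public class TumbleByHand {
    // Re-derives the example's result: bucket each (timestamp, value) event
    // into its 10-second window (start = ts - ts % size) and sum per window.
    static Map<Long, Long> tumbleSums(long[][] events, long sizeMs) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            sums.merge(e[0] - e[0] % sizeMs, e[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] events = {
                {1572591180_000L, 300}, // 14:53:00
                {1572591189_000L, 303}, // 14:53:09
                {1572591192_000L, 204}, // 14:53:12
                {1572591201_000L, 208}, // 14:53:21
        };
        // Three windows with sums 603, 204, 208 -- matching the job's output.
        System.out.println(tumbleSums(events, 10_000L));
    }
}
```

The first two events share the window starting at 14:53:00 (sum 300 + 303 = 603), while the other two fall alone into the next two windows.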
...