序
本文主要研究一下flink的Session Window
MergingWindowAssigner
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java
@PublicEvolving public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> { private static final long serialVersionUID = 1L; /** * Determines which windows (if any) should be merged. * * @param windows The window candidates. * @param callback A callback that can be invoked to signal which windows should be merged. */ public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback); /** * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which * windows should be merged. */ public interface MergeCallback<W> { /** * Specifies that the given windows should be merged into the result window. * * @param toBeMerged The list of windows that should be merged into one window. * @param mergeResult The resulting merged window. */ void merge(Collection<W> toBeMerged, W mergeResult); }}
MergingWindowAssigner继承了WindowAssigner,它自己定义了mergeWindows抽象方法,该方法有一个MergeCallback类型参数,MergeCallback接口定义了merge方法,传入merge前的windows及合并后的window
EventTimeSessionWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; protected long sessionTimeout; protected EventTimeSessionWindows(long sessionTimeout) { if (sessionTimeout <= 0) { throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size"); } this.sessionTimeout = sessionTimeout; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); } @Override public Trigger<Object, TimeWindow> 
getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } @Override public String toString() { return "EventTimeSessionWindows(" + sessionTimeout + ")"; } /** * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns * elements to sessions based on the element timestamp. * * @param size The session timeout, i.e. the time gap between sessions * @return The policy. */ public static EventTimeSessionWindows withGap(Time size) { return new EventTimeSessionWindows(size.toMilliseconds()); } /** * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns * elements to sessions based on the element timestamp. * * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements * @return The policy. */ @PublicEvolving public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } /** * Merge overlapping {@link TimeWindow}s. 
*/ public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) { TimeWindow.mergeWindows(windows, c); }}
EventTimeSessionWindows继承了MergingWindowAssigner,它的构造器参数为sessionTimeout;assignWindows方法返回的TimeWindow的start为timestamp,end为timestamp + sessionTimeout
getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer返回的是TimeWindow.Serializer();isEventTime返回的为true;mergeWindows方法调用的是TimeWindow.mergeWindows方法
EventTimeSessionWindows定义了两个静态工厂方法,分别是withGap及withDynamicGap,其中withGap创建的是EventTimeSessionWindows,withDynamicGap创建的是DynamicEventTimeSessionWindows
ProcessingTimeSessionWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; protected long sessionTimeout; protected ProcessingTimeSessionWindows(long sessionTimeout) { if (sessionTimeout <= 0) { throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size"); } this.sessionTimeout = sessionTimeout; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { long currentProcessingTime = context.getCurrentProcessingTime(); return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout)); } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } @Override public String toString() { return "ProcessingTimeSessionWindows(" + sessionTimeout + ")"; } /** * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns * elements to sessions based on the element timestamp. * * @param size The session timeout, i.e. the time gap between sessions * @return The policy. 
*/ public static ProcessingTimeSessionWindows withGap(Time size) { return new ProcessingTimeSessionWindows(size.toMilliseconds()); } /** * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns * elements to sessions based on the element timestamp. * * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements * @return The policy. */ @PublicEvolving public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return false; } /** * Merge overlapping {@link TimeWindow}s. */ public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) { TimeWindow.mergeWindows(windows, c); }}
ProcessingTimeSessionWindows继承了MergingWindowAssigner,它的构造器参数为sessionTimeout;assignWindows方法返回的TimeWindow的start为currentProcessingTime(这里currentProcessingTime值为context.getCurrentProcessingTime()),end为currentProcessingTime + sessionTimeout
getDefaultTrigger方法返回的是ProcessingTimeTrigger;getWindowSerializer返回的是TimeWindow.Serializer();isEventTime返回的为false;mergeWindows方法调用的是TimeWindow.mergeWindows方法
ProcessingTimeSessionWindows定义了两个静态工厂方法,分别是withGap及withDynamicGap,其中withGap创建的是ProcessingTimeSessionWindows,withDynamicGap创建的是DynamicProcessingTimeSessionWindows
SessionWindowTimeGapExtractor
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
@PublicEvolving public interface SessionWindowTimeGapExtractor<T> extends Serializable { /** * Extracts the session time gap. * @param element The input element. * @return The session time gap in milliseconds. 
*/ long extract(T element);}
SessionWindowTimeGapExtractor接口定义了extract方法,用于从element中提取sessionTimeout参数
DynamicEventTimeSessionWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
@PublicEvolving public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> { private static final long serialVersionUID = 1L; protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor; protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor; } @Override public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) { long sessionTimeout = sessionWindowTimeGapExtractor.extract(element); if (sessionTimeout <= 0) { throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap"); } return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); } @SuppressWarnings("unchecked") @Override public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return (Trigger<T, TimeWindow>) EventTimeTrigger.create(); } @Override public String toString() { return "DynamicEventTimeSessionWindows()"; } /** * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns * elements to sessions based on the element timestamp. * * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements * @return The policy. 
*/ public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } /** * Merge overlapping {@link TimeWindow}s. */ public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) { TimeWindow.mergeWindows(windows, c); }}
DynamicEventTimeSessionWindows也继承了MergingWindowAssigner,与EventTimeSessionWindows不同的是,它的构造器参数为SessionWindowTimeGapExtractor
assignWindows方法首先使用sessionWindowTimeGapExtractor从element中提取sessionTimeout,然后返回TimeWindow(timestamp, timestamp + sessionTimeout);getDefaultTrigger方法返回的是EventTimeTrigger;isEventTime返回的为true;mergeWindows方法调用的是TimeWindow.mergeWindows方法
DynamicEventTimeSessionWindows定义了withDynamicGap的静态工厂方法,用于创建DynamicEventTimeSessionWindows
DynamicProcessingTimeSessionWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
@PublicEvolving public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> { private static final long serialVersionUID = 1L; protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor; protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor; } @Override public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) { long currentProcessingTime = context.getCurrentProcessingTime(); long sessionTimeout = sessionWindowTimeGapExtractor.extract(element); if (sessionTimeout <= 0) { throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap"); } return 
Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout)); } @SuppressWarnings("unchecked") @Override public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create(); } @Override public String toString() { return "DynamicProcessingTimeSessionWindows()"; } /** * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns * elements to sessions based on the element timestamp. * * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements * @return The policy. */ public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) { return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return false; } /** * Merge overlapping {@link TimeWindow}s. 
*/ public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) { TimeWindow.mergeWindows(windows, c); }}
DynamicProcessingTimeSessionWindows也继承了MergingWindowAssigner,与ProcessingTimeSessionWindows不同的是,它的构造器参数为SessionWindowTimeGapExtractor
assignWindows方法首先使用sessionWindowTimeGapExtractor从element中提取sessionTimeout,然后返回TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout)(这里currentProcessingTime的值为context.getCurrentProcessingTime());getDefaultTrigger方法返回的是ProcessingTimeTrigger;isEventTime返回的为false;mergeWindows方法调用的是TimeWindow.mergeWindows方法
DynamicProcessingTimeSessionWindows定义了withDynamicGap的静态工厂方法,用于创建DynamicProcessingTimeSessionWindows
小结
flink的session window主要有EventTimeSessionWindows、DynamicEventTimeSessionWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows,它们都继承了MergingWindowAssigner;MergingWindowAssigner定义了mergeWindows抽象方法
EventTimeSessionWindows与ProcessingTimeSessionWindows的构造器参数都是sessionTimeout,不同的是,assignWindows中,ProcessingTimeSessionWindows使用context.getCurrentProcessingTime()替代了方法的timestamp参数来计算TimeWindow;getDefaultTrigger方法前者返回EventTimeTrigger,后者返回ProcessingTimeTrigger;isEventTime方法前者返回true,后者返回false
DynamicEventTimeSessionWindows与DynamicProcessingTimeSessionWindows,它们与非dynamic的区别是,它们的构造器参数为SessionWindowTimeGapExtractor;SessionWindowTimeGapExtractor接口定义了extract方法,用于从element中提取sessionTimeout参数;而非dynamic的session window,其sessionTimeout参数在构造器传入之后就固定了
doc
Session Windows