序本文主要研究一下flink的BoundedOutOfOrdernessTimestampExtractorBoundedOutOfOrdernessTimestampExtractorflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java/** * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can * help reduce the number of elements that are ignored due to lateness when computing the final result for a * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp. * /public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> { private static final long serialVersionUID = 1L; /* The current maximum timestamp seen so far. / private long currentMaxTimestamp; /* The timestamp of the last emitted watermark. / private long lastEmittedWatermark = Long.MIN_VALUE; /* * The (fixed) interval between the maximum seen timestamp seen in the records * and that of the watermark to be emitted. / private final long maxOutOfOrderness; public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) { if (maxOutOfOrderness.toMilliseconds() < 0) { throw new RuntimeException(“Tried to set the maximum allowed " + “lateness to " + maxOutOfOrderness + “. This parameter cannot be negative.”); } this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds(); this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; } public long getMaxOutOfOrdernessInMillis() { return maxOutOfOrderness; } /* * Extracts the timestamp from the given element. * * @param element The element that the timestamp is extracted from. * @return The new timestamp. 
/ public abstract long extractTimestamp(T element); @Override public final Watermark getCurrentWatermark() { // this guarantees that the watermark never goes backwards. long potentialWM = currentMaxTimestamp - maxOutOfOrderness; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); } @Override public final long extractTimestamp(T element, long previousElementTimestamp) { long timestamp = extractTimestamp(element); if (timestamp > currentMaxTimestamp) { currentMaxTimestamp = timestamp; } return timestamp; }}BoundedOutOfOrdernessTimestampExtractor抽象类实现AssignerWithPeriodicWatermarks接口的extractTimestamp及getCurrentWatermark方法,同时声明抽象方法extractTimestamp(T element)供子类实现；BoundedOutOfOrdernessTimestampExtractor的构造器接收maxOutOfOrderness参数用于指定element允许滞后(t-t_w,t为element的eventTime,t_w为前一次watermark的时间)的最大时间,在计算窗口数据时,如果超过该值则会被忽略；BoundedOutOfOrdernessTimestampExtractor的extractTimestamp方法会调用子类的extractTimestamp方法抽取时间,如果该时间大于currentMaxTimestamp,则更新currentMaxTimestamp;getCurrentWatermark先计算potentialWM,如果potentialWM大于等于lastEmittedWatermark则更新lastEmittedWatermark(currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness,这里表示lastEmittedWatermark太小了所以差值超过了maxOutOfOrderness,因而调大lastEmittedWatermark),最后返回Watermark(lastEmittedWatermark)实例 public static void main(String[] args) throws Exception { final int popThreshold = 20; // threshold for popular places // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(1000); // configure the Kafka consumer Properties kafkaProps = new Properties(); kafkaProps.setProperty(“zookeeper.connect”, LOCAL_ZOOKEEPER_HOST); kafkaProps.setProperty(“bootstrap.servers”, LOCAL_KAFKA_BROKER); kafkaProps.setProperty(“group.id”, RIDE_SPEED_GROUP); // always read the Kafka topic from the start 
kafkaProps.setProperty(“auto.offset.reset”, “earliest”); // create a Kafka consumer FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>( “cleansedRides”, new TaxiRideSchema(), kafkaProps); // assign a timestamp extractor to the consumer consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor()); // create a TaxiRide data stream DataStream<TaxiRide> rides = env.addSource(consumer); // find popular places DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides // match ride to grid cell and event type (start or end) .map(new GridCellMatcher()) // partition by cell id and event type .keyBy(0, 1) // build sliding window .timeWindow(Time.minutes(15), Time.minutes(5)) // count ride events in window .apply(new RideCounter()) // filter by popularity threshold .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold)) // map grid cell to coordinates .map(new GridToCoordinates()); popularPlaces.print(); // execute the transformation pipeline env.execute(“Popular Places from Kafka”); } /* * Assigns timestamps to TaxiRide records. * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted. 
*/ public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> { public TaxiRideTSExtractor() { super(Time.seconds(MAX_EVENT_DELAY)); } @Override public long extractTimestamp(TaxiRide ride) { if (ride.isStart) { return ride.startTime.getMillis(); } else { return ride.endTime.getMillis(); } } }该实例使用的是AssignerWithPeriodicWatermarks,通过env.getConfig().setAutoWatermarkInterval(1000)设置了watermark的时间间隔,通过assignTimestampsAndWatermarks指定了AssignerWithPeriodicWatermarks为TaxiRideTSExtractor,它继承了BoundedOutOfOrdernessTimestampExtractor抽象类小结flink为了方便开发提供了几个内置的Pre-defined Timestamp Extractors / Watermark Emitters,其中一个就是BoundedOutOfOrdernessTimestampExtractor；BoundedOutOfOrdernessTimestampExtractor抽象类实现AssignerWithPeriodicWatermarks接口的extractTimestamp及getCurrentWatermark方法,同时声明抽象方法extractTimestamp(T element)供子类实现；BoundedOutOfOrdernessTimestampExtractor的构造器接收maxOutOfOrderness参数用于指定element允许滞后(t-t_w,t为element的eventTime,t_w为前一次watermark的时间)的最大时间,在计算窗口数据时,如果超过该值则会被忽略docPre-defined Timestamp Extractors / Watermark Emitters