Preface
This article takes a look at Flink's BoundedOutOfOrdernessTimestampExtractor.
BoundedOutOfOrdernessTimestampExtractor
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
package org.apache.flink.streaming.api.functions.timestamps;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = 1L;
    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;
    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;
    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }
    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }
    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);
    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
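BoundedOutOfOrdernessTimestampExtractor is an abstract class that implements the extractTimestamp(T, long) and getCurrentWatermark methods required by the AssignerWithPeriodicWatermarks interface (both declared final), and declares the abstract method extractTimestamp(T element) for subclasses to implement.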
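The constructor takes a maxOutOfOrderness parameter: the maximum time by which an element's event time t may lag behind the largest event time seen so far, t_max (that is, a bound on t_max - t). The emitted watermark trails t_max by exactly this amount, so an element that lags by more than maxOutOfOrderness may arrive after the watermark has already passed the end of its window; with the default allowed lateness of zero, such an element is ignored when the window result is computed. A negative value is rejected with a RuntimeException.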
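To make the lateness rule concrete, here is a small plain-Java sketch with no Flink dependency. It assumes 10 s tumbling windows starting at 0, an allowed lateness of zero, and, for simplicity, that a watermark is produced after every element (Flink actually polls the assigner periodically). The class and method names are hypothetical. Roughly following the check in Flink's window operator, an element counts as dropped when the last timestamp of its window (end - 1) is already at or below the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone demo, not Flink code.
public class LatenessDemo {
    static final long WINDOW_SIZE = 10_000L; // assumed 10 s tumbling windows

    // Start of the tumbling window containing ts (non-negative timestamps only).
    public static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE);
    }

    // For each element (in arrival order), true if its window had already been
    // passed by the watermark when the element arrived, i.e. it would be dropped.
    public static List<Boolean> dropped(long[] timestamps, long maxOutOfOrderness) {
        List<Boolean> result = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + maxOutOfOrderness; // same initial value as the Flink class
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long windowMaxTimestamp = windowStart(ts) + WINDOW_SIZE - 1;
            // compare against the watermark produced before this element arrived
            result.add(windowMaxTimestamp <= watermark);
            maxTs = Math.max(maxTs, ts);
            // watermark trails the max timestamp and never moves backwards
            watermark = Math.max(watermark, maxTs - maxOutOfOrderness);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 12_000, 9_000, 16_000, 8_000};
        System.out.println(dropped(ts, 5_000));  // [false, false, false, false, true]
        System.out.println(dropped(ts, 10_000)); // [false, false, false, false, false]
    }
}
```

With a 5 s bound, the element at 8 000 ms arrives after the watermark has reached 11 000 ms, past the end of window [0, 10 000), so it is lost; with 10 s of slack the same arrival order loses nothing. The element at 9 000 ms is out of order too, but it lags the maximum by only 3 s and survives in both cases.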
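Its extractTimestamp(T, long) method calls the subclass's extractTimestamp(T) to obtain the element's timestamp and, if that timestamp is larger than currentMaxTimestamp, updates currentMaxTimestamp. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness; if potentialWM >= lastEmittedWatermark, it updates lastEmittedWatermark. The condition is equivalent to currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness: the last watermark has fallen at least maxOutOfOrderness behind the maximum timestamp, so it is moved forward. Finally it returns new Watermark(lastEmittedWatermark), which therefore never goes backwards.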
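The bookkeeping described above can be reproduced without Flink. The sketch below is a hypothetical standalone class (WatermarkTracker is not a Flink type) that copies the two fields and the update rules from the source, with Flink's Time and Watermark replaced by plain long milliseconds.

```java
// Hypothetical plain-Java mirror of the extractor's state; not part of Flink.
public class WatermarkTracker {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public WatermarkTracker(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // MIN_VALUE + lag, as in the Flink class, so that
        // currentMaxTimestamp - maxOutOfOrderness cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Counterpart of extractTimestamp(T, long): remember the largest timestamp seen.
    public long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    // Counterpart of getCurrentWatermark(): trail the maximum by maxOutOfOrderness
    // and only ever move forward.
    public long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(3_000);
        System.out.println(tracker.currentWatermark()); // -9223372036854775808 (Long.MIN_VALUE)
        for (long ts : new long[] {5_000, 4_000, 9_000, 7_000}) {
            tracker.onElement(ts);
            System.out.println(ts + " -> watermark " + tracker.currentWatermark());
        }
        // 5000 -> watermark 2000
        // 4000 -> watermark 2000
        // 9000 -> watermark 6000
        // 7000 -> watermark 6000
    }
}
```

The out-of-order elements (4 000 and 7 000) do not move the watermark at all; only a new maximum does. Before any element arrives the watermark stays at Long.MIN_VALUE, which is why the constructor seeds currentMaxTimestamp with Long.MIN_VALUE + maxOutOfOrderness.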
Example
public static void main(String[] args) throws Exception {
    final int popThreshold = 20; // threshold for popular places
    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);
    // configure the Kafka consumer
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
    // always read the Kafka topic from the start
    kafkaProps.setProperty("auto.offset.reset", "earliest");
    // create a Kafka consumer
    FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
        "cleansedRides",
        new TaxiRideSchema(),
        kafkaProps);
    // assign a timestamp extractor to the consumer
    consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());
    // create a TaxiRide data stream
    DataStream<TaxiRide> rides = env.addSource(consumer);
    // find popular places
    DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
        // match ride to grid cell and event type (start or end)
        .map(new GridCellMatcher())
        // partition by cell id and event type
        .keyBy(0, 1)
        // build sliding window
        .timeWindow(Time.minutes(15), Time.minutes(5))
        // count ride events in window
        .apply(new RideCounter())
        // filter by popularity threshold
        .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
        // map grid cell to coordinates
        .map(new GridToCoordinates());
    popularPlaces.print();
    // execute the transformation pipeline
    env.execute("Popular Places from Kafka");
}
/**
 * Assigns timestamps to TaxiRide records.
 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
 */
public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {
    public TaxiRideTSExtractor() {
        super(Time.seconds(MAX_EVENT_DELAY));
    }
    @Override
    public long extractTimestamp(TaxiRide ride) {
        if (ride.isStart) {
            return ride.startTime.getMillis();
        }
        else {
            return ride.endTime.getMillis();
        }
    }
}
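This example uses an AssignerWithPeriodicWatermarks. env.getConfig().setAutoWatermarkInterval(1000) sets the watermark interval to 1000 ms, and assignTimestampsAndWatermarks registers TaxiRideTSExtractor, a subclass of the abstract BoundedOutOfOrdernessTimestampExtractor, as the consumer's AssignerWithPeriodicWatermarks. TaxiRideTSExtractor uses startTime as the timestamp for start events and endTime for end events.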
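As a rough illustration of what the 1000 ms interval changes, the sketch below simulates a periodic assigner in plain Java: at every tick of processing time it computes the watermark from the largest event time seen so far and forwards it only if it advances, which is roughly what Flink does with a periodic assigner. The Ride class, its fields, the arrival times and the 5 s bound are hypothetical stand-ins for TaxiRide and MAX_EVENT_DELAY; only the start/end timestamp choice mirrors TaxiRideTSExtractor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation; none of these types come from Flink or the training code.
public class PeriodicWatermarkDemo {
    // Hypothetical stand-in for TaxiRide.
    static final class Ride {
        final long arrivalMillis; // processing time at which the record is read
        final boolean isStart;
        final long startTime;     // event time in millis
        final long endTime;       // event time in millis

        Ride(long arrivalMillis, boolean isStart, long startTime, long endTime) {
            this.arrivalMillis = arrivalMillis;
            this.isStart = isStart;
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    // Same choice as TaxiRideTSExtractor.extractTimestamp.
    static long eventTime(Ride ride) {
        return ride.isStart ? ride.startTime : ride.endTime;
    }

    // Watermarks forwarded at ticks interval, 2*interval, ... up to endMillis.
    // rides must be sorted by arrivalMillis.
    public static List<Long> emittedWatermarks(List<Ride> rides, long maxOutOfOrderness,
                                               long interval, long endMillis) {
        List<Long> emitted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long tick = interval; tick <= endMillis; tick += interval) {
            // feed every ride that arrived before this tick to the assigner
            while (next < rides.size() && rides.get(next).arrivalMillis < tick) {
                maxTimestamp = Math.max(maxTimestamp, eventTime(rides.get(next)));
                next++;
            }
            long watermark = maxTimestamp - maxOutOfOrderness;
            // a watermark is only forwarded if it advances
            if (watermark > lastEmitted) {
                lastEmitted = watermark;
                emitted.add(watermark);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Ride> rides = Arrays.asList(
                new Ride(100, true, 20_000, 0),
                new Ride(400, false, 10_000, 26_000),
                new Ride(1_500, true, 22_000, 0),
                new Ride(2_600, false, 15_000, 30_000));
        System.out.println(emittedWatermarks(rides, 5_000, 1_000, 4_000)); // [21000, 25000]
    }
}
```

Two rides arriving within the same second produce a single watermark, and the ticks at 2 000 ms and 4 000 ms forward nothing because the maximum event time did not grow. A longer interval means fewer watermark records but windows that fire later in processing time.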
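Summary
To make development easier, Flink provides several built-in Pre-defined Timestamp Extractors / Watermark Emitters; BoundedOutOfOrdernessTimestampExtractor is one of them.
The abstract class implements extractTimestamp and getCurrentWatermark from AssignerWithPeriodicWatermarks and leaves only the abstract method extractTimestamp(T element) for subclasses.
Its constructor's maxOutOfOrderness parameter bounds how far an element's event time may lag behind the largest event time seen so far; the watermark trails that maximum by this amount, and elements that lag further may arrive after the watermark has passed their window and are then ignored when the window result is computed.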
doc
Pre-defined Timestamp Extractors / Watermark Emitters