序
本文主要研究一下 flink 的 TimeCharacteristic
TimeCharacteristic
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java
/**
* The time characteristic defines how the system determines time for time-dependent
* order and operations that depend on time (such as time windows).
*/
@PublicEvolving
public enum TimeCharacteristic {
/**
* Processing time for operators means that the operator uses the system clock of the machine
* to determine the current time of the data stream. Processing-time windows trigger based
* on wall-clock time and include whatever elements happen to have arrived at the operator at
* that point in time.
*
* <p>Using processing time for window operations results in general in quite non-deterministic
* results, because the contents of the windows depends on the speed in which elements arrive.
* It is, however, the cheapest method of forming windows and the method that introduces the
* least latency.
*/
ProcessingTime,
/**
* Ingestion time means that the time of each individual element in the stream is determined
* when the element enters the Flink streaming data flow. Operations like windows group the
* elements based on that time, meaning that processing speed within the streaming dataflow
* does not affect windowing, but only the speed at which sources receive elements.
*
* <p>Ingestion time is often a good compromise between processing time and event time.
* It does not need and special manual form of watermark generation, and events are typically
* not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
* only be introduced by streaming shuffles or split/join/union operations. The fact that
* elements are not very much out-of-order means that the latency increase is moderate,
* compared to event
* time.
*/
IngestionTime,
/**
* Event time means that the time of each individual element in the stream (also called event)
* is determined by the event’s individual custom timestamp. These timestamps either exist in
* the elements from before they entered the Flink streaming dataflow, or are user-assigned at
* the sources. The big implication of this is that it allows for elements to arrive in the
* sources and in all operators out of order, meaning that elements with earlier timestamps may
* arrive after elements with later timestamps.
*
* <p>Operators that window or order data with respect to event time must buffer data until they
* can be sure that all timestamps for a certain time interval have been received. This is
* handled by the so called “time watermarks”.
*
* <p>Operations based on event time are very predictable – the result of windowing operations
* is typically identical no matter when the window is executed and how fast the streams
* operate. At the same time, the buffering and tracking of event time is also costlier than
* operating with processing time, and typically also introduces more latency. The amount of
* extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
* time span between the arrival of early and late elements is. With respect to the
* “time watermarks”, this means that the cost typically depends on how early or late the
* watermarks can be generated for their timestamp.
*
* <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
* event’s original time, rather than the time assigned at the data source. Practically, that
* means that event time has generally more meaning, but also that it takes longer to determine
* that all elements for a certain time have arrived.
*/
EventTime
}
ProcessingTime 是以 operator 处理的时间为准,它使用的是机器的系统时间来作为 data stream 的时间
IngestionTime 是以数据进入 flink streaming data flow 的时间为准
EventTime 是以数据自带的时间戳字段为准,应用程序需要指定如何从 record 中抽取时间戳字段
区别
各个时间的区别如上图
实例
public static void main(String[] args) throws Exception {
final int popThreshold = 20; // threshold for popular places
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
// configure the Kafka consumer
Properties kafkaProps = new Properties();
kafkaProps.setProperty(“zookeeper.connect”, LOCAL_ZOOKEEPER_HOST);
kafkaProps.setProperty(“bootstrap.servers”, LOCAL_KAFKA_BROKER);
kafkaProps.setProperty(“group.id”, RIDE_SPEED_GROUP);
// always read the Kafka topic from the start
kafkaProps.setProperty(“auto.offset.reset”, “earliest”);
// create a Kafka consumer
FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
“cleansedRides”,
new TaxiRideSchema(),
kafkaProps);
// assign a timestamp extractor to the consumer
consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());
// create a TaxiRide data stream
DataStream<TaxiRide> rides = env.addSource(consumer);
// find popular places
DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
// match ride to grid cell and event type (start or end)
.map(new GridCellMatcher())
// partition by cell id and event type
.keyBy(0, 1)
// build sliding window
.timeWindow(Time.minutes(15), Time.minutes(5))
// count ride events in window
.apply(new RideCounter())
// filter by popularity threshold
.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
// map grid cell to coordinates
.map(new GridToCoordinates());
popularPlaces.print();
// execute the transformation pipeline
env.execute(“Popular Places from Kafka”);
}
/**
* Assigns timestamps to TaxiRide records.
* Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
*/
public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {
public TaxiRideTSExtractor() {
super(Time.seconds(MAX_EVENT_DELAY));
}
@Override
public long extractTimestamp(TaxiRide ride) {
if (ride.isStart) {
return ride.startTime.getMillis();
}
else {
return ride.endTime.getMillis();
}
}
}
这里消费 kafka 的时候 setStreamTimeCharacteristic 为 TimeCharacteristic.EventTime,同时 assignTimestampsAndWatermarks 指定为 TaxiRideTSExtractor,它继承了 BoundedOutOfOrdernessTimestampExtractor,这里的 extractTimestamp 根据 ride 的 start 与否返回 ride.startTime.getMillis() 或者 ride.endTime.getMillis(),来自定义了 eventTime
小结
flink 的 TimeCharacteristic 枚举定义了三类值,分别是 ProcessingTime、IngestionTime、EventTime
ProcessingTime 是以 operator 处理的时间为准,它使用的是机器的系统时间来作为 data stream 的时间;IngestionTime 是以数据进入 flink streaming data flow 的时间为准;EventTime 是以数据自带的时间戳字段为准,应用程序需要指定如何从 record 中抽取时间戳字段
指定为 EventTime 的 source 需要自己定义 event time 以及 emit watermark,或者在 source 之外通过 assignTimestampsAndWatermarks 在程序手工指定
doc
Event Time