乐趣区

聊聊flink的EventTime


本文主要研究一下 flink 的 EventTime
SourceFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {

/**
* Emits one element from the source, without attaching a timestamp. In most cases,
* this is the default way of emitting elements.
*
* <p>The timestamp that the element will get assigned depends on the time characteristic of
* the streaming program:
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system’s
* current time as the timestamp.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
* It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
* operation (like time windows).</li>
* </ul>
*
* @param element The element to emit
*/
void collect(T element);

/**
* Emits one element from the source, and attaches the given timestamp. This method
* is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
* sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
* on the stream.
*
* <p>On certain time characteristics, this timestamp may be ignored or overwritten.
* This allows programs to switch between the different time characteristics and behaviors
* without changing the code of the source functions.
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
* because processing time never works with element timestamps.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
* system’s current time, to realize proper ingestion time semantics.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
* </ul>
*
* @param element The element to emit
* @param timestamp The timestamp in milliseconds since the Epoch
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);

/**
* Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
* elements with a timestamp {@code t’ <= t} will occur any more. If further such
* elements will be emitted, those elements are considered <i>late</i>.
*
* <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
* On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
* {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
* automatic ingestion time watermarks.
*
* @param mark The Watermark to emit
*/
@PublicEvolving
void emitWatermark(Watermark mark);

/**
* Marks the source to be temporarily idle. This tells the system that this source will
* temporarily stop emitting records and watermarks for an indefinite amount of time. This
* is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
* {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
* watermarks without the need to wait for watermarks from this source while it is idle.
*
* <p>Source functions should make a best effort to call this method as soon as they
* acknowledge themselves to be idle. The system will consider the source to resume activity
* again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
* or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
*/
@PublicEvolving
void markAsTemporarilyIdle();

/**
* Returns the checkpoint lock. Please refer to the class-level comment in
* {@link SourceFunction} for details about how to write a consistent checkpointed
* source.
*
* @return The object to use as the lock
*/
Object getCheckpointLock();

/**
* This method is called by the system to shut down the context.
*/
void close();
}
SourceFunction 里头定义了 SourceContext 接口,它里头定义了 collectWithTimestamp、emitWatermark 方法,前者用来 assign event timestamp,后者用来 emit watermark
实例
public abstract class TestSource implements SourceFunction {
private volatile boolean running = true;
protected Object[] testStream;

@Override
public void run(SourceContext ctx) throws Exception {
for (int i = 0; (i < testStream.length) && running; i++) {
if (testStream[i] instanceof TaxiRide) {
TaxiRide ride = (TaxiRide) testStream[i];
ctx.collectWithTimestamp(ride, ride.getEventTime());
} else if (testStream[i] instanceof TaxiFare) {
TaxiFare fare = (TaxiFare) testStream[i];
ctx.collectWithTimestamp(fare, fare.getEventTime());
} else if (testStream[i] instanceof String) {
String s = (String) testStream[i];
ctx.collectWithTimestamp(s, 0);
} else if (testStream[i] instanceof Long) {
Long ts = (Long) testStream[i];
ctx.emitWatermark(new Watermark(ts));
} else {
throw new RuntimeException(testStream[i].toString());
}
}
// test sources are finite, so they have a Long.MAX_VALUE watermark when they finishes
}

@Override
public void cancel() {
running = false;
}
}
这里展示了如何在 SourceFunction 里头来 assign timestamp(collectWithTimestamp)以及 emit watermark(emitWatermark)
DataStream.assignTimestampsAndWatermarks
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
* Assigns timestamps to the elements in the data stream and periodically creates
* watermarks to signal event time progress.
*
* <p>This method creates watermarks periodically (for example every second), based
* on the watermarks indicated by the given watermark generator. Even when no new elements
* in the stream arrive, the given watermark generator will be periodically checked for
* new watermarks. The interval in which watermarks are generated is defined in
* {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* <p>Use this method for the common cases, where some characteristic over all elements
* should generate the watermarks, or where watermarks are simply trailing behind the
* wall clock time by a certain amount.
*
* <p>For the second case and when the watermarks are required to lag behind the maximum
* timestamp seen so far in the elements of the stream by a fixed amount of time, and this
* amount is known in advance, use the
* {@link BoundedOutOfOrdernessTimestampExtractor}.
*
* <p>For cases where watermarks should be created in an irregular fashion, for example
* based on certain markers that some element carry, use the
* {@link AssignerWithPunctuatedWatermarks}.
*
* @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
* watermark generator.
* @return The stream after the transformation, with assigned timestamps and watermarks.
*
* @see AssignerWithPeriodicWatermarks
* @see AssignerWithPunctuatedWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
*/
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

TimestampsAndPeriodicWatermarksOperator<T> operator =
new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

return transform(“Timestamps/Watermarks”, getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}

/**
* Assigns timestamps to the elements in the data stream and creates watermarks to
* signal event time progress based on the elements themselves.
*
* <p>This method creates watermarks based purely on stream elements. For each element
* that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
* the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
* method is called, and a new watermark is emitted, if the returned watermark value is
* non-negative and greater than the previous watermark.
*
* <p>This method is useful when the data stream embeds watermark elements, or certain elements
* carry a marker that can be used to determine the current event time watermark.
* This operation gives the programmer full control over the watermark generation. Users
* should be aware that too aggressive watermark generation (i.e., generating hundreds of
* watermarks every second) can cost some performance.
*
* <p>For cases where watermarks should be created in a regular fashion, for example
* every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
*
* @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
* watermark generator.
* @return The stream after the transformation, with assigned timestamps and watermarks.
*
* @see AssignerWithPunctuatedWatermarks
* @see AssignerWithPeriodicWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
*/
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

TimestampsAndPunctuatedWatermarksOperator<T> operator =
new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

return transform(“Timestamps/Watermarks”, getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}
DataStream 定义了 assignTimestampsAndWatermarks 方法,用来在 source 外头设置 timestampAndWatermarkAssigner(AssignerWithPeriodicWatermarks 或者 AssignerWithPunctuatedWatermarks 类型),告知 flink 如何提取 eventTime
AssignerWithPeriodicWatermarks
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

/**
* Returns the current watermark. This method is periodically called by the
* system to retrieve the current watermark. The method may return {@code null} to
* indicate that no new Watermark is available.
*
* <p>The returned watermark will be emitted only if it is non-null and its timestamp
* is larger than that of the previously emitted watermark (to preserve the contract of
* ascending watermarks). If the current watermark is still
* identical to the previous one, no progress in event time has happened since
* the previous call to this method. If a null value is returned, or the timestamp
* of the returned watermark is smaller than that of the last emitted one, then no
* new watermark will be generated.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*
* @see org.apache.flink.streaming.api.watermark.Watermark
* @see ExecutionConfig#getAutoWatermarkInterval()
*
* @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
*/
@Nullable
Watermark getCurrentWatermark();
}
AssignerWithPeriodicWatermarks 继承了 TimestampAssigner 接口(定义了 extractTimestamp 方法),这里定义了 getCurrentWatermark 方法,该方法会被周期性调用返回 current watermark,如果没有的话返回 null
AssignerWithPeriodicWatermarks 实例
public static void main(String[] args) throws Exception {

final int popThreshold = 20; // threshold for popular places

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);

// configure the Kafka consumer
Properties kafkaProps = new Properties();
kafkaProps.setProperty(“zookeeper.connect”, LOCAL_ZOOKEEPER_HOST);
kafkaProps.setProperty(“bootstrap.servers”, LOCAL_KAFKA_BROKER);
kafkaProps.setProperty(“group.id”, RIDE_SPEED_GROUP);
// always read the Kafka topic from the start
kafkaProps.setProperty(“auto.offset.reset”, “earliest”);

// create a Kafka consumer
FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
“cleansedRides”,
new TaxiRideSchema(),
kafkaProps);
// assign a timestamp extractor to the consumer
consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

// create a TaxiRide data stream
DataStream<TaxiRide> rides = env.addSource(consumer);

// find popular places
DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
// match ride to grid cell and event type (start or end)
.map(new GridCellMatcher())
// partition by cell id and event type
.keyBy(0, 1)
// build sliding window
.timeWindow(Time.minutes(15), Time.minutes(5))
// count ride events in window
.apply(new RideCounter())
// filter by popularity threshold
.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
// map grid cell to coordinates
.map(new GridToCoordinates());

popularPlaces.print();

// execute the transformation pipeline
env.execute(“Popular Places from Kafka”);
}

/**
* Assigns timestamps to TaxiRide records.
* Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
*/
public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

public TaxiRideTSExtractor() {
super(Time.seconds(MAX_EVENT_DELAY));
}

@Override
public long extractTimestamp(TaxiRide ride) {
if (ride.isStart) {
return ride.startTime.getMillis();
}
else {
return ride.endTime.getMillis();
}
}
}
这里使用了 DataStream 的 assignTimestampsAndWatermarks 方法,设置的 timestampAndWatermarkAssigner 实现了 AssignerWithPeriodicWatermarks 接口 (BoundedOutOfOrdernessTimestampExtractor 实现了 AssignerWithPeriodicWatermarks 接口);这里通过 env.getConfig().setAutoWatermarkInterval(1000) 来设置 AssignerWithPeriodicWatermarks 的间隔
AssignerWithPunctuatedWatermarks
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

/**
* Asks this implementation if it wants to emit a watermark. This method is called right after
* the {@link #extractTimestamp(Object, long)} method.
*
* <p>The returned watermark will be emitted only if it is non-null and its timestamp
* is larger than that of the previously emitted watermark (to preserve the contract of
* ascending watermarks). If a null value is returned, or the timestamp of the returned
* watermark is smaller than that of the last emitted one, then no new watermark will
* be generated.
*
* <p>For an example how to use this method, see the documentation of
* {@link AssignerWithPunctuatedWatermarks this class}.
*
* @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
*/
@Nullable
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
AssignerWithPunctuatedWatermarks 接口继承了 TimestampAssigner 接口(定义了 extractTimestamp 方法),这里定义了 checkAndGetNextWatermark 方法,该方法会在 extractTimestamp 方法执行之后被调用(调用时通过方法参数传递刚获取的 extractedTimestamp)
AssignerWithPunctuatedWatermarks 实例
public static void main(String[] args) throws Exception {

// read parameters
ParameterTool params = ParameterTool.fromArgs(args);
String input = params.getRequired(“input”);

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

// connect to the data file
DataStream<String> carData = env.readTextFile(input);

// map to events
DataStream<ConnectedCarEvent> events = carData
.map((String line) -> ConnectedCarEvent.fromString(line))
.assignTimestampsAndWatermarks(new ConnectedCarAssigner());

// sort events
events.keyBy((ConnectedCarEvent event) -> event.carId)
.process(new SortFunction())
.print();

env.execute(“Sort Connected Car Events”);
}

public class ConnectedCarAssigner implements AssignerWithPunctuatedWatermarks<ConnectedCarEvent> {
@Override
public long extractTimestamp(ConnectedCarEvent event, long previousElementTimestamp) {
return event.timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(ConnectedCarEvent event, long extractedTimestamp) {
// simply emit a watermark with every event
return new Watermark(extractedTimestamp – 30000);
}
}
这里使用了 DataStream 的 assignTimestampsAndWatermarks 方法,设置的 timestampAndWatermarkAssigner 实现了 AssignerWithPunctuatedWatermarks 接口
小结

使用 EventTime 的话就需要告知 flink 每个数据的 eventTime 从哪里取,这个通常跟 generate watermarks 操作一起告知 flink eventTime;有两种方式,一种是 data stream source 内部处理,一种是通过 timestam assigner/watermark generator(在 flink 中,timestamp assigners 也定义了如何 emit watermark,它们使用的是距离 1970-01-01T00:00:00Z 以来的毫秒数)
在 source 里头定义的话,即使用 SourceFunction 里头定义的 SourceContext 接口的 collectWithTimestamp、emitWatermark 方法,前者用来 assign event timestamp,后者用来 emit watermark
在 source 外头定义的话,就是通过 DataStream 的 assignTimestampsAndWatermarks 方法,设置 timestampAndWatermarkAssigner;它有两种类型:AssignerWithPeriodicWatermarks(定义了 getCurrentWatermark 方法,用于返回当前的 watermark;periodic 间隔参数通过 env.getConfig().setAutoWatermarkInterval(1000)来设置);AssignerWithPunctuatedWatermarks(定义了 checkAndGetNextWatermark 方法,该方法会在 extractTimestamp 方法执行之后被调用(调用时通过方法参数传递刚获取的 extractedTimestamp`)

doc
Generating Timestamps / Watermarks

退出移动版