关于大数据:赵强老师Flink的Watermark机制基于Flink-1110实现

36次阅读

共计 5749 个字符,预计需要花费 15 分钟才能阅读完成。

在应用 eventTime 的时候如何解决乱序数据?咱们晓得,流解决从事件产生,到流经 source,再到 operator,两头是有一个过程和工夫的。尽管大部分状况下,流到 operator 的数据都是依照事件产生的工夫程序来的,然而也不排除因为网络提早等起因,导致乱序的产生,特地是应用 kafka 的话,多个分区的数据无奈保障有序。所以在进行 window 计算的时候,咱们又不能无限期的等上来,必须要有个机制来保障一个特定的工夫后,必须触发 window 去进行计算了。这个特地的机制,就是 watermark。Watermark 是用于解决乱序事件的,用于掂量 Event Time 停顿的机制。watermark 能够翻译为水位线。

一、Watermark 的外围原理

Watermark 的外围实质能够了解成一个提早触发机制。
在 Flink 的窗口处理过程中,如果确定全副数据达到,就能够对 Window 的所有数据做 窗口计算操作(如汇总、分组等),如果数据没有全副达到,则持续期待该窗口中的数据全 部达到才开始解决。这种状况下就须要用到水位线(WaterMarks)机制,它可能掂量数据处 理进度(表白数据达到的完整性),保障事件数据(全副)达到 Flink 零碎,或者在乱序及 提早达到时,也可能像预期一样计算出正确并且间断的后果。当任何 Event 进入到 Flink 零碎时,会依据以后最大事件工夫产生 Watermarks 工夫戳。

那么 Flink 是怎么计算 Watermak 的值呢?

Watermark = 进入 Flink 的最大的事件工夫 (mxtEventTime)- 指定的延迟时间 (t)

那么有 Watermark 的 Window 是怎么触发窗口函数的呢?
如果有窗口的进行工夫等于或者小于 maxEventTime – t(过后的 warkmark),那么这个窗口被触发执行。

其外围解决流程如下图所示。

二、Watermark 的三种应用状况

1、原本有序的 Stream 中的 Watermark

如果数据元素的事件工夫是有序的,Watermark 工夫戳会随着数据元素的事件工夫按顺 序生成,此时水位线的变动和事件工夫放弃始终(因为既然是有序的工夫,就不须要设置提早了,那么 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是现实状态下的水位 线。当 Watermark 工夫大于 Windows 完结工夫就会触发对 Windows 的数据计算,以此类推,下一个 Window 也是一样。 这种状况其实是乱序数据的一种非凡状况。

2、乱序事件中的 Watermark

现实情况下数据元素往往并不是依照其产生程序接入到 Flink 零碎中进行解决,而频繁 呈现乱序或早退的状况,这种状况就须要应用 Watermarks 来应答。比方下图,设置延迟时间 t 为 2。

3、并行数据流中的 Watermark

在多并行度的状况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。

三、设置 Watermark 的外围代码

1、首先,正确设置事件处理的工夫语义,个别都是采纳 Event Time。

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);    

2、其次,指定生成 Watermark 的机制,包含:延时解决的工夫和 EventTime 对应的字段。如下:

留神:不论是数据是否有序,都能够应用下面的代码。有序的数据只是无序数据的一种非凡状况。

四、Watermark 编程案例

测试数据:基站的手机通话数据,如下:

需要:按基站,每 5 秒统计通话时间最长的记录。

  • StationLog 用于封装基站数据
package watermark;

//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
    private String stationID;   // 基站 ID
    private String from;        // 呼叫放
    private String to;            // 被叫方
    private long duration;        // 通话的持续时间
    private long callTime;        // 通话的呼叫工夫
    public StationLog(String stationID, String from, 
                      String to, long duration, 
                      long callTime) {
        this.stationID = stationID;
        this.from = from;
        this.to = to;
        this.duration = duration;
        this.callTime = callTime;
    }
    public String getStationID() {return stationID;}
    public void setStationID(String stationID) {this.stationID = stationID;}
    public long getCallTime() {return callTime;}
    public void setCallTime(long callTime) {this.callTime = callTime;}
    public String getFrom() {return from;}
    public void setFrom(String from) {this.from = from;}

    public String getTo() {return to;}
    public void setTo(String to) {this.to = to;}
    public long getDuration() {return duration;}
    public void setDuration(long duration) {this.duration = duration;}
}
  • 代码实现:WaterMarkDemo 用于实现计算(留神:为了不便咱们测试设置工作的并行度为 1)
package watermark;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// 每隔五秒,将过来是 10 秒内,通话时间最长的通话日志输入。public class WaterMarkDemo {public static void main(String[] args) throws Exception {
        // 失去 Flink 流式解决的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // 设置周期性的产生水位线的工夫距离。当数据流很大的时候,如果每个事件都产生水位线,会影响性能。env.getConfig().setAutoWatermarkInterval(100);// 默认 100 毫秒
        
        // 失去输出流
        DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
        stream.flatMap(new FlatMapFunction<String, StationLog>() {public void flatMap(String data, Collector<StationLog> output) throws Exception {String[] words = data.split(",");
                //                           基站 ID            from    to        通话时长                                                    callTime
                output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
            }
        }).filter(new FilterFunction<StationLog>() {
            
            @Override
            public boolean filter(StationLog value) throws Exception {return value.getDuration() > 0?true:false;
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {return element.getCallTime(); // 指定 EventTime 对应的字段
                    }
                })
        ).keyBy(new KeySelector<StationLog, String>(){
            @Override
            public String getKey(StationLog value) throws Exception {return value.getStationID();  // 依照基站分组
            }}
        ).timeWindow(Time.seconds(5)) // 设置工夫窗口
        .reduce(new MyReduceFunction(),new MyProcessWindows()).print();

        env.execute();}
}
// 用于如何解决窗口中的数据,即:找到窗口内通话时间最长的记录。class MyReduceFunction implements ReduceFunction<StationLog> {
    @Override
    public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
        // 找到通话时间最长的通话记录
        return value1.getDuration() >= value2.getDuration() ? value1 : value2;
    }
}
// 窗口解决实现后,输入的后果是什么
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
    @Override
    public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
            Iterable<StationLog> elements, Collector<String> out) throws Exception {StationLog maxLog = elements.iterator().next();

        StringBuffer sb = new StringBuffer();
        sb.append("窗口范畴是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
        sb.append("基站 ID:").append(maxLog.getStationID()).append("\t")
          .append("呼叫工夫:").append(maxLog.getCallTime()).append("\t")
          .append("主叫号码:").append(maxLog.getFrom()).append("\t")
          .append("被叫号码:")    .append(maxLog.getTo()).append("\t")
          .append("通话时长:").append(maxLog.getDuration()).append("\n");
        out.collect(sb.toString());
    }
}

正文完
 0