乐趣区

flink学习系列基础知识学习二

前言

回顾第一讲:我们了解了流(有界,无界),窗口(翻转,滑动,会话,全局)等基本概念。
知道了 DataStreamAPI 的基本用法:
source->transformation->sink 的基本步骤
其中
source 可以使用 flink 原生的(如 kafka,rabbitmq…)还可以继承 RichSourceFunction
transformation 常用的有 window keyby map reduce 聚合函数 …
sink 可以使用 flink 原生的(如 kafka,rabbitmq…)还可以继承 RichSinkFunction
第一讲详见:https://segmentfault.com/a/11…

但是我们知道,Flink api 有三大类,分别是 DataStreamAPI DataSetAPI tableAPI(sql)
DataSetAPI 和 DataStreamAPI 区别是一个是批处理,一个是流处理。而今天将介绍 table api 用法。
另外 我们知道,flink 是有状态的流式处理计算。那么它的状态是桌面管理的?有哪些类型?我们该怎么样去使用?我这一讲会以 kafka 消费位点 为切入点,一步一步的分析出 kafka 为何是可以保证 exactly-once。了解了 flink 对 kafka 消费位点原理后。我们怎么使用 flink 的 state 存储我们想要的数据呢?除此之外,今天还会介绍 flink 的水印,触发器,清除器,吃到生存周期,还会简单介绍一下 flink 的 processfunction. 比如我们想合并两个数据源,就会用到 coprocessfunction..

即这一讲的内容包含:

  • flink 关系型 API(table API sql)
  • flink 状态管理(state)
  • 水印(watermark)
  • 触发器(trigger)
  • 清除器
  • 迟到生存周期
  • 低层次的 Join(Low-Level Joins)

一 flink 关系型 API(table API sql)

  • 表 API 和 SQL 程序的结构

批处理和流式传输的所有 Table API 和 SQL 程序都遵循相同的模式。以下代码示例显示了 Table API 和 SQL 程序的常见结构。

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// register a Table
tableEnv.registerTable("table1", ...)            // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
env.execute();

注意:表 API 和 SQL 查询可以轻松集成并嵌入到 DataStream 或 DataSet 程序中

下面举一个例子来说明:

package cn.crawler.mft_seconed.demo2;

import cn.crawler.mft_seconed.KafkaEntity;
import cn.crawler.mft_seconed.KafkaEntityTableSource;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.WindowGroupedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;


import java.lang.reflect.Type;
import java.util.Properties;

/**
 * Created by liuliang
 * on 2019/7/13
 */
public class TableAPIDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1. 简单的 kafka->flink sql->mysql
        // 配置正好执行一次策略
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1S 执行一次 checkpoint
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "172.19.141.XX:31090");
        prop.setProperty("group.id", "flink-group");
        FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer011("fan_or_jia", new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets();// 默认消费策略
        DataStreamSource<String> kafkaSource = env.addSource(myConsumer);

        // 将 kafka 中取出的数据流映射为 operator
        SingleOutputStreamOperator<KafkaEntity> map = kafkaSource.map(new MapFunction<String, KafkaEntity>() {
            private static final long serialVersionUID = 1471936326697828381L;
            @Override
            public KafkaEntity map(String value) {KafkaEntity kafkaEntity = JSON.parseObject(value, KafkaEntity.class);

                return kafkaEntity;
            }
        });

        map.print(); // 打印 operator
        // 注册为 mft_flink_kafka 表 从 map 里面取值,字段分别是 id,message,name,time
        // 数据流无缝转换为 table
        tableEnv.registerDataStream("mft_flink_kafka",map,"id,message,name,create_time");

//        Table table = tableEnv.scan("mft_flink_kafka");
//        Table table1 = table.filter("id%2!=0")
//                .window(Tumble.over("1000.millis").on("rowtime").as("total"))
//                .groupBy("total")
//                .select("id,name,total.end as second");
//        table1.printSchema();

        Table sqlQuery = tableEnv.sqlQuery("select id,message,name,create_time from mft_flink_kafka");

        //sink to mysql
        DataStream<Tuple4<Integer,String,String,Long>> appendStream = tableEnv.toAppendStream(sqlQuery, Types.TUPLE(Types.INT, Types.STRING,Types.STRING, Types.LONG));

        appendStream.print();

        appendStream.map(new MapFunction<Tuple4<Integer,String,String,Long>, KafkaEntity>() {
            @Override
            public KafkaEntity map(Tuple4<Integer, String,String,Long> stringStringTuple4) throws Exception {return new KafkaEntity(stringStringTuple4.f0,stringStringTuple4.f1,stringStringTuple4.f2,stringStringTuple4.f3);
            }
        }).addSink(new SinkKafkaEntity2Mysql());

//        2.table api 带窗口函数的,处理时间属性由 KafkaEntityTableSource 实现 DefinedProctimeAttribute 接口的定义。逻辑时间属性附加到由返回类型定义的物理模式 TableSource。//        tableEnv.registerDataStream("kafka_demo",new KafkaEntityTableSource());


        env.execute("kafkaEntity from Kafka");

    }
}







    
 package cn.crawler.mft_seconed.demo2;

import cn.crawler.mft_seconed.KafkaEntity;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

/**
 * Created by liuliang
 * on 2019/7/13
 */
public class SendDataToKafkaSql {public static void main(String[] args){SendDataToKafkaSql sendDataToKafka = new SendDataToKafkaSql();
    for(int i=100;i<200;i++){

        String name = "";
        if(i%2==0){name = "范冰冰";}else {name = "贾玲";}

        KafkaEntity build = KafkaEntity.builder()
                .message("meaasge" + i)
                .id(i)
                .name(name+i)
                .create_time(System.currentTimeMillis())
                .build();
        System.out.println(build.toString());
        sendDataToKafka.send("fan_or_jia", "123", JSON.toJSONString(build));
    }

}

public void send(String topic,String key,String data){Properties props = new Properties();
    props.put("bootstrap.servers", "172.19.141.60:31090");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<String,String>(props);
    for(int i=1;i<2;i++){
        try {Thread.sleep(100);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        producer.send(new ProducerRecord<String, String>(topic, key+i, data));
    }
    prod

ucer.close();}
}






package cn.crawler.mft_seconed.demo2;

import cn.crawler.mft_seconed.KafkaEntity;
import cn.crawler.util.MysqlUtil;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class SinkKafkaEntity2Mysql extends RichSinkFunction<KafkaEntity> {
    /**
     * 每条数据得插入都要掉一次 invoke 方法
     * @param kafkaEntity
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(KafkaEntity kafkaEntity, Context context) throws Exception {MysqlUtil mysqlUtil = new MysqlUtil();
        mysqlUtil.insertKafkaEntity(kafkaEntity);
    }
}

注:table api 还可以有流字段的逻辑时间属性 … 也很简单,创建一个接受类,继承 StreamTableSource<Row> 就好, 有机会可以详细介绍,详见官网:https://ci.apache.org/project…

二 flink 状态管理(state)

  • 什么是 State(状态)?
    可以回忆一下 http 是有状态的传输协议吗?

      我们可以这样理解:state 是 flink 在某 task/operator 在某时刻的一个中间结果
      快照(shapshot)
      在 flink 中状态可以理解为一种数据结构 
    
  • State 类型

    总的来说,state 分为两种,operator state 和 key state,key state 专门对 keystream 使用,所包含的 Sate 种类也更多,可理解为 dataStream.keyBy()之后的 Operator State,Operator State 是对每一个 Operator 的状态进行记录,而 key State 则是在 dataSteam 进行 keyBy()后,记录相同 keyId 的 keyStream 上的状态 key State 提供的数据类型:ValueState<T>、ListState<T>、ReducingState<T>、MapState<T>。
    operator state 种类只有一种就是 ListState<T>,flink 官方文档用 kafka 的消费者举例,认为 kafka 消费者的 partitionId 和 offset 类似 flink 的 operator state

  • State 理解
    state 分为 operator state 和 key state 两种,都属于 manage state,优势是可以结合 checkpoint,实现自动存储状态和异常恢复功能,但是 state 不一定要使用 manage state,在 source、windows 和 sink 中自己声明一个 int 都可以作为状态进行使用,只不过需要自己实现快照状态保存和恢复。

说了这么多,有些概念第一次见到,会很懵逼,我还是重复以前的方式,对于难以理解的东西,先举一个实际例子看效果。然后根据我们看到的效果简单分析一下他是怎么做到的。然后知道 state 可以干什么以后,再来细细的分析 state 的基础知识。

演示程序:

package cn.crawler.mft_seconed.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by liuliang
 * on 2019/7/12
 */
public class SendDataToKafkaDemo {public static void main(String[] args){SendDataToKafkaDemo sendDataToKafka = new SendDataToKafkaDemo();
        for(int i=100;i<200;i++){sendDataToKafka.send("demo", "123", "这是测试的数据"+i);
        }

    }

    public void send(String topic,String key,String data){Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.141.60:31090");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String,String>(props);
        for(int i=1;i<2;i++){
            try {Thread.sleep(100);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            producer.send(new ProducerRecord<String, String>(topic, key+i, data));
        }
        producer.close();}

}




    package cn.crawler.mft_seconed.demo1;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Created by liuliang
 * on 2019/6/19
 */
public class KafkaDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置正好执行一次策略
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "172.19.141.60:31090");
        prop.setProperty("group.id", "flink-group");
        FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer011("mysqltest", new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets();// 默认消费策略
        DataStreamSource<String> source = env.addSource(myConsumer);
        source.addSink(new PrintSinkFunction());
        env.execute("StreamingFromCollection");

    }

}

我们先将 KafkaDemo(前者)启动,然后启动 SendDataToKafkaDemo(后者)。让后者像前者发送数据,正常情况下,前者会接收到后者的所有数据。现在,我们在后者发送数据的过程中,将前者程序停止(模拟程序 crash)这时候根据输出,观察前者消费了多少数据。记住数字 1. 然后再启动后者,观察开始输出的数字 2 是否是接着数字 1 的。我们可以看到,数字 2 就是接着数字 1 的。前者在 crash 的时候保留了状态!

接下来就是揭秘环节:
我们以 flink 消费 kafka 消费位点为例。我们知道 flink 结合 checkpoint 可以将 kafka 实现仅仅执行一次的原理。
那么 flink 是如何管理实现的呢?接下来的知识点可能需要知道 kafka 的工作原理(不了解的需要自行百度一下,我就不介绍了)真的不懂,我就简单介绍一下,并配合 rabbitmq ack 后 删除消息 以示区别。
Flink 中实现的 Kafka 消费者是一个有状态的算子(operator),它集成了 Flink 的检查点机制,它的状态是所有 Kafka 分区的读取偏移量。当一个检查点被触发时,每一个分区的偏移量都被存到了这个检查点中。Flink 的检查点机制保证了所有 operator task 的存储状态都是一致的。

第一步:
如下所示,一个 Kafka topic,有两个 partition,每个 partition 都含有“A”,“B”,“C”,”D”,“E”5 条消息。我们将两个 partition 的偏移量(offset)都设置为 0.

第二步:

Kafka comsumer(消费者)开始从 partition 0 读取消息。消息“A”正在被处理,第一个 consumer 的 offset 变成了 1。

第三步:
消息“A”到达了 Flink Map Task。两个 consumer 都开始读取他们下一条消息(partition 0 读取“B”,partition 1 读取“A”)。各自将 offset 更新成 2 和 1。同时,Flink 的 JobMaster 开始在 source 触发了一个检查点。

第四步:
接下来,由于 source 触发了检查点,Kafka consumer 创建了它们状态的第一个快照(”offset = 2, 1”),并将快照存到了 Flink 的 JobMaster 中。Source 在消息“B”和“A”从 partition 0 和 1 发出后,发了一个 checkpoint barrier。Checkopint barrier 用于各个 operator task 之间对齐检查点,保证了整个检查点的一致性。消息“A”到达了 Flink Map Task,而上面的 consumer 继续读取下一条消息(消息“C”)。

第五步:

Flink Map Task 收齐了同一版本的全部 checkpoint barrier 后,那么就会将它自己的状态也存储到 JobMaster。同时,consumer 会继续从 Kafka 读取消息。

第六步:
Flink Map Task 完成了它自己状态的快照流程后,会向 Flink JobMaster 汇报它已经完成了这个 checkpoint。当所有的 task 都报告完成了它们的状态 checkpoint 后,JobMaster 就会将这个 checkpoint 标记为成功。从此刻开始,这个 checkpoint 就可以用于故障恢复了。值得一提的是,Flink 并不依赖 Kafka offset 从系统故障中恢复。

故障恢复
在发生故障时(比如,某个 worker 挂了),所有的 operator task 会被重启,而他们的状态会被重置到最近一次成功的 checkpoint。Kafka source 分别从 offset 2 和 1 重新开始读取消息(因为这是完成的 checkpoint 中存的 offset)。当作业重启后,我们可以期待正常的系统操作,就好像之前没有发生故障一样。如下图所示:

既然 flink-kafka 是这样实现的,那么我们怎么自定义去使用 state 呢?下面也用程序来举例说明一下:

CheckPointing

(1)介绍,实现方式分类
           checkpoint 可以保存窗口和算子的执行状态,在出现异常之后重启计算任务,并保证已经执行和不会再重复执行,检查点可以分为两种,托管的和自定义的,托管检查点会自动的进行存储到指定位置:内存、磁盘和分布式存储中,自定义就需要自行实现保存相关,实现 checkpoint 有如下两种方式:

使用托管 State 变量
使用自定义 State 变量实现 CheckpointedFunction 接口或者 ListCheckpoint<T extends Serializable> 接口
    下面将会给出两种方式的使用代码
(2)使用 Manage State,Flink 自动实现 state 保存和恢复
下面先给出托管状态变量(manage stata)使用代码,后面给出了代码执行的打印日志。

代码分析:

  • 代码每隔 2s 发送 10 条记录,所有数据 key=1, 会发送到统一窗口进行计数,发送超过 100 条是,抛出异常,模拟异常
  • 窗口中统计收到的消息记录数,当异常发生时,查看 windows 是否会从 state 中获取历史数据,即 checkpoint 是否生效
  • 有个问题有时,state.value()在 open()方法中调用的时候,会抛出 null 异常,而在 apply 中
    使用就不会抛出异常。

Console 日志输出分析:

  • source 发送记录到达 100 抛出异常
  • source 抛出异常之后,count 发送统计数丢失,重新从 0 开始
  • windows 函数,重启后调用 open 函数,获取 state 数据,处理记录数从 checkpoint 中获取恢复,所以从 100 开始

总结:source 没有使用 manage state 状态丢失,windows 使用 manage state,异常状态不丢失

package cn.crawler.mft_seconed.demo3;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class StateCheckPoint {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();

        // 打开并设置 checkpoint
        // 1. 设置 checkpoint 目录,这里我用的是本地路径,记得本地路径要 file:// 开头
        // 2. 设置 checkpoint 类型,at lease onece or EXACTLY_ONCE
        // 3. 设置间隔时间,同时打开 checkpoint 功能
        //
        env.setStateBackend(new FsStateBackend("file:///Users/liuliang/Documents/others/flinkdata/state_checkpoint"));

//        env.setStateBackend(new FsStateBackend("file://D:\\softs\\flink\\state"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(1000);


        // 添加 source 每个 2s 发送 10 条数据,key=1,达到 100 条时候抛出异常

        //source 发送记录到达 100 抛出异常
        //source 抛出异常之后,count 发送统计数丢失,重新从 0 开始
        //windows 函数,重启后调用 open 函数,获取 state 数据,处理记录数从 checkpoint 中获取恢复,所以从 100 开始
        // 总结:source 没有使用 manage state 状态丢失,windows 使用 manage state,异常状态不丢失
        // 问: 1. state.value()在 open()方法中调用的时候,会抛出 null 异常,而在 apply 中使用就不会抛出异常。为什么?//    2. 为什么 source 里面没有 open 方法?source 想使用 state 该桌面操作?env.addSource(new SourceFunction<Tuple3<Integer,String,Integer>>() {
            private Boolean isRunning = true;
            private int count = 0;

            @Override
            public void run(SourceContext<Tuple3<Integer, String, Integer>> sourceContext) throws Exception {while(isRunning){for (int i = 0; i < 10; i++) {sourceContext.collect(Tuple3.of(1,"ahah",count));
                        count++;
                    }
                    if(count>100){System.out.println("err_________________");
                        throw new Exception("123");
                    }
                    System.out.println("source:"+count);
                    Thread.sleep(2000);
                }

            }

            @Override
            public void cancel() {}
        }).keyBy(0)

                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))

                // 窗口函数,比如是 richwindowsfunction 否侧无法使用 manage state
                .apply(new RichWindowFunction<Tuple3<Integer,String,Integer>, Integer, Tuple, TimeWindow>() {
                    private transient ValueState<Integer> state;
                    private int count = 0;
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple3<Integer, String, Integer>> iterable, Collector<Integer> collector) throws Exception {
                        // 从 state 中获取值
                        count=state.value();
                        for(Tuple3<Integer, String, Integer> item : iterable){count++;}
                        // 更新 state 值
                        state.update(count);
                        System.out.println("windows:"+tuple.toString()+""+count+"   state count:"+state.value());
                        collector.collect(count);
                    }


                    // 获取 state
                    @Override
                    public void open(Configuration parameters) throws Exception {System.out.println("##open");
                        ValueStateDescriptor<Integer> descriptor =
                                new ValueStateDescriptor<Integer>(
                                        "average", // the state name
                                        TypeInformation.of(new TypeHint<Integer>() {}), // type information
                                        0);
                        state = getRuntimeContext().getState(descriptor);




                    }
                }).print();
        env.execute();}
}

我们可以看一下我这边打印的日志:

source:10
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
windows:(1)  10   state count:10
9> 10
source:20
windows:(1)  20   state count:20
9> 20
source:30
windows:(1)  30   state count:30
9> 30
source:40
windows:(1)  40   state count:40
9> 40
source:50
windows:(1)  50   state count:50
9> 50
source:60
windows:(1)  60   state count:60
9> 60
source:70
windows:(1)  70   state count:70
9> 70
source:80
windows:(1)  80   state count:80
9> 80
source:90
windows:(1)  90   state count:90
9> 90
source:100
windows:(1)  100   state count:100
9> 100
err_________________     // 抛出异常
source:10      // 没管理,从头开始(无状态)##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
windows:(1)  110   state count:110   //flink 管理了,哪里跌倒的,从哪里开始(有状态)9> 110
source:20
windows:(1)  120   state count:120
9> 120
source:30
windows:(1)  130   state count:130
9> 130
source:40
windows:(1)  140   state count:140
9> 140
source:50
windows:(1)  150   state count:150
9> 150
source:60
Disconnected from the target VM, address: '127.0.0.1:53266', transport: 'socket'

(3)自定义 state 自行实现实现 checkpoint 接口

实现 CheckpointedFunction 接口或者 ListCheckpoint<T extends Serializable> 接口

分析说明:

         因为需要实现 ListCheckpoint 接口,所以 source 和 windows 处理代码,单独写成了 JAVA 类的形似,实现逻辑和验证方法跟 manage state 相似,但是在如下代码中,Source 和 Window 都实现了 ListCheckpoint 接口,也就是说,Source 抛出异常的时候,Source 和 Window 都将可以从 checkpoint 中获取历史状态,从而达到不丢失状态的能力。

代码列表:

AutoSourceWithCp.java Source 代码
WindowStatisticWithChk.java windows apply 函数代码
CheckPointMain.java 主程序,调用

package cn.crawler.mft_seconed.demo3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * 基于上面的提问,自定义一个 state 实现 checkpoint 接口
 * Created by liuliang
 * on 2019/7/15
 */
public class CheckPointMain {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("file:///Users/liuliang/Documents/others/flinkdata/state_checkpoint"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        env.getCheckpointConfig().setCheckpointInterval(1000);

        /**
         * 说明:
         * 因为需要实现 ListCheckpoint 接口,所以 source 和 windows 处理代码,单独写成了 JAVA 类的形似,* 实现逻辑和验证方法跟 manage state 相似,但是在如下代码中,Source 和 Window 都实现了 ListCheckpoint 接口,* 也就是说,Source 抛出异常的时候,Source 和 Window 都将可以从 checkpoint 中获取历史状态,从而达到不丢失状态的能力。*/
        DataStream<Tuple4<Integer,String,String,Integer>> data = env.setParallelism(1).addSource(new AutoSourceWithCp());
        env.setParallelism(1);
        data.keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                .apply(new WindowStatisticWithChk())
                .print();

        env.execute();}

}


package cn.crawler.mft_seconed.demo3;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class AutoSourceWithCp extends RichSourceFunction<Tuple4<Integer,String,String,Integer>> implements ListCheckpointed<UserState> {
    private int count = 0;
    private boolean is_running = true;

    @Override
    public void run(SourceContext sourceContext) throws Exception {while(is_running){for (int i = 0; i < 10; i++) {sourceContext.collect(Tuple4.of(1, "hello-" + count, "alphabet", count));
                count++;
            }
            System.out.println("source:"+count);
            Thread.sleep(2000);

            if(count>100){System.out.println("准备异常啦.....");
                throw new Exception("exception made by ourself!");
            }
        }
    }

    @Override
    public void cancel() {is_running = false;}

    @Override
    public List<UserState> snapshotState(long l, long l1) throws Exception {List<UserState> listState= new ArrayList<>();
        UserState state = new UserState(count);
        listState.add(state);
        System.out.println(System.currentTimeMillis()+"#############  check point :"+listState.get(0).getCount());
        return listState;
    }

    @Override
    public void restoreState(List<UserState> list) throws Exception {count = list.get(0).getCount();
        System.out.println("AutoSourceWithCp restoreState:"+count);

    }
}





package cn.crawler.mft_seconed.demo3;


import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;


import java.util.ArrayList;
import java.util.List;


import java.util.ArrayList;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class WindowStatisticWithChk implements WindowFunction<Tuple4<Integer,String,String,Integer>,Integer,Tuple,TimeWindow> ,ListCheckpointed<UserState> {
    private int total = 0;
    @Override
    public List<UserState> snapshotState(long l, long l1) throws Exception {List<UserState> listState= new ArrayList<>();
        UserState state = new UserState(total);
        listState.add(state);
        return listState;
    }

    @Override
    public void restoreState(List<UserState> list) throws Exception {total = list.get(0).getCount();}

    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple4<Integer, String, String, Integer>> iterable, Collector<Integer> collector) throws Exception {
        int count  =  0;
        for(Tuple4<Integer, String, String, Integer> data : iterable){
            count++;
            System.out.println("apply key"+tuple.toString()+"count:"+data.f3+" "+data.f0);
        }
        total = total+count;
        System.out.println("windows  total:"+total+"count:"+count+" ");
        collector.collect(count);
    }


}


package cn.crawler.mft_seconed.demo3;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Created by liuliang
 * on 2019/7/15
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserState implements Serializable {private Integer count;}


特别说明,1. 码字不易。

    2. 我的代码都放在了 github 上:欢迎 start。https://github.com/iamcrawler/flink-learnning
    3. 本文参照了 flink 官方文档以及一下文档:

https://www.jianshu.com/p/efa…
https://blog.csdn.net/u013560…

退出移动版