关于Flink:Flink-手写ATLEASTONCE语义的Source

Flink 手写AT_LEAST_ONCE语义的Source

需要

老王交给个工作,心愿我用Flink实现一个简略的数据ETL。 须要实时拉取数据,保证数据不丢。
我想了想整顿这篇文章。仅供提供思路,并把相干常识串起来。

Flink自身提供了CDC性能更加弱小不便。尽管如此,然而我想通过 SourceFunction 实现Mysql实时数据拉取。
简略思路:


为了保障起码一次读取数据(数据不丢),应用状态存储查问到最新截止工夫。这里须要自定义实现数据库连贯。
应用官网jdbc-sink写入mysql

应用到知识点

  • 自定义 SourceFunction
  • 状态 Operator State (_non-keyed state_)
  • 检查点 CheckPoint
  • 状态后端 StateBackend
  • 数据库写入 Mysql Sink

1. 自定义 SourceFunction

Flink 对数据的产生通过提供了对立的接口,不便用户应用。

首先看一张类继承依赖图:

咱们只须要重点关注上面3个类就好,首先看下他们的接口定义:

  • SourceFunction: Flink中所有流数据源的根本接口
  • RichSourceFunction : 用于实现能够拜访上下文信息的数据源的基类
  • RichParallelSourceFunction:用于实现并行数据源的基类

1.1 SourceFunction接口

再来看看SourceFunction接口的罕用办法:

@Public
public interface SourceFunction<T> extends Function, Serializable {
    /**
     * 启动该数据源发送元素
     */
    void run(SourceContext<T> ctx) throws Exception;
     /**
     * 勾销这个源的数据发送
     */
    void cancel();
}

SourceFunction实现例子:

/**
 * 不带上下文 不反对并发
 */
public class E02CustomizeSource implements SourceFunction<OrderInfo> {
    //运行标记位
    private volatile boolean isRunning = true;
    
    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {
        while (isRunning) {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderNo(System.currentTimeMillis() + "" + new Random().nextInt(10));
            orderInfo.setItemId(1000);
            orderInfo.setItemName("iphone 12pro max");
            orderInfo.setPrice(9800D);
            orderInfo.setQty(1);
            ctx.collect(orderInfo);
            Thread.sleep(new Random().nextInt(10) * 1000);

        }
    }
    
    @Override
    public void cancel() {
        isRunning = false;
    }
}

从例子能够看出 run 办法中只有 isRunning!=false 就能够始终运行下 , cancel 能够批改这个标记位,

  • SourceContext<OrderInfo> ctx

    •  ctx.collect(元素); 咱们通过该办法发送元素数据

1.2 RichSourceFunction 形象办法

@Public
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}

该办法外围办法继承自 SourceFunction 然而为了提供上下文信息又继承了 AbstractRichFunction 且额定提供了 资源初始化: open()  和  资源清理 close()  别离在 run() 运行前执行和运行后运行,罕用比方Mysql连贯关上和敞开。

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    
    private transient RuntimeContext runtimeContext;

    //该办法能够获取上下文
    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }
    //资源初始化
    @Override
    public void open(Configuration parameters) throws Exception {}
    //资源敞开
    @Override
    public void close() throws Exception {}
}

RichSourceFunction 实现实例:

/**
 * 带上下文 不反对并发
 * 而且减少了两个资源关上敞开办法 能够再run之前调用
 */
public class E03CustomizeRichSource extends RichSourceFunction<OrderInfo> {
    private transient Connection connection;
    private transient PreparedStatement ps;
    //运行标记位
    private volatile boolean isRunning = true;

    /**
     * run 之前调用
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //通过高低问获取全局配置对象且读取配置信息
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String url = parameterTool.get("url");
        String username = parameterTool.get("username");
        String password = parameterTool.get("password");

        //初始化
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(url, username, password);
        ps = connection.prepareStatement("select * from order_info");

    }

    /**
     *  run 之后调用
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        ps.close();
        connection.close();
    }

    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {
        try (ResultSet resultSet = ps.executeQuery()) {
            while (resultSet.next()) {
                //todo ...
                //ctx.
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

1.3 RichParallelSourceFunction 形象办法

@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}

从名字就能够晓得这是一个并行数据源,他只是把 SourceFunction 替换了 ParallelSourceFunction 。
SourceFunctionRichSourceFunction  并行度设置只能为 1 否则会报错 The parallelism of non parallel operator must be 1. _ 。 _ RichParallelSourceFunction  则不受影响。

java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
    at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:37)
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:103)
    at flinklearning._1source.DemoMain.main(DemoMain.java:30)

1.4 需要选型

咱们晓得咱们查询数据库并行查问,会查问反复数据,所以没必须要应用 RichParallelSourceFunction
咱们须要应用mysql。须要进行资源操作,所以最好应用 RichSourceFunction  

2. 检查点 CheckPoint & 状态 Operator State

Flink 是有状态的流解决框架。自己了解:状态是某个工夫节点元素产生事件的形容。
比方:你想治理历史数据的时候,你能够将该历史数据存储在状态外面,这样你能够在将来某个工夫拜访。

2.1 状态的持久性

Flink 通过 CheckPoint(检查点) 将 state(状态) 长久化在状态后端。如果程序失败或异样重启,能够通过长久化的检查点复原到过后状态。
罕用后端:

  • MemoryStateBackend (默认)
  • FsStateBackend
  • RocksDBStateBackend

_
_
_

2.2 检查点触发


Flink 默认是不开启检查点,用户能够 StreamExecutionEnvironment.enableCheckpointing(2000`)`设置。
Flink 会依据用户设置的工夫 周期有序的从source往数据流中插入 barrier(栏栅)

如:通过下图咱们初步理解下 检查点机制 并行度为一状况

  • 工夫线 1

    • Source  端网数据流中插入 checkPoint 1
  • 工夫线 2

    • checkPoint 1 进入 Map 触发 State 长久化 ,并持续往上游算子数据流中传送。
  • 工夫线3

    • checkPoint 1 进入 Sink 触发 State 长久化 ,整个检查点触发结束。

2.3 Operator State 应用

Operator State ,只反对 ListState 状态存储构造和播送状态构造。其实还有另一种类型状态,这里咱们先只记住这个,有机会前面独自讲讲区别和应用。
应用Operator State须要联合CheckpointedFunction 接口应用。算子实现该接口,重写其中的两个办法:

  • snapshotState 快照状态    触发检查点快照的时候
  • initializeState 初始化状态 再构造函数后调用
public interface CheckpointedFunction{
    //触发检查点的时会调用此办法
    void snapshotState(FunctionSnapshotContext context) throws Exception;
  //创立函数的时候回嗲用此办法,能够获取状态信息 复原嘻嘻
  void initializeState(FunctionInitializationContext context) throws Exception;
}

代码示例:

 public class E05ReadMysqlRichSource02 implements CheckpointedFunction {
  /**
     * 存储 state 的变量.
     */
    private ListState<LocalDateTime> startDateTimeState;
 /**
     * 检查点快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.err.println("快照时候执行 snapshotState---------------");
        startDateTimeState.clear();
        startDateTimeState.add(startDateTime);
        System.out.println("快照后果: " + startDateTime);
    }

    /**
     * 启动时执行快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.err.println("第二执行 initializeState---------------");
        //定义状态形容
        ListStateDescriptor<LocalDateTime> startDateTimeStateDescriptor = new ListStateDescriptor<LocalDateTime>("startDateTimeState", TypeInformation.of(LocalDateTime.class));
        startDateTimeState = context.getOperatorStateStore().getListState(startDateTimeStateDescriptor);
        //从状态中复原  
        if (context.isRestored()) {
            for (LocalDateTime localDateTime : startDateTimeState.get()) {
                startDateTime = localDateTime;
            }
            System.out.println("复原后果:" + startDateTime);
        }
    }
}

2.4 需要剖析

下面咱们理解快照的作用,和怎么应用状态。咱们就能够解决需要中 最新查问截止工夫 不会因为程序故障失落,实现起码一次读取。

3. 了解自定义实现的SourceFunction生命周期

咱们尝试来编写 MysqlReadSourceFunction 首先继承 RichSourceFunction 实现数据发送;实现**CheckpointedFunction** 来保障数据完整性。然而提供了那么办法谁先执行谁后执行,关系是什么?

/**
 * 实现自定实现 Mysql 数据读取器
 * 反对 定点读取和切好一次读取
 */
public class MysqlReadSourceFunction extends RichSourceFunction<TraceSegmentRecordInfo> implements CheckpointedFunction {
    public MysqlReadSourceFunction() {
        super();
        System.err.println("第一执行 1构造方法");

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.err.println("第三执行 open---------------");
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        System.err.println("第五执行 close---------------");
        super.close();
    }

    @Override
    public void run(SourceContext<TraceSegmentRecordInfo> ctx) throws Exception {
        System.err.println("第三四 run---------------");
    }

    @Override
    public void cancel() {
        System.err.println("勾销时候执行 cancel---------------");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.err.println("快照时候执行 snapshotState---------------");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

        System.err.println("第二执行 initializeState---------------");
    }
}
  1. 首先必定是结构器
  2. 其次会调用 initializeState 办法,进行数据状态复原
  3. 再而 open() 办法 初始化资源
  4. 紧接着 就进入真正执行 执行数据发送 run() 外围办法
  5. 此时如果程序勾销了,会调用 cancel() 能够再外面敞开 run() 外面的循环,它会在线程中断前执行。
  6. 接着会调用 清理 close()
  7. 最初完结

3. 需要实现,就是填写代码了

/**
 * 实现自定实现 Mysql 数据读取器
 * 反对 定点读取和至多一次
 */
public class E05ReadMysqlRichSource02 extends RichSourceFunction<TraceSegmentRecordInfo> implements CheckpointedFunction {
    //常量名称
    public static final String url = "url";
    public static final String username = "username";
    public static final String password = "password";
    //数据库连贯资源对象
    private transient Connection connection;
    private transient PreparedStatement statement;
    //查问SQL
    private String querySql;
    //是否持续运行 
    private volatile boolean isRunning = true;

    //查问开始工夫
    private volatile LocalDateTime startDateTime;
    //工夫增长步长  单位分钟
    private volatile int timeStep = 1;


    /**
     * 存储 state 的变量.
     */
    private ListState<LocalDateTime> startDateTimeState;


    public E05ReadMysqlRichSource02(String querySql, LocalDateTime startDateTime) {
        super();
        //设置查问SQL
        this.querySql = querySql;
        //初始化查问其实工夫
        this.startDateTime = startDateTime;

    }

    /**
     * 在run办法前执行
     * 进行资源初始化
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //获取系统配置
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        //创立连贯资源
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(parameterTool.get(url), parameterTool.get(username), parameterTool.get(password));
        statement = connection.prepareStatement(querySql);
    }

    /**
     * 资源进行敞开 在线程中断前执行
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        System.err.println("第五执行 close---------------");
        isRunning = false;
        //敞开资源连贯
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }


    }

    /**
     * 在反对只反对不会并行执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<TraceSegmentRecordInfo> ctx) throws Exception {
        final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        //获取检查点锁
        final Object lock = ctx.getCheckpointLock();
        //循环遍历 
        while (isRunning) {

            //依照 工夫步长生成  查问起止工夫
            String startDateTimeStr = dtf.format(startDateTime);
            LocalDateTime endDateTime = startDateTime.plusMinutes(timeStep);
            if (endDateTime.isAfter(LocalDateTime.now())) {
                //最新截止工夫 大于零碎工夫休眠后跳出循环
                Thread.sleep(timeStep * 60000);
                continue;
            }
            String endDateTimeStr = dtf.format(endDateTime);
            //设置参数
            statement.setString(1, startDateTimeStr);
            statement.setString(2, endDateTimeStr);

            //查问数据且封装参数
            List<TraceSegmentRecordInfo> containerList = new ArrayList<>();
            ResultSet resultSet = statement.executeQuery();
            while (resultSet.next()) {
                TraceSegmentRecordInfo info = new TraceSegmentRecordInfo();
                info.setSegmentId(resultSet.getString("segment_id"));
                info.setTraceId(resultSet.getString("trace_id"));
                info.setServiceName(resultSet.getString("service_name"));
                info.setServiceIp(resultSet.getString("service_ip"));
                info.setEndpointName(resultSet.getString("endpoint_name"));
                info.setDataBinary(resultSet.getString("data_binary"));
                info.setTimeBucket(resultSet.getLong("time_bucket"));
                info.setStartTime(resultSet.getLong("start_time"));
                info.setEndTime(resultSet.getLong("end_time"));
                info.setLatency(resultSet.getInt("latency"));
                info.setIsError(resultSet.getInt("is_error"));
                info.setCreateDate(resultSet.getDate("create_time"));
                info.setStatement(resultSet.getString("statement"));
                containerList.add(info);
            }
            System.out.println(String.format("执行后果[%s - %s] 查问条数%s", startDateTimeStr, endDateTimeStr, containerList.size()));
            if (CollectionUtils.isNotEmpty(containerList)) {
                for (Iterator<TraceSegmentRecordInfo> iterator = containerList.iterator(); iterator.hasNext(); ) {
                    //发送数据
                    ctx.collect(iterator.next());
                }
            }

            synchronized (lock) {
                //推送结束 最初应用endDateTime 设置为起始工夫
                startDateTime = endDateTime;
            }
        }
    }

    @Override
    public void cancel() {
        System.err.println("勾销时候执行 cancel---------------");
        //设置进行run办法循环 
        isRunning = false;

    }

    /**
     * 检查点快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.err.println("快照时候执行 snapshotState---------------");
        //快照最新状态
        startDateTimeState.clear();
        startDateTimeState.add(startDateTime);
        System.out.println("快照后果: " + startDateTime);
    }

    /**
     * 启动时执行快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.err.println("第二执行 initializeState---------------");
        //定义状态形容
        ListStateDescriptor<LocalDateTime> startDateTimeStateDescriptor = new ListStateDescriptor<LocalDateTime>("startDateTimeState", TypeInformation.of(LocalDateTime.class));
        startDateTimeState = context.getOperatorStateStore().getListState(startDateTimeStateDescriptor);
        //从状态中复原,
        if (context.isRestored()) {
            //只有重状态复原 才会进来,失常启动 isRestored  = false
            for (LocalDateTime localDateTime : startDateTimeState.get()) {
                startDateTime = localDateTime;

            }
            System.out.println("复原后果:" + startDateTime);
        }
    }
}
public class DemoMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //本人实现带环境的零碎参数解析
        ParameterTool parameterTool = ParameterToolEnvironmentUtils.createParameterTool(args);
        env.getConfig().setGlobalJobParameters(parameterTool);
        //开启检查点
        env.enableCheckpointing(2000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //设置文件系统作为状态后端
        FsStateBackend fsStateBackend = new FsStateBackend("file:///usr/flink-1.12.1/tmp");
        env.setStateBackend(fsStateBackend);
        // 执行后果[2021-02-09 07:39:01 - 2021-02-09 07:40:01] 查问条数177
        String sql = "SELECT * from trace_segment_record where create_time >= ? AND create_time < ?";
        //source
        DataStreamSource<TraceSegmentRecordInfo> source2 = env.addSource(new E05ReadMysqlRichSource02(sql, LocalDateTime.of(2021, 01, 8, 0, 0, 01))).setParallelism(2);
        
        //过滤
        SingleOutputStreamOperator<TraceSegmentRecordInfo> filter = source2.filter(new FilterFunction<TraceSegmentRecordInfo>() {
            @Override
            public boolean filter(TraceSegmentRecordInfo value) throws Exception {
                //过滤等于null的数据
                return value.getEndpointName() != null;
            }
        });

        //sink
        JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder().withBatchIntervalMs(2000L).withBatchSize(5000).build();
        JdbcConnectionOptions connectionOptions = (new JdbcConnectionOptions.JdbcConnectionOptionsBuilder())
                .withUrl("jdbc:mysql://xxxx:3306/yto_data_pipeline_init?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("xxx")
                .withPassword("xxxx").build();
        final String sinkSql = "REPLACE INTO trace_segment_record (segment_id, trace_id, service_name, service_ip, endpoint_name, start_time, end_time,latency,is_error, data_binary,time_bucket,create_time,statement) VALUE(?,?,?,?,?,?,?,?,?,?,?,?,?)";

        filter.addSink(JdbcSink.sink(sinkSql, (ps, v) -> {
            ps.setString(1, v.getSegmentId());
            ps.setString(2, v.getTraceId());
            ps.setString(3, v.getServiceName());
            ps.setString(4, v.getServiceIp());
            ps.setString(5, v.getEndpointName());
            ps.setLong(6, v.getStartTime());
            ps.setLong(7, v.getEndTime());
            ps.setLong(8, v.getLatency());
            ps.setInt(9, v.getIsError());
            ps.setString(10, v.getDataBinary());
            ps.setLong(11, v.getTimeBucket());
            ps.setTimestamp(12, new Timestamp(v.getCreateDate().getTime()));
            ps.setString(13, v.getStatement());
        }, executionOptions, connectionOptions)).uid("sink_mysql");

        env.execute("aaa");
    }
}

3.1 中断写入演示


我在执行到 2021-01-08 00:32:01 勾销了工作,发现零碎曾经存储快照了检查点。

咱们应用检查点重启我的项目在察看日志是否从 2021-01-08 00:32:01 开始打印!


其实从日志能够看到 我打印了日志不代表那个工夫节点曾经长久化到数据状态后端,
这里获得上一次的工夫节点 2021-01-08 00:30:01
数据也确实是从上次检查点开始查问的。

其实大家也发现了,数据反复读取了,这个也是文章结尾起码一次读取,局部反复,最受是通过数据唯一性查问语句实现数据一致性的。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理