关于Flink:Flink-手写ATLEASTONCE语义的Source

39次阅读

共计 14629 个字符,预计需要花费 37 分钟才能阅读完成。

Flink 手写 AT_LEAST_ONCE 语义的 Source

需要

老王交给个工作,心愿我用 Flink 实现一个简略的数据 ETL。须要实时拉取数据,保证数据不丢。
我想了想整顿这篇文章。仅供提供思路,并把相干常识串起来。

Flink 自身提供了 CDC 性能更加弱小不便。尽管如此,然而我想通过 SourceFunction 实现 Mysql 实时数据拉取。
简略思路:


为了保障起码一次读取数据(数据不丢),应用 状态存储 查问到最新截止工夫。这里须要自定义实现数据库连贯。
应用官网 jdbc-sink 写入 mysql

应用到知识点

  • 自定义 SourceFunction
  • 状态 Operator State(_non-keyed state_)
  • 检查点 CheckPoint
  • 状态后端 StateBackend
  • 数据库写入 Mysql Sink

1. 自定义 SourceFunction

Flink 对数据的产生通过提供了对立的接口,不便用户应用。

首先看一张类继承依赖图:

咱们只须要重点关注上面 3 个类就好,首先看下他们的接口定义:

  • SourceFunction:Flink 中所有流数据源的根本接口
  • RichSourceFunction:用于实现能够拜访上下文信息的数据源的基类
  • RichParallelSourceFunction:用于实现并行数据源的基类

1.1 SourceFunction 接口

再来看看 SourceFunction 接口的罕用办法:

@Public
public interface SourceFunction<T> extends Function, Serializable {
    /**
     * 启动该数据源发送元素
     */
    void run(SourceContext<T> ctx) throws Exception;
     /**
     * 勾销这个源的数据发送
     */
    void cancel();}

SourceFunction 实现例子:

/**
 * 不带上下文 不反对并发
 */
public class E02CustomizeSource implements SourceFunction<OrderInfo> {
    // 运行标记位
    private volatile boolean isRunning = true;
    
    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {while (isRunning) {OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderNo(System.currentTimeMillis() + "" + new Random().nextInt(10));
            orderInfo.setItemId(1000);
            orderInfo.setItemName("iphone 12pro max");
            orderInfo.setPrice(9800D);
            orderInfo.setQty(1);
            ctx.collect(orderInfo);
            Thread.sleep(new Random().nextInt(10) * 1000);

        }
    }
    
    @Override
    public void cancel() {isRunning = false;}
}

从例子能够看出 run 办法中只有 isRunning!=false 就能够始终运行下,cancel 能够批改这个标记位,

  • SourceContext<OrderInfo> ctx

    •  ctx.collect(元素); 咱们通过该办法发送元素数据

1.2 RichSourceFunction 形象办法

@Public
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {private static final long serialVersionUID = 1L;}

该办法外围办法继承自 SourceFunction 然而为了提供上下文信息又继承了 AbstractRichFunction 且额定提供了 资源初始化:open()  和  资源清理 close()  别离在 run() 运行前执行和运行后运行,罕用比方 Mysql 连贯关上和敞开。

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    
    private transient RuntimeContext runtimeContext;

    // 该办法能够获取上下文
    @Override
    public RuntimeContext getRuntimeContext() {if (this.runtimeContext != null) {return this.runtimeContext;} else {throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }
    // 资源初始化
    @Override
    public void open(Configuration parameters) throws Exception {}
    // 资源敞开
    @Override
    public void close() throws Exception {}
}

RichSourceFunction 实现实例:

/**
 * 带上下文 不反对并发
 * 而且减少了两个资源关上敞开办法 能够再 run 之前调用
 */
public class E03CustomizeRichSource extends RichSourceFunction<OrderInfo> {
    private transient Connection connection;
    private transient PreparedStatement ps;
    // 运行标记位
    private volatile boolean isRunning = true;

    /**
     * run 之前调用
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {super.open(parameters);
        // 通过高低问获取全局配置对象且读取配置信息
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String url = parameterTool.get("url");
        String username = parameterTool.get("username");
        String password = parameterTool.get("password");

        // 初始化
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(url, username, password);
        ps = connection.prepareStatement("select * from order_info");

    }

    /**
     *  run 之后调用
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {super.close();
        ps.close();
        connection.close();}

    @Override
    public void run(SourceContext<OrderInfo> ctx) throws Exception {try (ResultSet resultSet = ps.executeQuery()) {while (resultSet.next()) {
                //todo ...
                //ctx.
            }
        }
    }

    @Override
    public void cancel() {isRunning = false;}
}

1.3 RichParallelSourceFunction 形象办法

@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {private static final long serialVersionUID = 1L;}

从名字就能够晓得这是一个并行数据源,他只是把 SourceFunction 替换了 ParallelSourceFunction
SourceFunctionRichSourceFunction  并行度设置只能为 1 否则会报错 The parallelism of non parallel operator must be 1. _。_ RichParallelSourceFunction  则不受影响。

java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
    at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:37)
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:103)
    at flinklearning._1source.DemoMain.main(DemoMain.java:30)

1.4 需要选型

咱们晓得咱们查询数据库并行查问,会查问反复数据,所以没必须要应用 RichParallelSourceFunction
咱们须要应用 mysql。须要进行资源操作,所以最好应用 RichSourceFunction  

2. 检查点 CheckPoint & 状态 Operator State

Flink 是有状态的流解决框架。自己了解:状态是某个工夫节点元素产生事件的形容。
比方:你想治理历史数据的时候,你能够将该历史数据存储在状态外面,这样你能够在将来某个工夫拜访。

2.1 状态的持久性

Flink 通过 CheckPoint(检查点) 将 state(状态) 长久化在状态后端。如果程序失败或异样重启,能够通过长久化的检查点复原到过后状态。
罕用后端:

  • MemoryStateBackend(默认)
  • FsStateBackend
  • RocksDBStateBackend

_
_
_

2.2 检查点触发


Flink 默认是不开启检查点,用户能够 StreamExecutionEnvironment.enableCheckpointing(2000`)` 设置。
Flink 会依据用户设置的工夫 周期有序的从 source 往数据流中插入 barrier(栏栅)

如:通过下图咱们初步理解下 检查点机制 并行度为一状况

  • 工夫线 1

    • Source  端网数据流中插入 checkPoint 1
  • 工夫线 2

    • checkPoint 1 进入 Map 触发 State 长久化,并持续往上游算子数据流中传送。
  • 工夫线 3

    • checkPoint 1 进入 Sink 触发 State 长久化,整个检查点触发结束。

2.3 Operator State 应用

Operator State,只反对 ListState 状态存储构造和播送状态构造。其实还有另一种类型状态,这里咱们先只记住这个,有机会前面独自讲讲区别和应用。
应用 Operator State 须要联合CheckpointedFunction 接口应用。算子实现该接口,重写其中的两个办法:

  • snapshotState 快照状态    触发检查点快照的时候
  • initializeState 初始化状态 再构造函数后调用
public interface CheckpointedFunction{
    // 触发检查点的时会调用此办法
    void snapshotState(FunctionSnapshotContext context) throws Exception;
  // 创立函数的时候回嗲用此办法,能够获取状态信息 复原嘻嘻
  void initializeState(FunctionInitializationContext context) throws Exception;
}

代码示例:

 public class E05ReadMysqlRichSource02 implements CheckpointedFunction {
  /**
     * 存储 state 的变量.
     */
    private ListState<LocalDateTime> startDateTimeState;
 /**
     * 检查点快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {System.err.println("快照时候执行 snapshotState---------------");
        startDateTimeState.clear();
        startDateTimeState.add(startDateTime);
        System.out.println("快照后果:" + startDateTime);
    }

    /**
     * 启动时执行快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {System.err.println("第二执行 initializeState---------------");
        // 定义状态形容
        ListStateDescriptor<LocalDateTime> startDateTimeStateDescriptor = new ListStateDescriptor<LocalDateTime>("startDateTimeState", TypeInformation.of(LocalDateTime.class));
        startDateTimeState = context.getOperatorStateStore().getListState(startDateTimeStateDescriptor);
        // 从状态中复原  
        if (context.isRestored()) {for (LocalDateTime localDateTime : startDateTimeState.get()) {startDateTime = localDateTime;}
            System.out.println("复原后果:" + startDateTime);
        }
    }
}

2.4 需要剖析

下面咱们理解快照的作用,和怎么应用状态。咱们就能够解决需要中 最新查问截止工夫 不会因为程序故障失落,实现起码一次读取。

3. 了解自定义实现的 SourceFunction 生命周期

咱们尝试来编写 MysqlReadSourceFunction 首先继承 RichSourceFunction 实现数据发送;实现**CheckpointedFunction** 来保障数据完整性。然而提供了那么办法谁先执行谁后执行,关系是什么?

/**
 * 实现自定实现 Mysql 数据读取器
 * 反对 定点读取和切好一次读取
 */
public class MysqlReadSourceFunction extends RichSourceFunction<TraceSegmentRecordInfo> implements CheckpointedFunction {public MysqlReadSourceFunction() {super();
        System.err.println("第一执行 1 构造方法");

    }

    @Override
    public void open(Configuration parameters) throws Exception {System.err.println("第三执行 open---------------");
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {System.err.println("第五执行 close---------------");
        super.close();}

    @Override
    public void run(SourceContext<TraceSegmentRecordInfo> ctx) throws Exception {System.err.println("第三四 run---------------");
    }

    @Override
    public void cancel() {System.err.println("勾销时候执行 cancel---------------");
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {System.err.println("快照时候执行 snapshotState---------------");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {System.err.println("第二执行 initializeState---------------");
    }
}
  1. 首先必定是结构器
  2. 其次会调用 initializeState 办法,进行数据状态复原
  3. 再而 open() 办法 初始化资源
  4. 紧接着 就进入真正执行 执行数据发送 run() 外围办法
  5. 此时如果程序勾销了,会调用 cancel() 能够再外面敞开 run() 外面的循环, 它会在线程中断前执行。
  6. 接着会调用 清理 close()
  7. 最初完结

3. 需要实现,就是填写代码了

/**
 * 实现自定实现 Mysql 数据读取器
 * 反对 定点读取和至多一次
 */
public class E05ReadMysqlRichSource02 extends RichSourceFunction<TraceSegmentRecordInfo> implements CheckpointedFunction {
    // 常量名称
    public static final String url = "url";
    public static final String username = "username";
    public static final String password = "password";
    // 数据库连贯资源对象
    private transient Connection connection;
    private transient PreparedStatement statement;
    // 查问 SQL
    private String querySql;
    // 是否持续运行 
    private volatile boolean isRunning = true;

    // 查问开始工夫
    private volatile LocalDateTime startDateTime;
    // 工夫增长步长  单位分钟
    private volatile int timeStep = 1;


    /**
     * 存储 state 的变量.
     */
    private ListState<LocalDateTime> startDateTimeState;


    public E05ReadMysqlRichSource02(String querySql, LocalDateTime startDateTime) {super();
        // 设置查问 SQL
        this.querySql = querySql;
        // 初始化查问其实工夫
        this.startDateTime = startDateTime;

    }

    /**
     * 在 run 办法前执行
     * 进行资源初始化
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {super.open(parameters);
        // 获取系统配置
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        // 创立连贯资源
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(parameterTool.get(url), parameterTool.get(username), parameterTool.get(password));
        statement = connection.prepareStatement(querySql);
    }

    /**
     * 资源进行敞开 在线程中断前执行
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {super.close();
        System.err.println("第五执行 close---------------");
        isRunning = false;
        // 敞开资源连贯
        if (statement != null) {statement.close();
        }
        if (connection != null) {connection.close();
        }


    }

    /**
     * 在反对只反对不会并行执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<TraceSegmentRecordInfo> ctx) throws Exception {final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 获取检查点锁
        final Object lock = ctx.getCheckpointLock();
        // 循环遍历 
        while (isRunning) {

            // 依照 工夫步长生成  查问起止工夫
            String startDateTimeStr = dtf.format(startDateTime);
            LocalDateTime endDateTime = startDateTime.plusMinutes(timeStep);
            if (endDateTime.isAfter(LocalDateTime.now())) {
                // 最新截止工夫 大于零碎工夫休眠后跳出循环
                Thread.sleep(timeStep * 60000);
                continue;
            }
            String endDateTimeStr = dtf.format(endDateTime);
            // 设置参数
            statement.setString(1, startDateTimeStr);
            statement.setString(2, endDateTimeStr);

            // 查问数据且封装参数
            List<TraceSegmentRecordInfo> containerList = new ArrayList<>();
            ResultSet resultSet = statement.executeQuery();
            while (resultSet.next()) {TraceSegmentRecordInfo info = new TraceSegmentRecordInfo();
                info.setSegmentId(resultSet.getString("segment_id"));
                info.setTraceId(resultSet.getString("trace_id"));
                info.setServiceName(resultSet.getString("service_name"));
                info.setServiceIp(resultSet.getString("service_ip"));
                info.setEndpointName(resultSet.getString("endpoint_name"));
                info.setDataBinary(resultSet.getString("data_binary"));
                info.setTimeBucket(resultSet.getLong("time_bucket"));
                info.setStartTime(resultSet.getLong("start_time"));
                info.setEndTime(resultSet.getLong("end_time"));
                info.setLatency(resultSet.getInt("latency"));
                info.setIsError(resultSet.getInt("is_error"));
                info.setCreateDate(resultSet.getDate("create_time"));
                info.setStatement(resultSet.getString("statement"));
                containerList.add(info);
            }
            System.out.println(String.format("执行后果[%s - %s] 查问条数 %s", startDateTimeStr, endDateTimeStr, containerList.size()));
            if (CollectionUtils.isNotEmpty(containerList)) {for (Iterator<TraceSegmentRecordInfo> iterator = containerList.iterator(); iterator.hasNext();) {
                    // 发送数据
                    ctx.collect(iterator.next());
                }
            }

            synchronized (lock) {
                // 推送结束 最初应用 endDateTime 设置为起始工夫
                startDateTime = endDateTime;
            }
        }
    }

    @Override
    public void cancel() {System.err.println("勾销时候执行 cancel---------------");
        // 设置进行 run 办法循环 
        isRunning = false;

    }

    /**
     * 检查点快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {System.err.println("快照时候执行 snapshotState---------------");
        // 快照最新状态
        startDateTimeState.clear();
        startDateTimeState.add(startDateTime);
        System.out.println("快照后果:" + startDateTime);
    }

    /**
     * 启动时执行快照复原
     *
     * @param context
     * @throws Exception
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {System.err.println("第二执行 initializeState---------------");
        // 定义状态形容
        ListStateDescriptor<LocalDateTime> startDateTimeStateDescriptor = new ListStateDescriptor<LocalDateTime>("startDateTimeState", TypeInformation.of(LocalDateTime.class));
        startDateTimeState = context.getOperatorStateStore().getListState(startDateTimeStateDescriptor);
        // 从状态中复原,if (context.isRestored()) {
            // 只有重状态复原 才会进来,失常启动 isRestored  = false
            for (LocalDateTime localDateTime : startDateTimeState.get()) {startDateTime = localDateTime;}
            System.out.println("复原后果:" + startDateTime);
        }
    }
}
public class DemoMain {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 本人实现带环境的零碎参数解析
        ParameterTool parameterTool = ParameterToolEnvironmentUtils.createParameterTool(args);
        env.getConfig().setGlobalJobParameters(parameterTool);
        // 开启检查点
        env.enableCheckpointing(2000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 设置文件系统作为状态后端
        FsStateBackend fsStateBackend = new FsStateBackend("file:///usr/flink-1.12.1/tmp");
        env.setStateBackend(fsStateBackend);
        // 执行后果[2021-02-09 07:39:01 - 2021-02-09 07:40:01] 查问条数 177
        String sql = "SELECT * from trace_segment_record where create_time >= ? AND create_time < ?";
        //source
        DataStreamSource<TraceSegmentRecordInfo> source2 = env.addSource(new E05ReadMysqlRichSource02(sql, LocalDateTime.of(2021, 01, 8, 0, 0, 01))).setParallelism(2);
        
        // 过滤
        SingleOutputStreamOperator<TraceSegmentRecordInfo> filter = source2.filter(new FilterFunction<TraceSegmentRecordInfo>() {
            @Override
            public boolean filter(TraceSegmentRecordInfo value) throws Exception {
                // 过滤等于 null 的数据
                return value.getEndpointName() != null;}
        });

        //sink
        JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder().withBatchIntervalMs(2000L).withBatchSize(5000).build();
        JdbcConnectionOptions connectionOptions = (new JdbcConnectionOptions.JdbcConnectionOptionsBuilder())
                .withUrl("jdbc:mysql://xxxx:3306/yto_data_pipeline_init?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("xxx")
                .withPassword("xxxx").build();
        final String sinkSql = "REPLACE INTO trace_segment_record (segment_id, trace_id, service_name, service_ip, endpoint_name, start_time, end_time,latency,is_error, data_binary,time_bucket,create_time,statement) VALUE(?,?,?,?,?,?,?,?,?,?,?,?,?)";

        filter.addSink(JdbcSink.sink(sinkSql, (ps, v) -> {ps.setString(1, v.getSegmentId());
            ps.setString(2, v.getTraceId());
            ps.setString(3, v.getServiceName());
            ps.setString(4, v.getServiceIp());
            ps.setString(5, v.getEndpointName());
            ps.setLong(6, v.getStartTime());
            ps.setLong(7, v.getEndTime());
            ps.setLong(8, v.getLatency());
            ps.setInt(9, v.getIsError());
            ps.setString(10, v.getDataBinary());
            ps.setLong(11, v.getTimeBucket());
            ps.setTimestamp(12, new Timestamp(v.getCreateDate().getTime()));
            ps.setString(13, v.getStatement());
        }, executionOptions, connectionOptions)).uid("sink_mysql");

        env.execute("aaa");
    }
}

3.1 中断写入演示


我在执行到 2021-01-08 00:32:01 勾销了工作,发现零碎曾经存储快照了检查点。

咱们应用检查点重启我的项目在察看日志是否从 2021-01-08 00:32:01 开始打印!


其实从日志能够看到 我打印了日志不代表那个工夫节点曾经长久化到数据状态后端,
这里获得上一次的工夫节点 2021-01-08 00:30:01
数据也确实是从上次检查点开始查问的。

其实大家也发现了,数据反复读取了,这个也是文章结尾起码一次读取,局部反复,最受是通过数据唯一性查问语句实现数据一致性的。

正文完
 0