Flink 手写 AT_LEAST_ONCE 语义的 Source
需要
老王交给个工作,心愿我用 Flink 实现一个简略的数据 ETL。须要实时拉取数据,保证数据不丢。
我想了想整顿这篇文章。仅供提供思路,并把相干常识串起来。
Flink 自身提供了 CDC 性能更加弱小不便。尽管如此,然而我想通过 SourceFunction 实现 Mysql 实时数据拉取。
简略思路:
为了保障起码一次读取数据(数据不丢),应用 状态存储 查问到最新截止工夫。这里须要自定义实现数据库连贯。
应用官网 jdbc-sink 写入 mysql
应用到知识点
- 自定义 SourceFunction
- 状态 Operator State(_non-keyed state_)
- 检查点 CheckPoint
- 状态后端 StateBackend
- 数据库写入 Mysql Sink
1. 自定义 SourceFunction
Flink 对数据的产生通过提供了对立的接口,不便用户应用。
首先看一张类继承依赖图:
咱们只须要重点关注上面 3 个类就好,首先看下他们的接口定义:
- SourceFunction:Flink 中所有流数据源的根本接口
- RichSourceFunction:用于实现能够拜访上下文信息的数据源的基类
- RichParallelSourceFunction:用于实现并行数据源的基类
1.1 SourceFunction 接口
再来看看 SourceFunction 接口的罕用办法:
@Public
public interface SourceFunction<T> extends Function, Serializable {
/**
* 启动该数据源发送元素
*/
void run(SourceContext<T> ctx) throws Exception;
/**
* 勾销这个源的数据发送
*/
void cancel();}
SourceFunction 实现例子:
/**
* 不带上下文 不反对并发
*/
public class E02CustomizeSource implements SourceFunction<OrderInfo> {
// 运行标记位
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<OrderInfo> ctx) throws Exception {while (isRunning) {OrderInfo orderInfo = new OrderInfo();
orderInfo.setOrderNo(System.currentTimeMillis() + "" + new Random().nextInt(10));
orderInfo.setItemId(1000);
orderInfo.setItemName("iphone 12pro max");
orderInfo.setPrice(9800D);
orderInfo.setQty(1);
ctx.collect(orderInfo);
Thread.sleep(new Random().nextInt(10) * 1000);
}
}
@Override
public void cancel() {isRunning = false;}
}
从例子能够看出 run
办法中只有 isRunning!=false
就能够始终运行下,cancel
能够批改这个标记位,
-
SourceContext<OrderInfo> ctx
- ctx.collect(元素); 咱们通过该办法发送元素数据
1.2 RichSourceFunction 形象办法
@Public
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {private static final long serialVersionUID = 1L;}
该办法外围办法继承自 SourceFunction
然而为了提供上下文信息又继承了 AbstractRichFunction
且额定提供了 资源初始化:open()
和 资源清理 close()
别离在 run()
运行前执行和运行后运行,罕用比方 Mysql 连贯关上和敞开。
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
private transient RuntimeContext runtimeContext;
// 该办法能够获取上下文
@Override
public RuntimeContext getRuntimeContext() {if (this.runtimeContext != null) {return this.runtimeContext;} else {throw new IllegalStateException("The runtime context has not been initialized.");
}
}
// 资源初始化
@Override
public void open(Configuration parameters) throws Exception {}
// 资源敞开
@Override
public void close() throws Exception {}
}
RichSourceFunction 实现实例:
/**
* 带上下文 不反对并发
* 而且减少了两个资源关上敞开办法 能够再 run 之前调用
*/
public class E03CustomizeRichSource extends RichSourceFunction<OrderInfo> {
private transient Connection connection;
private transient PreparedStatement ps;
// 运行标记位
private volatile boolean isRunning = true;
/**
* run 之前调用
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
// 通过高低问获取全局配置对象且读取配置信息
ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String url = parameterTool.get("url");
String username = parameterTool.get("username");
String password = parameterTool.get("password");
// 初始化
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection(url, username, password);
ps = connection.prepareStatement("select * from order_info");
}
/**
* run 之后调用
*
* @throws Exception
*/
@Override
public void close() throws Exception {super.close();
ps.close();
connection.close();}
@Override
public void run(SourceContext<OrderInfo> ctx) throws Exception {try (ResultSet resultSet = ps.executeQuery()) {while (resultSet.next()) {
//todo ...
//ctx.
}
}
}
@Override
public void cancel() {isRunning = false;}
}
1.3 RichParallelSourceFunction 形象办法
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
implements ParallelSourceFunction<OUT> {private static final long serialVersionUID = 1L;}
从名字就能够晓得这是一个并行数据源,他只是把 SourceFunction
替换了 ParallelSourceFunction
。SourceFunction
和 RichSourceFunction
并行度设置只能为 1 否则会报错 The parallelism of non parallel operator must be 1. _。_ RichParallelSourceFunction
则不受影响。
java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:37)
at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:103)
at flinklearning._1source.DemoMain.main(DemoMain.java:30)
1.4 需要选型
咱们晓得咱们查询数据库并行查问,会查问反复数据,所以没必须要应用 RichParallelSourceFunction
。
咱们须要应用 mysql。须要进行资源操作,所以最好应用 RichSourceFunction
。
2. 检查点 CheckPoint & 状态 Operator State
Flink 是有状态的流解决框架。自己了解:状态是某个工夫节点元素产生事件的形容。
比方:你想治理历史数据的时候,你能够将该历史数据存储在状态外面,这样你能够在将来某个工夫拜访。
2.1 状态的持久性
Flink 通过 CheckPoint(检查点) 将 state(状态) 长久化在状态后端。如果程序失败或异样重启,能够通过长久化的检查点复原到过后状态。
罕用后端:
- MemoryStateBackend(默认)
- FsStateBackend
- RocksDBStateBackend
_
_
_
2.2 检查点触发
Flink 默认是不开启检查点,用户能够 StreamExecutionEnvironment.enableCheckpointing(2000
`)` 设置。
Flink 会依据用户设置的工夫 周期有序的从 source 往数据流中插入 barrier(栏栅)
如:通过下图咱们初步理解下 检查点机制 并行度为一状况
-
工夫线 1
Source
端网数据流中插入checkPoint 1
-
工夫线 2
checkPoint 1
进入Map
触发State
长久化,并持续往上游算子数据流中传送。
-
工夫线 3
checkPoint 1
进入Sink
触发State
长久化,整个检查点触发结束。
2.3 Operator State 应用
Operator State
,只反对 ListState
状态存储构造和播送状态构造。其实还有另一种类型状态,这里咱们先只记住这个,有机会前面独自讲讲区别和应用。
应用 Operator State
须要联合CheckpointedFunction
接口应用。算子实现该接口,重写其中的两个办法:
- snapshotState 快照状态 触发检查点快照的时候
- initializeState 初始化状态 再构造函数后调用
public interface CheckpointedFunction{
// 触发检查点的时会调用此办法
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 创立函数的时候回嗲用此办法,能够获取状态信息 复原嘻嘻
void initializeState(FunctionInitializationContext context) throws Exception;
}
代码示例:
public class E05ReadMysqlRichSource02 implements CheckpointedFunction {
/**
* 存储 state 的变量.
*/
private ListState<LocalDateTime> startDateTimeState;
/**
* 检查点快照复原
*
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {System.err.println("快照时候执行 snapshotState---------------");
startDateTimeState.clear();
startDateTimeState.add(startDateTime);
System.out.println("快照后果:" + startDateTime);
}
/**
* 启动时执行快照复原
*
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {System.err.println("第二执行 initializeState---------------");
// 定义状态形容
ListStateDescriptor<LocalDateTime> startDateTimeStateDescriptor = new ListStateDescriptor<LocalDateTime>("startDateTimeState", TypeInformation.of(LocalDateTime.class));
startDateTimeState = context.getOperatorStateStore().getListState(startDateTimeStateDescriptor);
// 从状态中复原
if (context.isRestored()) {for (LocalDateTime localDateTime : startDateTimeState.get()) {startDateTime = localDateTime;}
System.out.println("复原后果:" + startDateTime);
}
}
}
2.4 需要剖析
下面咱们理解快照的作用,和怎么应用状态。咱们就能够解决需要中 最新查问截止工夫 不会因为程序故障失落,实现起码一次读取。
3. 了解自定义实现的 SourceFunction 生命周期
咱们尝试来编写 MysqlReadSourceFunction 首先继承 RichSourceFunction
实现数据发送;实现**CheckpointedFunction**
来保障数据完整性。然而提供了那么办法谁先执行谁后执行,关系是什么?
/**
* 实现自定实现 Mysql 数据读取器
* 反对 定点读取和切好一次读取
*/
public class MysqlReadSourceFunction extends RichSourceFunction<TraceSegmentRecordInfo> implements CheckpointedFunction {public MysqlReadSourceFunction() {super();
System.err.println("第一执行 1 构造方法");
}
@Override
public void open(Configuration parameters) throws Exception {System.err.println("第三执行 open---------------");
super.open(parameters);
}
@Override
public void close() throws Exception {System.err.println("第五执行 close---------------");
super.close();}
@Override
public void run(SourceContext<TraceSegmentRecordInfo> ctx) throws Exception {System.err.println("第三四 run---------------");
}
@Override
public void cancel() {System.err.println("勾销时候执行 cancel---------------");
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {System.err.println("快照时候执行 snapshotState---------------");
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {System.err.println("第二执行 initializeState---------------");
}
}
- 首先必定是结构器
- 其次会调用 initializeState 办法,进行数据状态复原
- 再而 open() 办法 初始化资源
- 紧接着 就进入真正执行 执行数据发送 run() 外围办法
- 此时如果程序勾销了,会调用 cancel() 能够再外面敞开 run() 外面的循环, 它会在线程中断前执行。
- 接着会调用 清理 close()
- 最初完结
3. 需要实现,就是填写代码了
/**
* 实现自定实现 Mysql 数据读取器
* 反对 定点读取和至多一次
*/
public class E05ReadMysqlRichSource02 extends RichSourceFunction<TraceSegmentRecordInfo> implements CheckpointedFunction {
// 常量名称
public static final String url = "url";
public static final String username = "username";
public static final String password = "password";
// 数据库连贯资源对象
private transient Connection connection;
private transient PreparedStatement statement;
// 查问 SQL
private String querySql;
// 是否持续运行
private volatile boolean isRunning = true;
// 查问开始工夫
private volatile LocalDateTime startDateTime;
// 工夫增长步长 单位分钟
private volatile int timeStep = 1;
/**
* 存储 state 的变量.
*/
private ListState<LocalDateTime> startDateTimeState;
public E05ReadMysqlRichSource02(String querySql, LocalDateTime startDateTime) {super();
// 设置查问 SQL
this.querySql = querySql;
// 初始化查问其实工夫
this.startDateTime = startDateTime;
}
/**
* 在 run 办法前执行
* 进行资源初始化
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
// 获取系统配置
ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
// 创立连贯资源
Class.forName("com.mysql.cj.jdbc.Driver");
connection = DriverManager.getConnection(parameterTool.get(url), parameterTool.get(username), parameterTool.get(password));
statement = connection.prepareStatement(querySql);
}
/**
* 资源进行敞开 在线程中断前执行
*
* @throws Exception
*/
@Override
public void close() throws Exception {super.close();
System.err.println("第五执行 close---------------");
isRunning = false;
// 敞开资源连贯
if (statement != null) {statement.close();
}
if (connection != null) {connection.close();
}
}
/**
* 在反对只反对不会并行执行
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<TraceSegmentRecordInfo> ctx) throws Exception {final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 获取检查点锁
final Object lock = ctx.getCheckpointLock();
// 循环遍历
while (isRunning) {
// 依照 工夫步长生成 查问起止工夫
String startDateTimeStr = dtf.format(startDateTime);
LocalDateTime endDateTime = startDateTime.plusMinutes(timeStep);
if (endDateTime.isAfter(LocalDateTime.now())) {
// 最新截止工夫 大于零碎工夫休眠后跳出循环
Thread.sleep(timeStep * 60000);
continue;
}
String endDateTimeStr = dtf.format(endDateTime);
// 设置参数
statement.setString(1, startDateTimeStr);
statement.setString(2, endDateTimeStr);
// 查问数据且封装参数
List<TraceSegmentRecordInfo> containerList = new ArrayList<>();
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {TraceSegmentRecordInfo info = new TraceSegmentRecordInfo();
info.setSegmentId(resultSet.getString("segment_id"));
info.setTraceId(resultSet.getString("trace_id"));
info.setServiceName(resultSet.getString("service_name"));
info.setServiceIp(resultSet.getString("service_ip"));
info.setEndpointName(resultSet.getString("endpoint_name"));
info.setDataBinary(resultSet.getString("data_binary"));
info.setTimeBucket(resultSet.getLong("time_bucket"));
info.setStartTime(resultSet.getLong("start_time"));
info.setEndTime(resultSet.getLong("end_time"));
info.setLatency(resultSet.getInt("latency"));
info.setIsError(resultSet.getInt("is_error"));
info.setCreateDate(resultSet.getDate("create_time"));
info.setStatement(resultSet.getString("statement"));
containerList.add(info);
}
System.out.println(String.format("执行后果[%s - %s] 查问条数 %s", startDateTimeStr, endDateTimeStr, containerList.size()));
if (CollectionUtils.isNotEmpty(containerList)) {for (Iterator<TraceSegmentRecordInfo> iterator = containerList.iterator(); iterator.hasNext();) {
// 发送数据
ctx.collect(iterator.next());
}
}
synchronized (lock) {
// 推送结束 最初应用 endDateTime 设置为起始工夫
startDateTime = endDateTime;
}
}
}
@Override
public void cancel() {System.err.println("勾销时候执行 cancel---------------");
// 设置进行 run 办法循环
isRunning = false;
}
/**
* 检查点快照复原
*
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {System.err.println("快照时候执行 snapshotState---------------");
// 快照最新状态
startDateTimeState.clear();
startDateTimeState.add(startDateTime);
System.out.println("快照后果:" + startDateTime);
}
/**
* 启动时执行快照复原
*
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {System.err.println("第二执行 initializeState---------------");
// 定义状态形容
ListStateDescriptor<LocalDateTime> startDateTimeStateDescriptor = new ListStateDescriptor<LocalDateTime>("startDateTimeState", TypeInformation.of(LocalDateTime.class));
startDateTimeState = context.getOperatorStateStore().getListState(startDateTimeStateDescriptor);
// 从状态中复原,if (context.isRestored()) {
// 只有重状态复原 才会进来,失常启动 isRestored = false
for (LocalDateTime localDateTime : startDateTimeState.get()) {startDateTime = localDateTime;}
System.out.println("复原后果:" + startDateTime);
}
}
}
public class DemoMain {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 本人实现带环境的零碎参数解析
ParameterTool parameterTool = ParameterToolEnvironmentUtils.createParameterTool(args);
env.getConfig().setGlobalJobParameters(parameterTool);
// 开启检查点
env.enableCheckpointing(2000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置文件系统作为状态后端
FsStateBackend fsStateBackend = new FsStateBackend("file:///usr/flink-1.12.1/tmp");
env.setStateBackend(fsStateBackend);
// 执行后果[2021-02-09 07:39:01 - 2021-02-09 07:40:01] 查问条数 177
String sql = "SELECT * from trace_segment_record where create_time >= ? AND create_time < ?";
//source
DataStreamSource<TraceSegmentRecordInfo> source2 = env.addSource(new E05ReadMysqlRichSource02(sql, LocalDateTime.of(2021, 01, 8, 0, 0, 01))).setParallelism(2);
// 过滤
SingleOutputStreamOperator<TraceSegmentRecordInfo> filter = source2.filter(new FilterFunction<TraceSegmentRecordInfo>() {
@Override
public boolean filter(TraceSegmentRecordInfo value) throws Exception {
// 过滤等于 null 的数据
return value.getEndpointName() != null;}
});
//sink
JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder().withBatchIntervalMs(2000L).withBatchSize(5000).build();
JdbcConnectionOptions connectionOptions = (new JdbcConnectionOptions.JdbcConnectionOptionsBuilder())
.withUrl("jdbc:mysql://xxxx:3306/yto_data_pipeline_init?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("xxx")
.withPassword("xxxx").build();
final String sinkSql = "REPLACE INTO trace_segment_record (segment_id, trace_id, service_name, service_ip, endpoint_name, start_time, end_time,latency,is_error, data_binary,time_bucket,create_time,statement) VALUE(?,?,?,?,?,?,?,?,?,?,?,?,?)";
filter.addSink(JdbcSink.sink(sinkSql, (ps, v) -> {ps.setString(1, v.getSegmentId());
ps.setString(2, v.getTraceId());
ps.setString(3, v.getServiceName());
ps.setString(4, v.getServiceIp());
ps.setString(5, v.getEndpointName());
ps.setLong(6, v.getStartTime());
ps.setLong(7, v.getEndTime());
ps.setLong(8, v.getLatency());
ps.setInt(9, v.getIsError());
ps.setString(10, v.getDataBinary());
ps.setLong(11, v.getTimeBucket());
ps.setTimestamp(12, new Timestamp(v.getCreateDate().getTime()));
ps.setString(13, v.getStatement());
}, executionOptions, connectionOptions)).uid("sink_mysql");
env.execute("aaa");
}
}
3.1 中断写入演示
我在执行到 2021-01-08 00:32:01
勾销了工作,发现零碎曾经存储快照了检查点。
咱们应用检查点重启我的项目在察看日志是否从 2021-01-08 00:32:01
开始打印!
其实从日志能够看到 我打印了日志不代表那个工夫节点曾经长久化到数据状态后端,
这里获得上一次的工夫节点 2021-01-08 00:30:01
数据也确实是从上次检查点开始查问的。
其实大家也发现了,数据反复读取了,这个也是文章结尾起码一次读取,局部反复,最受是通过数据唯一性查问语句实现数据一致性的。