1、JdbcSink
用于 DataStream 减少 Jdbc 的 Sink 输入,次要两个接口:sink() 和 exactlyOnceSink()。其中 exactlyOnceSink() 是 13 版本新增的反对事务性的接口,本次次要介绍 sink() 接口。
public static <T> SinkFunction<T> sink(
String sql,
JdbcStatementBuilder<T> statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions) {
return new GenericJdbcSinkFunction<>(
new JdbcBatchingOutputFormat<>(new SimpleJdbcConnectionProvider(connectionOptions),
executionOptions,
context -> {
Preconditions.checkState(!context.getExecutionConfig().isObjectReuseEnabled(),
"objects can not be reused with JDBC sink function");
return JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity());
},
JdbcBatchingOutputFormat.RecordExtractor.identity()));
}
1.1、参数
接口有四个参数,其中第三个参数 executionOptions 能够省略应用默认值,具体样例参看 1、JdbcSink 形式
-
sql
String 类型,一个 SQL 语句模板,就是通常应用的 PreparedStatement 那种模式,例如:insert into wordcount (wordcl, countcl) values (?,?)
-
statementBuilder
JdbcStatementBuilder 类型,作用是实现流数据与 SQL 具体列的对应,基于上一个参数的 PreparedStatement 模式,实现对应关系
-
executionOptions
Flink Jdbc 输入的执行规定,次要设置执行触发机制,次要设置三个参数:数据量触发阈值、工夫触发阈值、最大重试次数。其中,数据量触发默认为 5000,工夫触发默认为 0,即敞开工夫触发。留神触发阈值不要设置的过低,否则可能造成数据库的阻塞。
-
connectionOptions
JdbcConnectionOptions 类型,用于设置数据库连贯属性,包含 Url、Driver、Username、Password 等
1.2、返回
接口返回的是一个基于 SinkFunction 实现的 GenericJdbcSinkFunction 类,其外围是参数 JdbcBatchingOutputFormat。
GenericJdbcSinkFunction 的后果外围办法如下,都是基于 JdbcBatchingOutputFormat 的操作。
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@Override
public void invoke(T value, Context context) throws IOException {outputFormat.writeRecord(value);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {outputFormat.flush();
}
2、JdbcBatchingOutputFormat
JdbcBatchingOutputFormat 是进行 Jdbc 交互的实现类,在向 Jdbc 输入前进行数据聚合
2.1、参数
接口有四个参数
-
JdbcConnectionProvider
提供 Jdbc 连贯
-
JdbcExecutionOptions
执行参数
-
StatementExecutorFactory
Statement 执行工厂,也就是流数据与数据库字段对应关系的解决
-
RecordExtractor
数据提取的执行类
2.2、open 办法
Open 办法是进行数据库连贯初始化及后期筹备的接口,存在调用关系
Task.doRun()
->invokable.invoke()->DataSinkTask.invoke()
->format.open()->JdbcBatchingOutputFormat.open()
2.2.1、连贯数据库
Open 办法的第一步是连贯数据库,调用下层办法 AbstractJdbcOutputFormat 的 open 办法,之后调用 JdbcConnectionProvider 的实现类 SimpleJdbcConnectionProvider 的 getOrEstablishConnection() 办法建设连贯,getOrEstablishConnection 的具体操作如下
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {if (connection != null) {return connection;}
if (jdbcOptions.getDriverName() == null) {
connection =
DriverManager.getConnection(jdbcOptions.getDbURL(),
jdbcOptions.getUsername().orElse(null),
jdbcOptions.getPassword().orElse(null));
} else {Driver driver = getLoadedDriver();
Properties info = new Properties();
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
connection = driver.connect(jdbcOptions.getDbURL(), info);
if (connection == null) {
// Throw same exception as DriverManager.getConnection when no driver found to match
// caller expectation.
throw new SQLException("No suitable driver found for" + jdbcOptions.getDbURL(), "08001");
}
}
return connection;
}
此处依据有没有设置 Drive 有两种解决。如果没有设置,会依据设置的 URL 主动解析,用到了 Java 的 DriverManager,这个类用于治理 Jdbc 驱动。DriverManager 会自动识别 classpath 里的 Driver,而后能够依据 URL 主动解析配对。如果设置了 Driver,那就间接加载这个 Driver 来进行连贯解决。
2.2.2、JdbcExec
这个是基于 StatementExecutorFactory 创立的,此处最初应用的实现类是 JdbcBatchStatementExecutor,在 sink() 接口中设定。这一步理论的操作就是做一个 prepareStatements
@Override
public void prepareStatements(Connection connection) throws SQLException {this.st = connection.prepareStatement(sql);
}
2.2.3、scheduler
数据库性能无限,所以 Flink 写数据库通常采纳批的形式,此处就是设置工夫调度的,具体参数能够参看第一章。须要留神的是两个非凡配置值:工夫为 0 或者条数为 1 则不创立这个调度器。
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
此处创立的调度线程池只蕴含一个线程
this.scheduler =
Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"))
调度器最终执行的操作就是整个类最大的一点,flush 数据到数据库
synchronized (JdbcBatchingOutputFormat.this) {if (!closed) {
try {flush();
} catch (Exception e) {flushException = e;}
}
}
2.3、writeRecord 办法
writeRecord 是类的外围办法,进行数据的写入。次要进行两个操作,将数据退出列表,达到条件时 flush 到数据库中。
try {addToBatch(record, jdbcRecordExtractor.apply(record));
batchCount++;
if (executionOptions.getBatchSize() > 0
&& batchCount >= executionOptions.getBatchSize()) {flush();
}
} catch (Exception e) {throw new IOException("Writing records to JDBC failed.", e);
}
2.3.1、缓存数据
缓存数据应用的就是一个简略的 ArrayList,其定义在 SimpleBatchStatementExecutor
SimpleBatchStatementExecutor(String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
this.sql = sql;
this.parameterSetter = statementBuilder;
this.valueTransformer = valueTransformer;
this.batch = new ArrayList<>();}
如上,batch 就是用于缓存数据的,增加数据操作如下。
@Override
public void addToBatch(T record) {batch.add(valueTransformer.apply(record));
}
其中 valueTransformer 的作用就是返回输出,在 sink 初始时定义:
return JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity());
/**
* Returns a function that always returns its input argument.
*
* @param <T> the type of the input and output objects to the function
* @return a function that always returns its input argument
*/
static <T> Function<T, T> identity() {return t -> t;}
2.3.2、flush
flush 就是把缓存数据向数据库刷出,最终调用的是 SimpleBatchStatementExecutor 的 executeBatch 办法
@Override
public void executeBatch() throws SQLException {if (!batch.isEmpty()) {for (V r : batch) {parameterSetter.accept(st, r);
st.addBatch();}
st.executeBatch();
batch.clear();}
}