1、JdbcSink
用于DataStream减少Jdbc的Sink输入,次要两个接口:sink()和exactlyOnceSink()。其中exactlyOnceSink()是13版本新增的反对事务性的接口,本次次要介绍sink()接口。
public static <T> SinkFunction<T> sink( String sql, JdbcStatementBuilder<T> statementBuilder, JdbcExecutionOptions executionOptions, JdbcConnectionOptions connectionOptions) { return new GenericJdbcSinkFunction<>( new JdbcBatchingOutputFormat<>( new SimpleJdbcConnectionProvider(connectionOptions), executionOptions, context -> { Preconditions.checkState( !context.getExecutionConfig().isObjectReuseEnabled(), "objects can not be reused with JDBC sink function"); return JdbcBatchStatementExecutor.simple( sql, statementBuilder, Function.identity()); }, JdbcBatchingOutputFormat.RecordExtractor.identity()));}
1.1、参数
接口有四个参数,其中第三个参数executionOptions能够省略应用默认值,具体样例参看1、JdbcSink形式
sql
String类型,一个SQL语句模板,就是通常应用的PreparedStatement那种模式,例如:insert into wordcount (wordcl, countcl) values (?,?)
statementBuilder
JdbcStatementBuilder类型,作用是实现流数据与SQL具体列的对应,基于上一个参数的PreparedStatement模式,实现对应关系
executionOptions
Flink Jdbc输入的执行规定,次要设置执行触发机制,次要设置三个参数:数据量触发阈值、工夫触发阈值、最大重试次数。其中,数据量触发默认为5000,工夫触发默认为0,即敞开工夫触发。留神触发阈值不要设置的过低,否则可能造成数据库的阻塞。
connectionOptions
JdbcConnectionOptions类型,用于设置数据库连贯属性,包含Url、Driver、Username、Password等
1.2、返回
接口返回的是一个基于SinkFunction实现的GenericJdbcSinkFunction类,其外围是参数JdbcBatchingOutputFormat。
GenericJdbcSinkFunction的后果外围办法如下,都是基于JdbcBatchingOutputFormat的操作。
@Overridepublic void open(Configuration parameters) throws Exception { super.open(parameters); RuntimeContext ctx = getRuntimeContext(); outputFormat.setRuntimeContext(ctx); outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());}@Overridepublic void invoke(T value, Context context) throws IOException { outputFormat.writeRecord(value);}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception { outputFormat.flush();}
2、JdbcBatchingOutputFormat
JdbcBatchingOutputFormat是进行Jdbc交互的实现类,在向Jdbc输入前进行数据聚合
2.1、参数
接口有四个参数
JdbcConnectionProvider
提供Jdbc连贯
JdbcExecutionOptions
执行参数
StatementExecutorFactory
Statement执行工厂,也就是流数据与数据库字段对应关系的解决
RecordExtractor
数据提取的执行类
2.2、open办法
Open办法是进行数据库连贯初始化及后期筹备的接口,存在调用关系
Task.doRun() ->invokable.invoke()->DataSinkTask.invoke() ->format.open()->JdbcBatchingOutputFormat.open()
2.2.1、连贯数据库
Open办法的第一步是连贯数据库,调用下层办法AbstractJdbcOutputFormat的open办法,之后调用JdbcConnectionProvider的实现类SimpleJdbcConnectionProvider的getOrEstablishConnection()办法建设连贯,getOrEstablishConnection的具体操作如下
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException { if (connection != null) { return connection; } if (jdbcOptions.getDriverName() == null) { connection = DriverManager.getConnection( jdbcOptions.getDbURL(), jdbcOptions.getUsername().orElse(null), jdbcOptions.getPassword().orElse(null)); } else { Driver driver = getLoadedDriver(); Properties info = new Properties(); jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user)); jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password)); connection = driver.connect(jdbcOptions.getDbURL(), info); if (connection == null) { // Throw same exception as DriverManager.getConnection when no driver found to match // caller expectation. throw new SQLException( "No suitable driver found for " + jdbcOptions.getDbURL(), "08001"); } } return connection;}
此处依据有没有设置Drive有两种解决。如果没有设置,会依据设置的URL主动解析,用到了Java的DriverManager,这个类用于治理Jdbc驱动。DriverManager会自动识别classpath里的Driver,而后能够依据URL主动解析配对。如果设置了Driver,那就间接加载这个Driver来进行连贯解决。
2.2.2、JdbcExec
这个是基于StatementExecutorFactory创立的,此处最初应用的实现类是JdbcBatchStatementExecutor,在sink()接口中设定。这一步理论的操作就是做一个prepareStatements
@Overridepublic void prepareStatements(Connection connection) throws SQLException { this.st = connection.prepareStatement(sql);}
2.2.3、scheduler
数据库性能无限,所以Flink写数据库通常采纳批的形式,此处就是设置工夫调度的,具体参数能够参看第一章。须要留神的是两个非凡配置值:工夫为0或者条数为1则不创立这个调度器。
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
此处创立的调度线程池只蕴含一个线程
this.scheduler = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("jdbc-upsert-output-format"))
调度器最终执行的操作就是整个类最大的一点,flush数据到数据库
synchronized (JdbcBatchingOutputFormat.this) { if (!closed) { try { flush(); } catch (Exception e) { flushException = e; } }}
2.3、writeRecord办法
writeRecord是类的外围办法,进行数据的写入。次要进行两个操作,将数据退出列表,达到条件时flush到数据库中。
try { addToBatch(record, jdbcRecordExtractor.apply(record)); batchCount++; if (executionOptions.getBatchSize() > 0 && batchCount >= executionOptions.getBatchSize()) { flush(); }} catch (Exception e) { throw new IOException("Writing records to JDBC failed.", e);}
2.3.1、缓存数据
缓存数据应用的就是一个简略的ArrayList,其定义在SimpleBatchStatementExecutor
SimpleBatchStatementExecutor( String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) { this.sql = sql; this.parameterSetter = statementBuilder; this.valueTransformer = valueTransformer; this.batch = new ArrayList<>();}
如上,batch就是用于缓存数据的,增加数据操作如下。
@Overridepublic void addToBatch(T record) { batch.add(valueTransformer.apply(record));}
其中valueTransformer的作用就是返回输出,在sink初始时定义:
return JdbcBatchStatementExecutor.simple( sql, statementBuilder, Function.identity()); /** * Returns a function that always returns its input argument. * * @param <T> the type of the input and output objects to the function * @return a function that always returns its input argument */static <T> Function<T, T> identity() { return t -> t;}
2.3.2、flush
flush就是把缓存数据向数据库刷出,最终调用的是SimpleBatchStatementExecutor的executeBatch办法
@Overridepublic void executeBatch() throws SQLException { if (!batch.isEmpty()) { for (V r : batch) { parameterSetter.accept(st, r); st.addBatch(); } st.executeBatch(); batch.clear(); }}