A Look at Flink's JDBCOutputFormat


This article takes a look at Flink's JDBCOutputFormat.
JDBCOutputFormat
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
/**
* OutputFormat to write Rows into a JDBC database.
* The OutputFormat has to be configured using the supplied OutputFormatBuilder.
*
* @see Row
* @see DriverManager
*/
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;

private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);

private String username;
private String password;
private String drivername;
private String dbURL;
private String query;
private int batchInterval = DEFAULT_BATCH_INTERVAL;

private Connection dbConn;
private PreparedStatement upload;

private int batchCount = 0;

private int[] typesArray;

public JDBCOutputFormat() {
}

@Override
public void configure(Configuration parameters) {
}

/**
* Connects to the target database and initializes the prepared statement.
*
* @param taskNumber The number of the parallel instance.
* @throws IOException Thrown, if the output could not be opened due to an
* I/O problem.
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
establishConnection();
upload = dbConn.prepareStatement(query);
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}

private void establishConnection() throws SQLException, ClassNotFoundException {
Class.forName(drivername);
if (username == null) {
dbConn = DriverManager.getConnection(dbURL);
} else {
dbConn = DriverManager.getConnection(dbURL, username, password);
}
}

/**
* Adds a record to the prepared statement.
*
* <p>When this method is called, the output format is guaranteed to be opened.
*
* <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
* insert a null value but it’s not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
*
* @param row The records to add to the output.
* @see PreparedStatement
* @throws IOException Thrown, if the records could not be added due to an I/O problem.
*/
@Override
public void writeRecord(Row row) throws IOException {

if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
}
try {

if (typesArray == null) {
// no types provided
for (int index = 0; index < row.getArity(); index++) {
LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
upload.setObject(index + 1, row.getField(index));
}
} else {
// types provided
for (int index = 0; index < row.getArity(); index++) {

if (row.getField(index) == null) {
upload.setNull(index + 1, typesArray[index]);
} else {
// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
switch (typesArray[index]) {
case java.sql.Types.NULL:
upload.setNull(index + 1, typesArray[index]);
break;
case java.sql.Types.BOOLEAN:
case java.sql.Types.BIT:
upload.setBoolean(index + 1, (boolean) row.getField(index));
break;
case java.sql.Types.CHAR:
case java.sql.Types.NCHAR:
case java.sql.Types.VARCHAR:
case java.sql.Types.LONGVARCHAR:
case java.sql.Types.LONGNVARCHAR:
upload.setString(index + 1, (String) row.getField(index));
break;
case java.sql.Types.TINYINT:
upload.setByte(index + 1, (byte) row.getField(index));
break;
case java.sql.Types.SMALLINT:
upload.setShort(index + 1, (short) row.getField(index));
break;
case java.sql.Types.INTEGER:
upload.setInt(index + 1, (int) row.getField(index));
break;
case java.sql.Types.BIGINT:
upload.setLong(index + 1, (long) row.getField(index));
break;
case java.sql.Types.REAL:
upload.setFloat(index + 1, (float) row.getField(index));
break;
case java.sql.Types.FLOAT:
case java.sql.Types.DOUBLE:
upload.setDouble(index + 1, (double) row.getField(index));
break;
case java.sql.Types.DECIMAL:
case java.sql.Types.NUMERIC:
upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
break;
case java.sql.Types.DATE:
upload.setDate(index + 1, (java.sql.Date) row.getField(index));
break;
case java.sql.Types.TIME:
upload.setTime(index + 1, (java.sql.Time) row.getField(index));
break;
case java.sql.Types.TIMESTAMP:
upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
break;
case java.sql.Types.BINARY:
case java.sql.Types.VARBINARY:
case java.sql.Types.LONGVARBINARY:
upload.setBytes(index + 1, (byte[]) row.getField(index));
break;
default:
upload.setObject(index + 1, row.getField(index));
LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
typesArray[index], index + 1, row.getField(index));
// case java.sql.Types.SQLXML
// case java.sql.Types.ARRAY:
// case java.sql.Types.JAVA_OBJECT:
// case java.sql.Types.BLOB:
// case java.sql.Types.CLOB:
// case java.sql.Types.NCLOB:
// case java.sql.Types.DATALINK:
// case java.sql.Types.DISTINCT:
// case java.sql.Types.OTHER:
// case java.sql.Types.REF:
// case java.sql.Types.ROWID:
// case java.sql.Types.STRUCT:
}
}
}
}
upload.addBatch();
batchCount++;
} catch (SQLException e) {
throw new RuntimeException("Preparation of JDBC statement failed.", e);
}

if (batchCount >= batchInterval) {
// execute batch
flush();
}
}

void flush() {
try {
upload.executeBatch();
batchCount = 0;
} catch (SQLException e) {
throw new RuntimeException("Execution of JDBC statement failed.", e);
}
}

int[] getTypesArray() {
return typesArray;
}

/**
* Executes prepared statement and closes all resources of this instance.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
@Override
public void close() throws IOException {
if (upload != null) {
flush();
// close the connection
try {
upload.close();
} catch (SQLException e) {
LOG.info("JDBC statement could not be closed: " + e.getMessage());
} finally {
upload = null;
}
}

if (dbConn != null) {
try {
dbConn.close();
} catch (SQLException se) {
LOG.info("JDBC connection could not be closed: " + se.getMessage());
} finally {
dbConn = null;
}
}
}

public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
return new JDBCOutputFormatBuilder();
}

//……
}

JDBCOutputFormat extends RichOutputFormat, with org.apache.flink.types.Row as its type parameter
open() calls establishConnection to load the JDBC driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement)
writeRecord first checks whether a typesArray was supplied; if not, it falls back to setObject to set each value, otherwise it casts each field according to the corresponding SQL type, covering most of the types defined in java.sql.Types
writeRecord relies on PreparedStatement.addBatch; once batchCount reaches batchInterval (default 5000), flush() is triggered, which calls PreparedStatement.executeBatch and resets batchCount; to make sure records that never reach batchInterval still get committed, close() performs one final flush() before closing the PreparedStatement and the Connection
JDBCOutputFormat also provides a JDBCOutputFormatBuilder for convenient construction; a usage sketch follows below
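To make that flow concrete, here is a minimal DataSet job that builds a JDBCOutputFormat through the builder and writes a couple of rows. The MySQL driver, URL, credentials and target table are placeholder assumptions for illustration, not something the format itself prescribes:

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class JdbcOutputFormatDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Row is not strongly typed, so the field types are supplied via RowTypeInfo
        DataSet<Row> rows = env.fromCollection(
                Arrays.asList(Row.of("alice", 30), Row.of("bob", 25)),
                new RowTypeInfo(Types.STRING, Types.INT));

        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")                    // assumed driver
                .setDBUrl("jdbc:mysql://localhost:3306/test")              // assumed URL
                .setUsername("root")                                       // assumed credentials
                .setPassword("secret")
                .setQuery("INSERT INTO person (name, age) VALUES (?, ?)")  // assumed table
                .setSqlTypes(new int[]{java.sql.Types.VARCHAR, java.sql.Types.INTEGER})
                .setBatchInterval(1000)  // flush every 1000 rows instead of the default 5000
                .finish();

        rows.output(jdbcOutput);
        env.execute("JDBCOutputFormat demo");
    }
}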

Row
flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java
/**
* A Row can have arbitrary number of fields and contain a set of fields, which may all be
* different types. The fields in Row can be null. Due to Row is not strongly typed, Flink’s
* type extraction mechanism can’t extract correct field types. So that users should manually
* tell Flink the type information via creating a {@link RowTypeInfo}.
*
* <p>
* The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
* set fields by {@link #setField(int, Object)}.
* <p>
* Row is in principle serializable. However, it may contain non-serializable fields,
* in which case serialization will fail.
*
*/
@PublicEvolving
public class Row implements Serializable{

private static final long serialVersionUID = 1L;

/** The array to store actual values. */
private final Object[] fields;

/**
* Create a new Row instance.
* @param arity The number of fields in the Row
*/
public Row(int arity) {
this.fields = new Object[arity];
}

/**
* Get the number of fields in the Row.
* @return The number of fields in the Row.
*/
public int getArity() {
return fields.length;
}

/**
* Gets the field at the specified position.
* @param pos The position of the field, 0-based.
* @return The field at the specified position.
* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
*/
public Object getField(int pos) {
return fields[pos];
}

/**
* Sets the field at the specified position.
*
* @param pos The position of the field, 0-based.
* @param value The value to be assigned to the field at the specified position.
* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
*/
public void setField(int pos, Object value) {
fields[pos] = value;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fields.length; i++) {
if (i > 0) {
sb.append(",");
}
sb.append(StringUtils.arrayAwareToString(fields[i]));
}
return sb.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

Row row = (Row) o;

return Arrays.deepEquals(fields, row.fields);
}

@Override
public int hashCode() {
return Arrays.deepHashCode(fields);
}

/**
* Creates a new Row and assigns the given values to the Row’s fields.
* This is more convenient than using the constructor.
*
* <p>For example:
*
* <pre>
* Row.of("hello", true, 1L);}
* </pre>
* instead of
* <pre>
* Row row = new Row(3);
* row.setField(0, "hello");
* row.setField(1, true);
* row.setField(2, 1L);
* </pre>
*
*/
public static Row of(Object... values) {
Row row = new Row(values.length);
for (int i = 0; i < values.length; i++) {
row.setField(i, values[i]);
}
return row;
}

/**
* Creates a new Row which copied from another row.
* This method does not perform a deep copy.
*
* @param row The row being copied.
* @return The cloned new Row
*/
public static Row copy(Row row) {
final Row newRow = new Row(row.fields.length);
System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
return newRow;
}

/**
* Creates a new Row with projected fields from another row.
* This method does not perform a deep copy.
*
* @param fields fields to be projected
* @return the new projected Row
*/
public static Row project(Row row, int[] fields) {
final Row newRow = new Row(fields.length);
for (int i = 0; i < fields.length; i++) {
newRow.fields[i] = row.fields[fields[i]];
}
return newRow;
}
}
Row is the record type accepted by JDBCOutputFormat's writeRecord; it stores field values in an Object array and also offers static helpers such as of, copy and project, illustrated below
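A small self-contained sketch of those helpers (the values are arbitrary):

import org.apache.flink.types.Row;

public class RowDemo {
    public static void main(String[] args) {
        Row row = Row.of("hello", true, 1L);        // same as new Row(3) plus three setField calls
        System.out.println(row.getArity());         // 3
        System.out.println(row.getField(0));        // hello (positions are zero-based)

        Row copy = Row.copy(row);                   // shallow copy: the field array is copied, the values are not
        Row projected = Row.project(row, new int[]{2, 0});
        System.out.println(projected);              // 1,hello
    }
}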
JDBCOutputFormatBuilder
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
/**
* Builder for a {@link JDBCOutputFormat}.
*/
public static class JDBCOutputFormatBuilder {
private final JDBCOutputFormat format;

protected JDBCOutputFormatBuilder() {
this.format = new JDBCOutputFormat();
}

public JDBCOutputFormatBuilder setUsername(String username) {
format.username = username;
return this;
}

public JDBCOutputFormatBuilder setPassword(String password) {
format.password = password;
return this;
}

public JDBCOutputFormatBuilder setDrivername(String drivername) {
format.drivername = drivername;
return this;
}

public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
format.dbURL = dbURL;
return this;
}

public JDBCOutputFormatBuilder setQuery(String query) {
format.query = query;
return this;
}

public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
format.batchInterval = batchInterval;
return this;
}

public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
format.typesArray = typesArray;
return this;
}

/**
* Finalizes the configuration and checks validity.
*
* @return Configured JDBCOutputFormat
*/
public JDBCOutputFormat finish() {
if (format.username == null) {
LOG.info("Username was not supplied.");
}
if (format.password == null) {
LOG.info("Password was not supplied.");
}
if (format.dbURL == null) {
throw new IllegalArgumentException("No database URL supplied.");
}
if (format.query == null) {
throw new IllegalArgumentException("No query supplied.");
}
if (format.drivername == null) {
throw new IllegalArgumentException("No driver supplied.");
}
}

return format;
}
}
JDBCOutputFormatBuilder provides builder methods for the username, password, dbURL, query, drivername, batchInterval and typesArray properties; as the finish() checks above show, dbURL, query and drivername are mandatory while a missing username or password only produces an info log (see the sketch below)
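A minimal sketch of that validation, assuming a Derby driver and an in-memory URL purely for illustration:

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class JdbcBuilderValidationDemo {
    public static void main(String[] args) {
        try {
            JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") // assumed driver
                    .setDBUrl("jdbc:derby:memory:demo;create=true")        // assumed URL
                    .finish();                                             // no query supplied
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());                            // prints "No query supplied."
        }
    }
}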
JDBCAppendTableSink
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
/**
* An at-least-once Table sink for JDBC.
*
* <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
* checkpointing is enabled). However, one common use case is to run idempotent queries
* (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
* achieve exactly-once semantic.</p>
*/
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

private final JDBCOutputFormat outputFormat;

private String[] fieldNames;
private TypeInformation[] fieldTypes;

JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
this.outputFormat = outputFormat;
}

public static JDBCAppendTableSinkBuilder builder() {
return new JDBCAppendTableSinkBuilder();
}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
dataStream
.addSink(new JDBCSinkFunction(outputFormat))
.name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
dataSet.output(outputFormat);
}

@Override
public TypeInformation<Row> getOutputType() {
return new RowTypeInfo(fieldTypes, fieldNames);
}

@Override
public String[] getFieldNames() {
return fieldNames;
}

@Override
public TypeInformation<?>[] getFieldTypes() {
return fieldTypes;
}

@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
int[] types = outputFormat.getTypesArray();

String sinkSchema =
String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
String tableSchema =
String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

Preconditions.checkArgument(fieldTypes.length == types.length, msg);
for (int i = 0; i < types.length; ++i) {
Preconditions.checkArgument(
JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
msg);
}

JDBCAppendTableSink copy;
try {
copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}

copy.fieldNames = fieldNames;
copy.fieldTypes = fieldTypes;
return copy;
}

@VisibleForTesting
JDBCOutputFormat getOutputFormat() {
return outputFormat;
}
}

JDBCAppendTableSink uses JDBCOutputFormat internally and implements both the AppendStreamTableSink and BatchTableSink interfaces
Its emitDataStream method adds a JDBCSinkFunction sink to the given dataStream, while emitDataSet sets the outputFormat as the dataSet's output
It also implements the getOutputType, getFieldNames, getFieldTypes and configure methods declared by TableSink (which BatchTableSink extends); configure essentially clones the underlying JDBCOutputFormat, checks the table schema against it and returns a new configured JDBCAppendTableSink (see the usage sketch below)
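As a usage sketch, the sink is normally built through JDBCAppendTableSinkBuilder (not shown in this article) and registered with a TableEnvironment, which hands the table schema to configure. The Derby driver, URL and target table below are placeholder assumptions:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class JdbcAppendTableSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<Integer, String>> stream =
                env.fromElements(Tuple2.of(1, "alice"), Tuple2.of(2, "bob"));
        Table table = tableEnv.fromDataStream(stream, "id, name");

        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")    // assumed driver
                .setDBUrl("jdbc:derby:memory:demo;create=true")           // assumed URL
                .setQuery("INSERT INTO person (id, name) VALUES (?, ?)")  // assumed table
                .setParameterTypes(Types.INT, Types.STRING)
                .build();

        // registering the sink passes the schema to configure(), which checks it against
        // the sink's typesArray and returns a configured copy
        tableEnv.registerTableSink("jdbcOutputTable",
                new String[]{"id", "name"},
                new TypeInformation[]{Types.INT, Types.STRING},
                sink);

        table.insertInto("jdbcOutputTable");
        env.execute("JDBCAppendTableSink demo");
    }
}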

JDBCSinkFunction
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
final JDBCOutputFormat outputFormat;

JDBCSinkFunction(JDBCOutputFormat outputFormat) {
this.outputFormat = outputFormat;
}

@Override
public void invoke(Row value) throws Exception {
outputFormat.writeRecord(value);
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}

@Override
public void close() throws Exception {
outputFormat.close();
super.close();
}
}
JDBCSinkFunction extends RichSinkFunction and also implements the CheckpointedFunction interface; invoke delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that pending records are committed at checkpoint time (see the sketch below)
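Since snapshotState delegates to flush, the pending batch is committed on every checkpoint; a streaming sketch that relies on this behavior might look like the following (the driver, URL and table are again placeholder assumptions):

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class JdbcSinkFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with checkpointing enabled, snapshotState() flushes the pending JDBC batch at
        // every checkpoint, so rows below batchInterval do not wait until close()
        env.enableCheckpointing(10_000L);

        DataStream<Row> rows = env.fromCollection(
                Arrays.asList(Row.of(1, "alice"), Row.of(2, "bob")),
                new RowTypeInfo(Types.INT, Types.STRING));

        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")    // assumed driver
                .setDBUrl("jdbc:derby:memory:demo;create=true")           // assumed URL
                .setQuery("INSERT INTO person (id, name) VALUES (?, ?)")  // assumed table
                .setParameterTypes(Types.INT, Types.STRING)
                .build();

        // emitDataStream wires in the JDBCSinkFunction shown above:
        // invoke() -> writeRecord(), snapshotState() -> flush(), close() -> close()
        sink.emitDataStream(rows);

        env.execute("JDBCSinkFunction demo");
    }
}
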
Summary

JDBCOutputFormat extends RichOutputFormat; open() calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement); writeRecord relies on PreparedStatement.addBatch, and once batchCount reaches batchInterval (default 5000) it flushes by calling PreparedStatement.executeBatch and resetting batchCount; to ensure records that never reach batchInterval are still committed, close() performs one final flush before closing the PreparedStatement and the Connection
Row is the record type of JDBCOutputFormat's writeRecord; it stores field values in an Object array
JDBCOutputFormatBuilder provides builder methods for the username, password, dbURL, query, drivername, batchInterval and typesArray properties
JDBCAppendTableSink uses JDBCOutputFormat internally; its emitDataStream method adds a JDBCSinkFunction sink to the given dataStream, while emitDataSet sets the outputFormat as the dataSet's output
JDBCSinkFunction extends RichSinkFunction and implements CheckpointedFunction; invoke delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush to commit pending records promptly

doc
JDBCOutputFormat
