序
本文主要研究一下 flink 的 Table Formats
实例
CSV Format
.withFormat(
new Csv()
.field(“field1”, Types.STRING) // required: ordered format fields
.field(“field2”, Types.TIMESTAMP)
.fieldDelimiter(“,”) // optional: string delimiter “,” by default
.lineDelimiter(“\n”) // optional: string delimiter “\n” by default
.quoteCharacter(‘”‘) // optional: single character for string values, empty by default
.commentPrefix(‘#’) // optional: string to indicate comments, empty by default
.ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
.ignoreParseErrors() // optional: skip records with parse error instead of failing by default
)
flink 内置支持 csv format,无需添加额外依赖
JSON Format
.withFormat(
new Json()
.failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default
// required: define the schema either by using type information which parses numbers to corresponding types
.schema(Type.ROW(…))
// or by using a JSON schema which parses to DECIMAL and TIMESTAMP
.jsonSchema(
“{” +
” type: ‘object’,” +
” properties: {” +
” lon: {” +
” type: ‘number'” +
” },” +
” rideTime: {” +
” type: ‘string’,” +
” format: ‘date-time'” +
” }” +
” }” +
“}”
)
// or use the table’s schema
.deriveSchema()
)
可以使用 schema 或者 jsonSchema 或者 deriveSchema 来定义 json format,需要额外添加 flink-json 依赖
Apache Avro Format
.withFormat(
new Avro()
// required: define the schema either by using an Avro specific record class
.recordClass(User.class)
// or by using an Avro schema
.avroSchema(
“{” +
” \”type\”: \”record\”,” +
” \”name\”: \”test\”,” +
” \”fields\” : [” +
” {\”name\”: \”a\”, \”type\”: \”long\”},” +
” {\”name\”: \”b\”, \”type\”: \”string\”}” +
” ]” +
“}”
)
)
可以使用 recordClass 或者 avroSchema 来定义 Avro schema,需要添加 flink-avro 依赖
ConnectTableDescriptor
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
private val tableEnv: TableEnvironment,
private val connectorDescriptor: ConnectorDescriptor)
extends TableDescriptor
with SchematicDescriptor[D]
with RegistrableDescriptor {this: D =>
private var formatDescriptor: Option[FormatDescriptor] = None
private var schemaDescriptor: Option[Schema] = None
//……
override def withFormat(format: FormatDescriptor): D = {
formatDescriptor = Some(format)
this
}
//……
}
StreamTableEnvironment 的 connect 方法创建 StreamTableDescriptor;StreamTableDescriptor 继承了 ConnectTableDescriptor;ConnectTableDescriptor 提供了 withFormat 方法,返回 FormatDescriptor
FormatDescriptor
flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java
@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {
private String type;
private int version;
/**
* Constructs a {@link FormatDescriptor}.
*
* @param type string that identifies this format
* @param version property version for backwards compatibility
*/
public FormatDescriptor(String type, int version) {
this.type = type;
this.version = version;
}
@Override
public final Map<String, String> toProperties() {
final DescriptorProperties properties = new DescriptorProperties();
properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
properties.putProperties(toFormatProperties());
return properties.asMap();
}
/**
* Converts this descriptor into a set of format properties. Usually prefixed with
* {@link FormatDescriptorValidator#FORMAT}.
*/
protected abstract Map<String, String> toFormatProperties();
}
FormatDescriptor 是个抽象类,Csv、Json、Avro 都是它的子类
Csv
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
private var fieldDelim: Option[String] = None
private var lineDelim: Option[String] = None
private val schema: mutable.LinkedHashMap[String, String] =
mutable.LinkedHashMap[String, String]()
private var quoteCharacter: Option[Character] = None
private var commentPrefix: Option[String] = None
private var isIgnoreFirstLine: Option[Boolean] = None
private var lenient: Option[Boolean] = None
def fieldDelimiter(delim: String): Csv = {
this.fieldDelim = Some(delim)
this
}
def lineDelimiter(delim: String): Csv = {
this.lineDelim = Some(delim)
this
}
def schema(schema: TableSchema): Csv = {
this.schema.clear()
schema.getFieldNames.zip(schema.getFieldTypes).foreach {case (n, t) =>
field(n, t)
}
this
}
def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
this
}
def field(fieldName: String, fieldType: String): Csv = {
if (schema.contains(fieldName)) {
throw new ValidationException(s”Duplicate field name $fieldName.”)
}
schema += (fieldName -> fieldType)
this
}
def quoteCharacter(quote: Character): Csv = {
this.quoteCharacter = Option(quote)
this
}
def commentPrefix(prefix: String): Csv = {
this.commentPrefix = Option(prefix)
this
}
def ignoreFirstLine(): Csv = {
this.isIgnoreFirstLine = Some(true)
this
}
def ignoreParseErrors(): Csv = {
this.lenient = Some(true)
this
}
override protected def toFormatProperties: util.Map[String, String] = {
val properties = new DescriptorProperties()
fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
val subKeys = util.Arrays.asList(
DescriptorProperties.TABLE_SCHEMA_NAME,
DescriptorProperties.TABLE_SCHEMA_TYPE)
val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava
properties.putIndexedFixedProperties(
FORMAT_FIELDS,
subKeys,
subValues)
quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
properties.asMap()
}
}
Csv 提供了 field、fieldDelimiter、lineDelimiter、quoteCharacter、commentPrefix、ignoreFirstLine、ignoreParseErrors 等方法
Json
flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java
public class Json extends FormatDescriptor {
private Boolean failOnMissingField;
private Boolean deriveSchema;
private String jsonSchema;
private String schema;
public Json() {
super(FORMAT_TYPE_VALUE, 1);
}
public Json failOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
return this;
}
public Json jsonSchema(String jsonSchema) {
Preconditions.checkNotNull(jsonSchema);
this.jsonSchema = jsonSchema;
this.schema = null;
this.deriveSchema = null;
return this;
}
public Json schema(TypeInformation<Row> schemaType) {
Preconditions.checkNotNull(schemaType);
this.schema = TypeStringUtils.writeTypeInfo(schemaType);
this.jsonSchema = null;
this.deriveSchema = null;
return this;
}
public Json deriveSchema() {
this.deriveSchema = true;
this.schema = null;
this.jsonSchema = null;
return this;
}
@Override
protected Map<String, String> toFormatProperties() {
final DescriptorProperties properties = new DescriptorProperties();
if (deriveSchema != null) {
properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
}
if (jsonSchema != null) {
properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
}
if (schema != null) {
properties.putString(FORMAT_SCHEMA, schema);
}
if (failOnMissingField != null) {
properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
}
return properties.asMap();
}
}
Json 提供了 schema、jsonSchema、deriveSchema 三种方式来定义 json format
Avro
flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java
public class Avro extends FormatDescriptor {
private Class<? extends SpecificRecord> recordClass;
private String avroSchema;
public Avro() {
super(AvroValidator.FORMAT_TYPE_VALUE, 1);
}
public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
Preconditions.checkNotNull(recordClass);
this.recordClass = recordClass;
return this;
}
public Avro avroSchema(String avroSchema) {
Preconditions.checkNotNull(avroSchema);
this.avroSchema = avroSchema;
return this;
}
@Override
protected Map<String, String> toFormatProperties() {
final DescriptorProperties properties = new DescriptorProperties();
if (null != recordClass) {
properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
}
if (null != avroSchema) {
properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
}
return properties.asMap();
}
}
Avro 提供了 recordClass、avroSchema 两种方式来定义 avro format
小结
StreamTableEnvironment 的 connect 方法创建 StreamTableDescriptor;StreamTableDescriptor 继承了 ConnectTableDescriptor
ConnectTableDescriptor 提供了 withFormat 方法,返回 FormatDescriptor;FormatDescriptor 是个抽象类,Csv、Json、Avro 都是它的子类
Csv 提供了 field、fieldDelimiter、lineDelimiter、quoteCharacter、commentPrefix、ignoreFirstLine、ignoreParseErrors 等方法;Json 提供了 schema、jsonSchema、deriveSchema 三种方式来定义 json format;Avro 提供了 recordClass、avroSchema 两种方式来定义 avro format
doc
Table Formats