乐趣区

聊聊flink的CsvTableSource


本文主要研究一下 flink 的 CsvTableSource
TableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala
trait TableSource[T] {

  /**
    * Type information describing the records this [[TableSource]] produces.
    * Fields of the return type are matched to the table schema by name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Schema of the table produced by this source.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Human-readable description of this source. The default implementation
    * delegates to [[TableConnectorUtil.generateRuntimeName]], passing the
    * concrete class and the field names of the table schema.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String = {
    val schemaFieldNames = getTableSchema.getFieldNames
    TableConnectorUtil.generateRuntimeName(getClass, schemaFieldNames)
  }
}
TableSource 定义了三个方法,分别是 getReturnType、getTableSchema、explainSource
BatchTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala
trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Produces the table's data as a [[DataSet]].
    *
    * NOTE: Internal use only, for defining a [[TableSource]].
    *       Do not call this from Table API programs.
    *
    * @param execEnv The batch execution environment the [[DataSet]] is created in.
    * @return The data of the table as a [[DataSet]].
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
BatchTableSource 继承了 TableSource,它定义了 getDataSet 方法
StreamTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala
trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Produces the table's data as a [[DataStream]].
    *
    * NOTE: Internal use only, for defining a [[TableSource]].
    *       Do not call this from Table API programs.
    *
    * @param execEnv The streaming execution environment the [[DataStream]] is created in.
    * @return The data of the table as a [[DataStream]].
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
StreamTableSource 继承了 TableSource,它定义了 getDataStream 方法
CsvTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala
/**
  * A [[BatchTableSource]], [[StreamTableSource]] and [[ProjectableTableSource]] that reads
  * [[Row]]s from a CSV file through a [[RowCsvInputFormat]].
  *
  * @param path            Path of the CSV file; wrapped in a [[Path]] when the input format
  *                        is created.
  * @param fieldNames      Names of all fields of the table schema.
  * @param fieldTypes      Types of all fields of the table schema; must have the same length
  *                        as `fieldNames`.
  * @param selectedFields  Indices (into `fieldNames` / `fieldTypes`) of the fields that are
  *                        actually read and returned.
  * @param fieldDelim      Field delimiter handed to the [[RowCsvInputFormat]].
  * @param rowDelim        Row delimiter handed to the [[RowCsvInputFormat]].
  * @param quoteCharacter  If non-null, quoted string parsing is enabled with this character.
  * @param ignoreFirstLine Forwarded to `setSkipFirstLineAsHeader` of the input format.
  * @param ignoreComments  If non-null, set as the comment prefix of the input format.
  * @param lenient         Forwarded to `setLenient` of the input format.
  * @throws TableException if `fieldNames` and `fieldTypes` differ in length.
  */
class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  /**
    * Creates a source that initially returns every field.
    *
    * All parameters have the same meaning as on the primary constructor; the selected
    * fields are set to all indices of `fieldTypes`.
    */
  def this(
      path: String,
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]],
      fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
      quoteCharacter: Character = null,
      ignoreFirstLine: Boolean = false,
      ignoreComments: String = null,
      lenient: Boolean = false) = {

    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)

  }

  /**
    * Creates a source with the default field and line delimiters, no quote character,
    * no header skipping, no comment prefix and non-lenient parsing.
    */
  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  // Validate before the projected names/types below are derived from both arrays.
  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  // Row type of the projected fields only; the full schema is exposed via getTableSchema.
  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  /** Creates a [[DataSet]] of [[Row]]s from the CSV input format, named after `explainSource()`. */
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  override def getReturnType: RowTypeInfo = returnType

  /** Creates a [[DataStream]] of [[Row]]s from the CSV input format, named after `explainSource()`. */
  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the schema of the produced table, built from all field names and types. */
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)

  /**
    * Returns a copy of this [[TableSource]] that reads only the given fields.
    *
    * @param fields Indices of the fields to project. An empty array falls back to
    *               reading field 0.
    */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    // NOTE(review): the fallback to field 0 presumably keeps at least one column readable
    // for projections that need no columns - confirm against the planner.
    val projectedFields = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      projectedFields,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  /** Builds the [[RowCsvInputFormat]] for the selected fields and applies the parsing options. */
  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    // Quote character and comment prefix are optional; null means "not configured".
    if (quoteCharacter != null) {
      inputFormat.enableQuotedStringParsing(quoteCharacter)
    }
    if (ignoreComments != null) {
      inputFormat.setCommentPrefix(ignoreComments)
    }

    inputFormat
  }

  /** Two sources are equal when their return type, path and all parsing options match. */
  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource => returnType == that.returnType &&
      path == that.path &&
      fieldDelim == that.fieldDelim &&
      rowDelim == that.rowDelim &&
      quoteCharacter == that.quoteCharacter &&
      ignoreFirstLine == that.ignoreFirstLine &&
      ignoreComments == that.ignoreComments &&
      lenient == that.lenient
    case _ => false
  }

  /** Hashes on the return type only, which is one of the fields compared by `equals`. */
  override def hashCode(): Int = {
    returnType.hashCode()
  }

  /** Describes the source as `CsvTableSource(read fields: ...)`, listing the projected field names. */
  override def explainSource(): String = {
    val readFields = getReturnType.getFieldNames.mkString(", ")
    s"CsvTableSource(read fields: $readFields)"
  }
}

CsvTableSource 同时实现了 BatchTableSource、StreamTableSource 以及 ProjectableTableSource 接口(projectFields 方法返回一个只读取指定字段的新 CsvTableSource);getDataSet 方法使用 ExecutionEnvironment.createInput 创建 DataSet;getDataStream 方法使用 StreamExecutionEnvironment.createInput 创建 DataStream
ExecutionEnvironment.createInput 及 StreamExecutionEnvironment.createInput 接收的 InputFormat 为 RowCsvInputFormat,通过 createCsvInput 创建而来
getTableSchema 方法返回的 TableSchema 通过 fieldNames 及 fieldTypes 创建;getReturnType 方法返回的 RowTypeInfo 通过 selectedFieldTypes 及 selectedFieldNames 创建;explainSource 方法这里返回的是 CsvTableSource 开头的字符串

小结

TableSource 定义了三个方法,分别是 getReturnType、getTableSchema、explainSource;BatchTableSource 继承了 TableSource,它定义了 getDataSet 方法;StreamTableSource 继承了 TableSource,它定义了 getDataStream 方法
CsvTableSource 同时实现了 BatchTableSource、StreamTableSource 以及 ProjectableTableSource 接口(projectFields 方法返回一个只读取指定字段的新 CsvTableSource);getDataSet 方法使用 ExecutionEnvironment.createInput 创建 DataSet;getDataStream 方法使用 StreamExecutionEnvironment.createInput 创建 DataStream
ExecutionEnvironment.createInput 及 StreamExecutionEnvironment.createInput 接收的 InputFormat 为 RowCsvInputFormat,通过 createCsvInput 创建而来;getTableSchema 方法返回的 TableSchema 通过 fieldNames 及 fieldTypes 创建;getReturnType 方法返回的 RowTypeInfo 通过 selectedFieldTypes 及 selectedFieldNames 创建;explainSource 方法这里返回的是 CsvTableSource 开头的字符串

doc
Define a TableSource

退出移动版