序

本文主要研究一下flink的CsvTableSource。

TableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala

trait TableSource[T] {

  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
    * The fields of the return type are mapped to the table schema based on their name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Returns the schema of the produced table.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Describes the table source.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String =
    TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
}

TableSource定义了三个方法，分别是getReturnType、getTableSchema、explainSource。

BatchTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala

trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataSet]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    * Do not use it in Table API programs.
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}

BatchTableSource继承了TableSource，它定义了getDataSet方法。

StreamTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala

trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataStream]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    * Do not use it in Table API programs.
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}

StreamTableSource继承了TableSource，它定义了getDataStream方法。

CsvTableSource

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala

class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  def this(
    path: String,
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]],
    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    quoteCharacter: Character = null,
    ignoreFirstLine: Boolean = false,
    ignoreComments: String = null,
    lenient: Boolean = false) = {

    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]]
    * for the return type of the [[CsvTableSource]].
    */
  override def getReturnType: RowTypeInfo = returnType

  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the schema of the produced table. */
  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)

  /** Returns a copy of [[TableSource]] with ability to project fields */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    val selectedFields = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      selectedFields,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    if (quoteCharacter != null) {
      inputFormat.enableQuotedStringParsing(quoteCharacter)
    }
    if (ignoreComments != null) {
      inputFormat.setCommentPrefix(ignoreComments)
    }

    inputFormat
  }

  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource => returnType == that.returnType &&
        path == that.path &&
        fieldDelim == that.fieldDelim &&
        rowDelim == that.rowDelim &&
        quoteCharacter == that.quoteCharacter &&
        ignoreFirstLine == that.ignoreFirstLine &&
        ignoreComments == that.ignoreComments &&
        lenient == that.lenient
    case _ => false
  }

  override def hashCode(): Int = {
    returnType.hashCode()
  }

  override def explainSource(): String = {
    s"CsvTableSource(" +
      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  }
}

CsvTableSource同时实现了BatchTableSource及StreamTableSource接口（另外还实现了ProjectableTableSource，通过projectFields返回只读取选中字段的副本）；getDataSet方法使用ExecutionEnvironment.createInput创建DataSet；getDataStream方法使用StreamExecutionEnvironment.createInput创建DataStream。ExecutionEnvironment.createInput及StreamExecutionEnvironment.createInput接收的InputFormat为RowCsvInputFormat，通过createCsvInput创建而来。getTableSchema方法返回的TableSchema通过fieldNames及fieldTypes创建（包含全部字段）；getReturnType方法返回的RowTypeInfo通过selectedFieldTypes及selectedFieldNames创建（只包含选中的字段）；explainSource方法这里返回的是以CsvTableSource开头的字符串，形如CsvTableSource(read fields: ...)。

小结

- TableSource定义了三个方法，分别是getReturnType、getTableSchema、explainSource；BatchTableSource继承了TableSource，它定义了getDataSet方法；StreamTableSource继承了TableSource，它定义了getDataStream方法。
- CsvTableSource同时实现了BatchTableSource及StreamTableSource接口；getDataSet方法使用ExecutionEnvironment.createInput创建DataSet；getDataStream方法使用StreamExecutionEnvironment.createInput创建DataStream。
- ExecutionEnvironment.createInput及StreamExecutionEnvironment.createInput接收的InputFormat为RowCsvInputFormat，通过createCsvInput创建而来；getTableSchema方法返回的TableSchema通过fieldNames及fieldTypes创建；getReturnType方法返回的RowTypeInfo通过selectedFieldTypes及selectedFieldNames创建；explainSource方法这里返回的是以CsvTableSource开头的字符串。

doc

Define a TableSource