乐趣区

聊聊flink Table的Joins


本文主要研究一下 flink Table 的 Joins
实例
Inner Join
Table left = tableEnv.fromDataSet(ds1, “a, b, c”);
Table right = tableEnv.fromDataSet(ds2, “d, e, f”);
Table result = left.join(right).where(“a = d”).select(“a, b, e”);
join 方法即 inner join
Outer Join
Table left = tableEnv.fromDataSet(ds1, “a, b, c”);
Table right = tableEnv.fromDataSet(ds2, “d, e, f”);

Table leftOuterResult = left.leftOuterJoin(right, “a = d”).select(“a, b, e”);
Table rightOuterResult = left.rightOuterJoin(right, “a = d”).select(“a, b, e”);
Table fullOuterResult = left.fullOuterJoin(right, “a = d”).select(“a, b, e”);
outer join 分为 leftOuterJoin、rightOuterJoin、fullOuterJoin 三种
Time-windowed Join
Table left = tableEnv.fromDataSet(ds1, “a, b, c, ltime.rowtime”);
Table right = tableEnv.fromDataSet(ds2, “d, e, f, rtime.rowtime”);

Table result = left.join(right)
.where(“a = d && ltime >= rtime – 5.minutes && ltime < rtime + 10.minutes”)
.select(“a, b, e, ltime”);
time-windowed join 需要至少一个等值条件,然后还需要一个与两边时间相关的条件 (可以使用 <, <=, >=, >)
Inner Join with Table Function
// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction(“split”, split);

// join
Table orders = tableEnv.scan(“Orders”);
Table result = orders
.join(new Table(tableEnv, “split(c)”).as(“s”, “t”, “v”))
.select(“a, b, s, t, v”);
Table 也可以跟 table function 进行 inner join,如果 table function 返回空,则 table 的记录被丢弃
Left Outer Join with Table Function
// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction(“split”, split);

// join
Table orders = tableEnv.scan(“Orders”);
Table result = orders
.leftOuterJoin(new Table(tableEnv, “split(c)”).as(“s”, “t”, “v”))
.select(“a, b, s, t, v”);
Table 也可以跟 table function 进行 left outer join,如果 table function 返回空,则 table 的记录保留,空的部分为 null 值
Join with Temporal Table
Table ratesHistory = tableEnv.scan(“RatesHistory”);

// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
“r_proctime”,
“r_currency”);
tableEnv.registerFunction(“rates”, rates);

// join with “Orders” based on the time attribute and key
Table orders = tableEnv.scan(“Orders”);
Table result = orders
.join(new Table(tEnv, “rates(o_proctime)”), “o_currency = r_currency”)
Table 也可以跟 Temporal tables 进行 join,Temporal tables 通过 Table 的 createTemporalTableFunction 而来,目前仅仅支持 inner join 的方式
Table
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table(
private[flink] val tableEnv: TableEnvironment,
private[flink] val logicalPlan: LogicalNode) {
//……

def join(right: Table): Table = {
join(right, None, JoinType.INNER)
}

def join(right: Table, joinPredicate: String): Table = {
join(right, joinPredicate, JoinType.INNER)
}

def join(right: Table, joinPredicate: Expression): Table = {
join(right, Some(joinPredicate), JoinType.INNER)
}

def leftOuterJoin(right: Table): Table = {
join(right, None, JoinType.LEFT_OUTER)
}

def leftOuterJoin(right: Table, joinPredicate: String): Table = {
join(right, joinPredicate, JoinType.LEFT_OUTER)
}

def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
}

def rightOuterJoin(right: Table, joinPredicate: String): Table = {
join(right, joinPredicate, JoinType.RIGHT_OUTER)
}

def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
}

def fullOuterJoin(right: Table, joinPredicate: String): Table = {
join(right, joinPredicate, JoinType.FULL_OUTER)
}

def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
join(right, Some(joinPredicate), JoinType.FULL_OUTER)
}

private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
join(right, Some(joinPredicateExpr), joinType)
}

private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {

// check if we join with a table or a table function
if (!containsUnboundedUDTFCall(right.logicalPlan)) {
// regular table-table join

// check that the TableEnvironment of right table is not null
// and right table belongs to the same TableEnvironment
if (right.tableEnv != this.tableEnv) {
throw new ValidationException(“Only tables from the same TableEnvironment can be joined.”)
}

new Table(
tableEnv,
Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
.validate(tableEnv))

} else {
// join with a table function

// check join type
if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
throw new ValidationException(
“TableFunctions are currently supported for join and leftOuterJoin.”)
}

val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
val udtfCall = LogicalTableFunctionCall(
udtf.functionName,
udtf.tableFunction,
udtf.parameters,
udtf.resultType,
udtf.fieldNames,
this.logicalPlan
).validate(tableEnv)

new Table(
tableEnv,
Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
.validate(tableEnv))
}
}

//……
}
Table 定义了 join、leftOuterJoin、rightOuterJoin、fullOuterJoin 方法,其最后都是调用的私有的 join 方法,其中 JoinType 用于表达 join 类型,分别有 INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER 这几种;另外接收 String 类型或者 Expression 的条件表达式,其中 String 类型最后是被解析为 Expression 类型;join 方法最后是使用 Join 创建了新的 Table
Join
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
case class Join(
left: LogicalNode,
right: LogicalNode,
joinType: JoinType,
condition: Option[Expression],
correlated: Boolean) extends BinaryNode {

override def output: Seq[Attribute] = {
left.output ++ right.output
}

private case class JoinFieldReference(
name: String,
resultType: TypeInformation[_],
left: LogicalNode,
right: LogicalNode) extends Attribute {

val isFromLeftInput: Boolean = left.output.map(_.name).contains(name)

val (indexInInput, indexInJoin) = if (isFromLeftInput) {
val indexInLeft = left.output.map(_.name).indexOf(name)
(indexInLeft, indexInLeft)
} else {
val indexInRight = right.output.map(_.name).indexOf(name)
(indexInRight, indexInRight + left.output.length)
}

override def toString = s”‘$name”

override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
// look up type of field
val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType
// create a new RexInputRef with index offset
new RexInputRef(indexInJoin, fieldType)
}

override def withName(newName: String): Attribute = {
if (newName == name) {
this
} else {
JoinFieldReference(newName, resultType, left, right)
}
}
}

override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
val node = super.resolveExpressions(tableEnv).asInstanceOf[Join]
val partialFunction: PartialFunction[Expression, Expression] = {
case field: ResolvedFieldReference => JoinFieldReference(
field.name,
field.resultType,
left,
right)
}
val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
}

override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
left.construct(relBuilder)
right.construct(relBuilder)

val corSet = mutable.Set[CorrelationId]()
if (correlated) {
corSet += relBuilder.peek().getCluster.createCorrel()
}

relBuilder.join(
convertJoinType(joinType),
condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
corSet.asJava)
}

private def convertJoinType(joinType: JoinType) = joinType match {
case JoinType.INNER => JoinRelType.INNER
case JoinType.LEFT_OUTER => JoinRelType.LEFT
case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
case JoinType.FULL_OUTER => JoinRelType.FULL
}

private def ambiguousName: Set[String] =
left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)

override def validate(tableEnv: TableEnvironment): LogicalNode = {
val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
failValidation(s”Filter operator requires a boolean expression as input, ” +
s”but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}”)
} else if (ambiguousName.nonEmpty) {
failValidation(s”join relations with ambiguous names: ${ambiguousName.mkString(“, “)}”)
}

resolvedJoin.condition.foreach(testJoinCondition)
resolvedJoin
}

private def testJoinCondition(expression: Expression): Unit = {

def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
if x.isFromLeftInput != y.isFromLeftInput => true
case _ => false
}

def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
case (x: JoinFieldReference) :: (_) :: Nil => true
case (_) :: (y: JoinFieldReference) :: Nil => true
case _ => false
}

var equiJoinPredicateFound = false
// Whether the predicate is literal true.
val alwaysTrue = expression match {
case x: Literal if x.value.equals(true) => true
case _ => false
}

def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
case x: And => x.children.foreach(validateConditions(_, isAndBranch))
case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
case x: EqualTo =>
if (isAndBranch && checkIfJoinCondition(x)) {
equiJoinPredicateFound = true
}
case x: BinaryComparison =>
// The boolean literal should be a valid condition type.
case x: Literal if x.resultType == Types.BOOLEAN =>
case x => failValidation(
s”Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x”)
}

validateConditions(expression, isAndBranch = true)

// Due to a bug in Apache Calcite (see CALCITE-2004 and FLINK-7865) we cannot accept join
// predicates except literal true for TableFunction left outer join.
if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) {
if (!alwaysTrue) failValidation(“TableFunction left outer join predicate can only be ” +
“empty or literal true.”)
} else {
if (!equiJoinPredicateFound) {
failValidation(
s”Invalid join condition: $expression. At least one equi-join predicate is ” +
s”required.”)
}
}
}
}
Join 继承了 BinaryNode,它内部将 flink 的 JoinType 转为 calcite 的 JoinRelType 类型,construct 方法通过 relBuilder.join 来构建 join 关系
小结

Table 支持多种形式的 join,其中包括 Inner Join、Outer Join、Time-windowed Join、Inner Join with Table Function、Left Outer Join with Table Function、Join with Temporal Table
Table 定义了 join、leftOuterJoin、rightOuterJoin、fullOuterJoin 方法,其最后都是调用的私有的 join 方法,其中 JoinType 用于表达 join 类型,分别有 INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER 这几种;另外接收 String 类型或者 Expression 的条件表达式,其中 String 类型最后是被解析为 Expression 类型;join 方法最后是使用 Join 创建了新的 Table
Join 继承了 BinaryNode,它内部将 flink 的 JoinType 转为 calcite 的 JoinRelType 类型,construct 方法通过 relBuilder.join 来构建 join 关系

doc
Joins

退出移动版