A Look at Flink Table's Over Windows

This article takes a look at Over Windows in the Flink Table API.
Examples
Table table = input
  .window([OverWindow w].as("w"))           // define over window with alias w
  .select("a, b.sum over w, c.min over w"); // aggregate over the over window w
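Over Windows are similar to the OVER clause in SQL. They can be based on event-time, processing-time, or row-count, and are built with the Over class, on which orderBy, preceding, and as must be called (partitionBy is optional). There are two broad categories: Unbounded and Bounded.
Unbounded Over Windows examples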
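To make `select("a, b.sum over w, c.min over w")` concrete, here is a minimal plain-Java sketch of what such a query computes. It is not Flink code. It assumes `w` is partitioned by `a`, ordered by `rowtime`, and unbounded. Unlike a group-by aggregation, every input row produces exactly one output row, and that row carries aggregates over the earlier rows of its partition plus itself. The names `InRow`, `OutRow`, and `OverSelectModel` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical input row: partition key a, ordering attribute rowtime (epoch millis), values b and c.
record InRow(String a, long rowtime, int b, int c) {}

// Hypothetical output row for select("a, b.sum over w, c.min over w").
record OutRow(String a, int sumB, int minC) {}

final class OverSelectModel {
    // Simplified model of an over window partitioned by a, ordered by rowtime, with
    // unbounded preceding rows: each input row yields one output row whose aggregates
    // cover all earlier rows of the same partition plus the row itself.
    static List<OutRow> evaluate(List<InRow> input) {
        List<InRow> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong(InRow::rowtime)); // stable sort keeps arrival order for ties
        Map<String, Integer> sumByKey = new HashMap<>();
        Map<String, Integer> minByKey = new HashMap<>();
        List<OutRow> out = new ArrayList<>();
        for (InRow r : sorted) {
            int sum = sumByKey.merge(r.a(), r.b(), Integer::sum);
            int min = minByKey.merge(r.a(), r.c(), Math::min);
            out.add(new OutRow(r.a(), sum, min));
        }
        return out;
    }
}
```

Feeding it the rows (x, 1, 10, 5), (y, 2, 1, 7), and (x, 3, 20, 3) yields (x, 10, 5), (y, 1, 7), and (x, 30, 3). Three inputs produce three outputs, and the second x row's aggregates cover both x rows.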

// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));

// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
For event-time and processing-time (range) windows, unbounded_range denotes an unbounded window; for row-count windows, unbounded_row denotes an unbounded window.
Bounded Over Windows examples
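The difference between `unbounded_range` and `unbounded_row` only shows up when rows tie on the ordering attribute. The sketch below is a simplified model of standard SQL frame semantics, not Flink's runtime:

- A RANGE frame ends at the current row's last peer (the last row with the same timestamp).
- A ROWS frame ends at the current row itself.

`UnboundedFrames` and its methods are hypothetical names. Each call handles a single partition whose rows are already sorted by timestamp.

```java
import java.util.ArrayList;
import java.util.List;

final class UnboundedFrames {
    // unbounded_row, i.e. ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
    // v[i] is the i-th row of ONE partition, sorted by timestamp; the frame ends at row i itself.
    static List<Integer> rowsSum(int[] v) {
        List<Integer> out = new ArrayList<>();
        int running = 0;
        for (int value : v) {
            running += value;
            out.add(running);
        }
        return out;
    }

    // unbounded_range, i.e. RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
    // the frame ends at the last row sharing the current row's timestamp (its peers),
    // so all peers receive the same aggregate.
    static List<Integer> rangeSum(long[] ts, int[] v) {
        List<Integer> out = new ArrayList<>();
        int running = 0;
        int i = 0;
        while (i < v.length) {
            int j = i;
            int peerTotal = 0;
            while (j < v.length && ts[j] == ts[i]) { // collect the peer group of row i
                peerTotal += v[j];
                j++;
            }
            running += peerTotal;
            for (int k = i; k < j; k++) {
                out.add(running);
            }
            i = j;
        }
        return out;
    }
}
```

For timestamps {1, 2, 2, 3} and values {1, 10, 100, 1000}, the ROWS version yields 1, 11, 111, 1111 and the RANGE version yields 1, 111, 111, 1111. The difference arises because the two rows at timestamp 2 are peers.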
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
For event-time and processing-time (range) windows, a time interval such as 1.minutes denotes a bounded window; for row-count windows, a row count such as 10.rows does.
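As a rough illustration of the two bounded forms, here is a plain-Java sketch of the frames they describe under standard SQL semantics. It is again not Flink code, and it assumes inclusive bounds:

- A time range such as `1.minutes` covers the rows whose timestamp lies within one minute before the current row's timestamp.
- `10.rows` covers the current row plus up to 10 preceding rows.

`BoundedFrames`, `rangeSum`, and `rowsSum` are hypothetical names, and timestamps are epoch milliseconds.

```java
final class BoundedFrames {
    // Bounded time range, e.g. preceding("1.minutes") => rangeMillis = 60_000.
    // Assumed frame: rows of the same partition with ts in [ts[i] - rangeMillis, ts[i]].
    static int[] rangeSum(long[] ts, int[] v, long rangeMillis) {
        int[] out = new int[v.length];
        for (int i = 0; i < v.length; i++) {
            int s = 0;
            for (int j = 0; j < v.length; j++) {
                if (ts[j] >= ts[i] - rangeMillis && ts[j] <= ts[i]) {
                    s += v[j];
                }
            }
            out[i] = s;
        }
        return out;
    }

    // Bounded row count, e.g. preceding("10.rows") => precedingRows = 10.
    // Assumed frame: up to precedingRows earlier rows plus the current row.
    static int[] rowsSum(int[] v, int precedingRows) {
        int[] out = new int[v.length];
        for (int i = 0; i < v.length; i++) {
            int s = 0;
            for (int j = Math.max(0, i - precedingRows); j <= i; j++) {
                s += v[j];
            }
            out[i] = s;
        }
        return out;
    }
}
```

Take timestamps {0, 30000, 60000, 61000} and values {1, 2, 4, 8}. `rangeSum(ts, v, 60_000)` gives 1, 3, 7, 14, and `rowsSum(v, 1)` gives 1, 3, 6, 12. The nested loops favor readability; a real operator would keep incremental state instead.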
Table.window
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  // ...

  @varargs
  def window(overWindows: OverWindow*): OverWindowedTable = {

    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
      throw new TableException("Over-windows for batch tables are currently not supported.")
    }

    if (overWindows.size != 1) {
      throw new TableException("Over-Windows are currently only supported single window.")
    }

    new OverWindowedTable(this, overWindows.toArray)
  }

  // ...

}
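Table provides a window method that takes OverWindow arguments and performs Over Windows operations; it returns an OverWindowedTable. In this version it throws a TableException for batch tables and requires exactly one over window.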
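The two guards at the top of `Table.window` can be modeled in a few lines. The Java sketch below is hypothetical: `WindowArgsCheck` and `UnsupportedOverWindow` stand in for Flink's environment check and `TableException`. The conditions and messages, however, are taken from the 1.7.0 source above.

```java
import java.util.List;

final class WindowArgsCheck {
    // Hypothetical stand-in for Flink's TableException.
    static final class UnsupportedOverWindow extends RuntimeException {
        UnsupportedOverWindow(String msg) {
            super(msg);
        }
    }

    // Mirrors the two checks in Table.window: batch tables are rejected,
    // and exactly one over window must be passed.
    static void check(boolean isBatchEnvironment, List<String> overWindowAliases) {
        if (isBatchEnvironment) {
            throw new UnsupportedOverWindow("Over-windows for batch tables are currently not supported.");
        }
        if (overWindowAliases.size() != 1) {
            throw new UnsupportedOverWindow("Over-Windows are currently only supported single window.");
        }
    }
}
```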
OverWindow
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/windows.scala
/**
  * Over window is similar to the traditional OVER SQL.
  */
case class OverWindow(
    private[flink] val alias: Expression,
    private[flink] val partitionBy: Seq[Expression],
    private[flink] val orderBy: Expression,
    private[flink] val preceding: Expression,
    private[flink] val following: Expression)
OverWindow defines the alias, partitionBy, orderBy, preceding, and following properties.
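To relate the five `OverWindow` fields to SQL, here is a hypothetical Java record that renders them as a SQL `WINDOW` definition. The record, its string encoding of expressions, and the mapping in `toSqlWindow` are illustrative assumptions, not Flink APIs. Only the preceding forms used in this article are handled. Both `current_row` and `current_range` render as `CURRENT ROW`, since in a RANGE frame `CURRENT ROW` already includes the current row's peers.

```java
import java.util.List;

// Hypothetical mirror of OverWindow's five fields; expressions are kept as plain strings.
record OverWindowModel(String alias, List<String> partitionBy, String orderBy,
                       String preceding, String following) {

    // Illustrative only: renders the fields as a SQL WINDOW definition.
    String toSqlWindow() {
        String mode;
        String start;
        if (preceding.equals("unbounded_range")) {
            mode = "RANGE";
            start = "UNBOUNDED PRECEDING";
        } else if (preceding.equals("unbounded_row")) {
            mode = "ROWS";
            start = "UNBOUNDED PRECEDING";
        } else if (preceding.endsWith(".rows")) {
            mode = "ROWS";
            start = preceding.substring(0, preceding.length() - ".rows".length()) + " PRECEDING";
        } else if (preceding.endsWith(".minutes")) {
            mode = "RANGE";
            start = "INTERVAL '" + preceding.substring(0, preceding.length() - ".minutes".length())
                    + "' MINUTE PRECEDING";
        } else {
            throw new IllegalArgumentException("unsupported preceding: " + preceding);
        }
        // Only the two defaults chosen by as() are modeled; both render as CURRENT ROW.
        if (!following.equals("current_row") && !following.equals("current_range")) {
            throw new IllegalArgumentException("unsupported following: " + following);
        }
        String partition = partitionBy.isEmpty() ? "" : "PARTITION BY " + String.join(", ", partitionBy) + " ";
        return alias + " AS (" + partition + "ORDER BY " + orderBy + " " + mode
                + " BETWEEN " + start + " AND CURRENT ROW)";
    }
}
```

For example, `new OverWindowModel("w", List.of("a"), "rowtime", "10.rows", "current_row").toSqlWindow()` returns `w AS (PARTITION BY a ORDER BY rowtime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)`.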
Over
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/java/windows.scala
object Over {

  /**
    * Specifies the time attribute on which rows are grouped.
    *
    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    *
    * For batch tables, refer to a timestamp or long attribute.
    */
  def orderBy(orderBy: String): OverWindowWithOrderBy = {
    val orderByExpr = ExpressionParser.parseExpression(orderBy)
    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
  }

  /**
    * Partitions the elements on some partition keys.
    *
    * @param partitionBy some partition keys.
    * @return A partitionedOver instance that only contains the orderBy method.
    */
  def partitionBy(partitionBy: String): PartitionedOver = {
    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
    new PartitionedOver(partitionByExpr)
  }
}

class OverWindowWithOrderBy(
    private val partitionByExpr: Array[Expression],
    private val orderByExpr: Expression) {

  /**
    * Set the preceding offset (based on time or row-count intervals) for over window.
    *
    * @param preceding preceding offset relative to the current row.
    * @return this over window
    */
  def preceding(preceding: String): OverWindowWithPreceding = {
    val precedingExpr = ExpressionParser.parseExpression(preceding)
    new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
  }

}

class PartitionedOver(private val partitionByExpr: Array[Expression]) {

  /**
    * Specifies the time attribute on which rows are grouped.
    *
    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    *
    * For batch tables, refer to a timestamp or long attribute.
    */
  def orderBy(orderBy: String): OverWindowWithOrderBy = {
    val orderByExpr = ExpressionParser.parseExpression(orderBy)
    new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
  }
}

class OverWindowWithPreceding(
    private val partitionBy: Seq[Expression],
    private val orderBy: Expression,
    private val preceding: Expression) {

  private[flink] var following: Expression = _

  /**
    * Assigns an alias for this window that the following `select()` clause can refer to.
    *
    * @param alias alias for this over window
    * @return over window
    */
  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))

  /**
    * Assigns an alias for this window that the following `select()` clause can refer to.
    *
    * @param alias alias for this over window
    * @return over window
    */
  def as(alias: Expression): OverWindow = {

    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
    if (null == following) {
      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
        following = CURRENT_ROW
      } else {
        following = CURRENT_RANGE
      }
    }
    OverWindow(alias, partitionBy, orderBy, preceding, following)
  }

  /**
    * Set the following offset (based on time or row-count intervals) for over window.
    *
    * @param following following offset that relative to the current row.
    * @return this over window
    */
  def following(following: String): OverWindowWithPreceding = {
    this.following(ExpressionParser.parseExpression(following))
  }

  /**
    * Set the following offset (based on time or row-count intervals) for over window.
    *
    * @param following following offset that relative to the current row.
    * @return this over window
    */
  def following(following: Expression): OverWindowWithPreceding = {
    this.following = following
    this
  }
}

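The Over class is a helper for creating over windows. It provides two methods, orderBy and partitionBy, which return an OverWindowWithOrderBy and a PartitionedOver respectively.
PartitionedOver provides an orderBy method that returns an OverWindowWithOrderBy; OverWindowWithOrderBy provides a preceding method that returns an OverWindowWithPreceding.
OverWindowWithPreceding holds the partitionBy, orderBy, and preceding properties. Its as method creates the OverWindow, and its following method sets the following offset. If following was never set, as defaults it to CURRENT_ROW when preceding is a row-count interval (RowIntervalTypeInfo) and to CURRENT_RANGE otherwise.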
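This chain of builder classes is a type-state pattern: each step returns a type that only exposes the next legal call, so `as` can only be reached after `orderBy` and `preceding`. The Java sketch below mirrors the chain, including the default for `following`, under these simplifying assumptions:

- Expressions are plain strings.
- `partitionBy` takes varargs instead of one comma-separated string.
- A row-count preceding is detected by its string suffix rather than by `RowIntervalTypeInfo`.

`OverModel` and its nested types are hypothetical names, not Flink's API.

```java
import java.util.List;

// Hypothetical, simplified mirror of the Over builder chain (not Flink's API).
final class OverModel {
    // Stand-in for OverWindow: the five fields the chain finally produces.
    record Window(String alias, List<String> partitionBy, String orderBy,
                  String preceding, String following) {}

    // Over.partitionBy -> PartitionedOver
    static Partitioned partitionBy(String... keys) {
        return new Partitioned(List.of(keys));
    }

    // Over.orderBy -> OverWindowWithOrderBy with no partition keys
    static WithOrderBy orderBy(String orderBy) {
        return new WithOrderBy(List.of(), orderBy);
    }

    // PartitionedOver: only orderBy is available.
    record Partitioned(List<String> keys) {
        WithOrderBy orderBy(String orderBy) {
            return new WithOrderBy(keys, orderBy);
        }
    }

    // OverWindowWithOrderBy: only preceding is available.
    record WithOrderBy(List<String> keys, String orderBy) {
        WithPreceding preceding(String preceding) {
            return new WithPreceding(keys, orderBy, preceding);
        }
    }

    // OverWindowWithPreceding: optional following, then as.
    static final class WithPreceding {
        private final List<String> keys;
        private final String orderBy;
        private final String preceding;
        private String following; // null until following(...) or as(...) sets it

        WithPreceding(List<String> keys, String orderBy, String preceding) {
            this.keys = keys;
            this.orderBy = orderBy;
            this.preceding = preceding;
        }

        WithPreceding following(String following) {
            this.following = following;
            return this;
        }

        Window as(String alias) {
            // Same rule as OverWindowWithPreceding.as: current_row for a row-count
            // preceding, current_range otherwise.
            if (following == null) {
                following = isRowCount(preceding) ? "current_row" : "current_range";
            }
            return new Window(alias, keys, orderBy, preceding, following);
        }

        // Assumption: a suffix check stands in for resultType.isInstanceOf[RowIntervalTypeInfo].
        private static boolean isRowCount(String preceding) {
            return preceding.equals("unbounded_row") || preceding.endsWith(".rows");
        }
    }
}
```

For instance, `OverModel.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w")` produces a window whose following is `current_row`. Dropping `partitionBy` and using `"1.minutes"` instead produces `current_range`.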

OverWindowedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class OverWindowedTable(
    private[flink] val table: Table,
    private[flink] val overWindows: Array[OverWindow]) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(
      fields,
      table.logicalPlan,
      table.tableEnv)

    if(fields.exists(_.isInstanceOf[WindowProperty])){
      throw new ValidationException(
        "Window start and end properties are not available for Over windows.")
    }

    val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)

    new Table(
      table.tableEnv,
      Project(
        expandedOverFields.map(UnresolvedAlias),
        table.logicalPlan,
        // required for proper projection push down
        explicitAlias = true)
      .validate(table.tableEnv)
    )
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
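The OverWindowedTable constructor takes the overWindows array, and the class offers only a select operation. select accepts either a String or Expression arguments. The String form is parsed into Expressions (resolving aggregate function calls) and then delegates to the Expression form. The Expression form rejects window start/end properties and creates a new Table whose Project has expandedOverFields.map(UnresolvedAlias) as its projectList, where expandedOverFields is obtained from resolveOverWindows(expandedFields, overWindows, table.tableEnv).
Summary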
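The split between the two select overloads can be sketched in plain Java: the String form is parsed, then delegates to the Expression form, which does the validation. This is a toy model under stated assumptions:

- `OverSelectSketch` and `ValidationError` are hypothetical names.
- Fields are split naively on commas, whereas Flink's `ExpressionParser` handles the full expression grammar.
- A window property is recognized by a `.start` or `.end` suffix.

```java
import java.util.ArrayList;
import java.util.List;

final class OverSelectSketch {
    // Hypothetical stand-in for Flink's ValidationException.
    static final class ValidationError extends RuntimeException {
        ValidationError(String msg) {
            super(msg);
        }
    }

    // String overload: parse (naively) into separate field expressions, then delegate,
    // like select(fields: String) calling select(fields: Expression*).
    static List<String> select(String fields) {
        List<String> exprs = new ArrayList<>();
        for (String f : fields.split(",")) {
            exprs.add(f.trim());
        }
        return select(exprs);
    }

    // "Expression" overload: reject window start/end properties, then return the projection list.
    static List<String> select(List<String> fields) {
        for (String f : fields) {
            if (f.endsWith(".start") || f.endsWith(".end")) {
                throw new ValidationError("Window start and end properties are not available for Over windows.");
            }
        }
        return List.copyOf(fields);
    }
}
```

With this model, `select("a, b.sum over w, c.min over w")` returns the three field strings, while `select("a, w.start")` is rejected.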

Over Windows are similar to the OVER clause in SQL and can be based on event-time, processing-time, or row-count. They are built with the Over class, on which orderBy, preceding, and as must be set, and come in two categories, Unbounded and Bounded. For Unbounded windows, event-time and processing-time windows use unbounded_range while row-count windows use unbounded_row. For Bounded windows, event-time and processing-time windows use an interval such as 1.minutes while row-count windows use a count such as 10.rows.
Table provides a window method taking OverWindow arguments for Over Windows operations; it returns an OverWindowedTable. OverWindow defines the alias, partitionBy, orderBy, preceding, and following properties. Over is a helper class for creating over windows: its orderBy and partitionBy methods return an OverWindowWithOrderBy and a PartitionedOver respectively; PartitionedOver's orderBy returns an OverWindowWithOrderBy; OverWindowWithOrderBy's preceding returns an OverWindowWithPreceding; and OverWindowWithPreceding, which holds the partitionBy, orderBy, and preceding properties, creates the OverWindow via as and sets the following offset via following.
The OverWindowedTable constructor takes the overWindows array, and the class offers only a select operation. select accepts either a String or Expression arguments; the String form is converted into Expressions and then delegates to the Expression form. That form creates a new Table whose Project has expandedOverFields.map(UnresolvedAlias) as its projectList, where expandedOverFields is obtained from resolveOverWindows(expandedFields, overWindows, table.tableEnv).

doc
Over Windows
