共计 5517 个字符,预计需要花费 14 分钟才能阅读完成。
序
本文主要研究一下 flink Table 的 Group Windows
实例
Table table = input
.window([Window w].as(“w”)) // define window with alias w
.groupBy(“w”) // group the table by window w
.select(“b.sum”); // aggregate
Table table = input
.window([Window w].as(“w”)) // define window with alias w
.groupBy(“w, a”) // group the table by attribute a and window w
.select(“a, b.sum”); // aggregate
Table table = input
.window([Window w].as(“w”)) // define window with alias w
.groupBy(“w, a”) // group the table by attribute a and window w
.select(“a, w.start, w.end, w.rowtime, b.count”); // aggregate and add window start, end, and rowtime timestamps
window 操作可以对 Window 进行别名,然后可以在 groupBy 及 select 中引用,window 有 start、end、rowtime 属性可以用,其中 start 及 rowtime 是 inclusive 的,而 end 为 exclusive
Tumbling Windows 实例
// Tumbling Event-time Window
.window(Tumble.over(“10.minutes”).on(“rowtime”).as(“w”));
// Tumbling Processing-time Window (assuming a processing-time attribute “proctime”)
.window(Tumble.over(“10.minutes”).on(“proctime”).as(“w”));
// Tumbling Row-count Window (assuming a processing-time attribute “proctime”)
.window(Tumble.over(“10.rows”).on(“proctime”).as(“w”));
Tumbling Windows 按固定窗口大小来移动,因而窗口不重叠;over 方法用于指定窗口大小;窗口大小可以基于 event-time、processing-time、row-count 来定义
Sliding Windows 实例
// Sliding Event-time Window
.window(Slide.over(“10.minutes”).every(“5.minutes”).on(“rowtime”).as(“w”));
// Sliding Processing-time window (assuming a processing-time attribute “proctime”)
.window(Slide.over(“10.minutes”).every(“5.minutes”).on(“proctime”).as(“w”));
// Sliding Row-count window (assuming a processing-time attribute “proctime”)
.window(Slide.over(“10.rows”).every(“5.rows”).on(“proctime”).as(“w”));
Sliding Windows 在 slide interval 小于 window size 的时候,窗口会有重叠,因而 rows 可能归属多个窗口;over 方法用于指定窗口大小,窗口大小可以基于 event-time、processing-time、row-count 来定义;every 方法用于指定 slide interval
Session Windows 实例
// Session Event-time Window
.window(Session.withGap(“10.minutes”).on(“rowtime”).as(“w”));
// Session Processing-time Window (assuming a processing-time attribute “proctime”)
.window(Session.withGap(“10.minutes”).on(“proctime”).as(“w”));
Session Windows 没有固定的窗口大小,它基于 inactivity 的程度来关闭窗口,withGap 方法用于指定两个窗口的 gap,作为 time interval;Session Windows 只能使用 event-time 或者 processing-time
Table.window
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table(
private[flink] val tableEnv: TableEnvironment,
private[flink] val logicalPlan: LogicalNode) {
//……
def window(window: Window): WindowedTable = {
new WindowedTable(this, window)
}
//……
}
Table 提供了 window 操作,接收 Window 参数,创建的是 WindowedTable
WindowedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class WindowedTable(
private[flink] val table: Table,
private[flink] val window: Window) {
def groupBy(fields: Expression*): WindowGroupedTable = {
val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
if (fields.size != fieldsWithoutWindow.size + 1) {
throw new ValidationException(“GroupBy must contain exactly one window alias.”)
}
new WindowGroupedTable(table, fieldsWithoutWindow, window)
}
def groupBy(fields: String): WindowGroupedTable = {
val fieldsExpr = ExpressionParser.parseExpressionList(fields)
groupBy(fieldsExpr: _*)
}
}
WindowedTable 只提供 groupBy 操作,其中 groupBy 可以接收 String 类型的参数,也可以接收 Expression 类型的参数;String 类型的参数会被转换为 Expression 类型,最后调用的是 Expression 类型参数的 groupBy 方法;如果 groupBy 除了 window 没有其他属性,则其 parallelism 为 1,只会在单一 task 上执行;groupBy 方法创建的是 WindowGroupedTable
WindowGroupedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class WindowGroupedTable(
private[flink] val table: Table,
private[flink] val groupKeys: Seq[Expression],
private[flink] val window: Window) {
def select(fields: Expression*): Table = {
val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)
val projectsOnAgg = replaceAggregationsAndProperties(
expandedFields, table.tableEnv, aggNames, propNames)
val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)
new Table(table.tableEnv,
Project(
projectsOnAgg,
WindowAggregate(
groupKeys,
window.toLogicalWindow,
propNames.map(a => Alias(a._1, a._2)).toSeq,
aggNames.map(a => Alias(a._1, a._2)).toSeq,
Project(projectFields, table.logicalPlan).validate(table.tableEnv)
).validate(table.tableEnv),
// required for proper resolution of the time attribute in multi-windows
explicitAlias = true
).validate(table.tableEnv))
}
def select(fields: String): Table = {
val fieldExprs = ExpressionParser.parseExpressionList(fields)
//get the correct expression for AggFunctionCall
val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
select(withResolvedAggFunctionCall: _*)
}
}
WindowGroupedTable 只提供 select 操作,其中 select 可以接收 String 类型的参数,也可以接收 Expression 类型的参数;String 类型的参数会被转换为 Expression 类型,最后调用的是 Expression 类型参数的 select 方法;select 方法创建了新的 Table,其 Project 的 child 为 WindowAggregate
小结
window 操作可以对 Window 进行别名,然后可以在 groupBy 及 select 中引用,window 有 start、end、rowtime 属性可以用,其中 start 及 rowtime 是 inclusive 的,而 end 为 exclusive
Tumbling Windows 按固定窗口大小来移动,因而窗口不重叠;over 方法用于指定窗口大小;窗口大小可以基于 event-time、processing-time、row-count 来定义;Sliding Windows 在 slide interval 小于 window size 的时候,窗口会有重叠,因而 rows 可能归属多个窗口;over 方法用于指定窗口大小,窗口大小可以基于 event-time、processing-time、row-count 来定义;every 方法用于指定 slide interval;Session Windows 没有固定的窗口大小,它基于 inactivity 的程度来关闭窗口,withGap 方法用于指定两个窗口的 gap,作为 time interval;Session Windows 只能使用 event-time 或者 processing-time
Table 提供了 window 操作,接收 Window 参数,创建的是 WindowedTable;WindowedTable 只提供 groupBy 操作,其中 groupBy 可以接收 String 类型的参数,也可以接收 Expression 类型的参数;String 类型的参数会被转换为 Expression 类型,最后调用的是 Expression 类型参数的 groupBy 方法;如果 groupBy 除了 window 没有其他属性,则其 parallelism 为 1,只会在单一 task 上执行;groupBy 方法创建的是 WindowGroupedTable;WindowGroupedTable 只提供 select 操作,其中 select 可以接收 String 类型的参数,也可以接收 Expression 类型的参数;String 类型的参数会被转换为 Expression 类型,最后调用的是 Expression 类型参数的 select 方法;select 方法创建了新的 Table,其 Project 的 child 为 WindowAggregate
doc
Group Windows