共计 3965 个字符,预计需要花费 10 分钟才能阅读完成。
一、特征抽取并转换成 Spark 或 Flink 执行计划的思路
1、执行过程抽象:
- 读取并解析配置文件:Text → Seq&lt;FeatureConf[FeatureKey, AggregateKey, RuleText]&gt;;其中 feature key:特征名,aggregate key:聚合的主键,rule text:特征的计算逻辑,即算子链路;
- 将 RuleText 转换成 Expression:Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]> → Seq<FeatureConf[FeatureKey, AggregateKey, Expression]>;其中 Expression 为 Spark 或 Flink 中的 Expression 表达式对象;
- 合并相同聚合维度的逻辑:Seq&lt;FeatureConf[FeatureKey, AggregateKey, Expression]&gt; → Seq&lt;AggregateKey, FeatureConf[FeatureKey, _, Expression]&gt;;
- 遍历生成执行计划:Seq&lt;AggregateKey, FeatureConf[FeatureKey, _, Expression]&gt; → SparkPlan &amp; FlinkPlan;
2、Spark &amp; Flink 中将 RuleText 转换成 Expression 的方法
Spark:SparkSqlParser.parseExpression("count(*)")
Flink:ExpressionParser.parseExpression("count(*)")
3、Spark &amp; Flink 中如何支持自定义算子
Spark:spark.udf.register("len", (_:String).length)
Flink:env.registerFunction("log1", new Log)
4、代码示例:
Spark:
package com.yang.spark.framework
import com.yang.spark.framework.udf.{Median, StandardDeviation}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
/**
 * Demo: running a Spark aggregation driven entirely by SQL expression strings
 * parsed at runtime — the "RuleText → Expression" step described in the
 * article above.
 *
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object AggregateTestDataFrame {

  def main(args: Array[String]): Unit = {
    // Sample input: (id1, id2) pairs; id1 is the aggregation key.
    val source = Seq(
      ("100", "100"),
      ("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103")
    )
    // Kept only to illustrate the fully generic groupBy/agg form shown in the
    // commented-out variants below.
    val groupKeys = Array("id1")
    val functions = Array("id2" -> "max", "id2" -> "min", "id2" -> "std")

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .config(new SparkConf())
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Custom aggregators registered as UDFs so a rule text can reference them by name.
    spark.udf.register("median", new Median)
    spark.udf.register("std", new StandardDeviation)
    spark.udf.register("len", (_: String).length)

    val inputDF = spark.sparkContext.makeRDD(source).toDF("id1", "id2")

    // Window spec for the commented-out windowed variants below.
    // Fixed: partition by an existing column ("id1"); the original used "name",
    // which is not in the schema and would fail analysis when the window is used.
    val w = Window.partitionBy("id1").orderBy(sum(when(expr("id2/2 > 50"), $"id2")).desc)

    inputDF
      //.groupBy(groupKeys.head, groupKeys.tail: _*)
      .groupBy("id1")
      //.agg(functions.head, functions.tail: _*)
      /*.agg(sum(when(expr("id2/2 > 50"), $"id2")).alias("fk1"),
        expr("id2/2 > 50").alias("fk2"),
        sum("id2").over(w.rangeBetween(Window.unboundedPreceding, 2)),
        avg("id2").over(w.rowsBetween(Window.currentRow, 4)),
        row_number().over(w.rowsBetween(Window.currentRow, 4))
      )*/
      // Fixed: the original aggregated a nonexistent column `cnt` and used the
      // invalid form `distinct(count(...))`; SQL spells it `count(distinct ...)`.
      .agg(
        expr("count(*) as fk1"),
        expr("sum(id2) as fk2"), // id2 is a String; Spark casts it for sum
        expr("count(distinct id2) as fk3")
      )
      .show()
  }
}
Flink:
package com.yang.stream
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.types.Row
/**
 * Demo: building a Flink Table API aggregation from expression strings parsed
 * at runtime via ExpressionParser — the Flink counterpart of the Spark example.
 *
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object TableApiTest {def main(args: Array[String]): Unit = {
// Socket text source; every whitespace-separated token becomes one element.
val host: String = "127.0.0.1"
val port: Int = 8888
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Blink planner, streaming mode (Flink 1.9-1.11 era Table API).
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
val inputDStream = env.socketTextStream(host, port)
//inputDStream.print("input").setParallelism(1)
val dataStream = inputDStream.flatMap(_.split("\\s"))
//dataStream.print("input stream").setParallelism(1)
// NOTE(review): fromDataStream is called without a field list, so no event-time
// attribute is declared here; the 'rowtime reference below presumably needs
// something like fromDataStream(dataStream, 'id, 'rowtime.rowtime) — TODO confirm.
val table = tableEnv.fromDataStream(dataStream)
/*tableEnv.registerFunction("log1", new Log)
val table1 = table.addColumns("log1(10) as t1").select("t1")
val value = tableEnv.toAppendStream[Row](table1)
println(tableEnv.explain(table1))
value.print("table1")*/
//tableEnv.registerFunction("log1", new Log)
// NOTE(review): after .window(...), groupBy must include the window alias as
// well, i.e. groupBy('w, 'id); grouping by 'id alone fails validation — verify.
// Also `as'w` lexes as `as 'w` (symbol literal) but is conventionally spaced.
val table1 = table.as("id")
.addColumns("1 as cnt")
.window(Tumble over 5.minutes on 'rowtime as'w)
.groupBy('id)
//.aggregate("sum(cnt) as sum_cnt")
//.select("id, sum(cnt) as sum_cnt, distinct(count(log1(cnt))) as count_cnt")
//.select('id,'cnt.sum as 'sum_cnt,'cnt.count.distinct as 'count_cnt)
// NOTE(review): `distinct(count(cnt))` is likely not a valid expression string;
// the intended form appears to be `cnt.count.distinct` (cf. the commented DSL
// variant above) — confirm against the ExpressionParser grammar.
.select(expr("id"),
expr("count(*) as fk1"),
expr("sum(cnt) as fk2"),
expr("distinct(count(cnt)) as fk3")
)
// A windowed/grouped aggregation emits updates, hence toRetractStream.
val value = tableEnv.toRetractStream[Row](table1)
println(table1.getSchema)
println(tableEnv.explain(table1))
value.print("table1")
env.execute(this.getClass.getName)
}
// Helper: parse a rule-text string into a Flink Table API Expression object.
private final def expr(expr: String) = ExpressionParser.parseExpression(expr)
}
正文完