I. Approach: extracting features and converting them into Spark or Flink execution plans
1. Abstraction of the execution process:
- Read and parse the configuration file: Text → Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]>; here FeatureKey is the feature name, AggregateKey is the primary key to aggregate on, and RuleText is the feature's computation logic, i.e. the operator chain;
- Convert each RuleText into an Expression: Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]> → Seq<FeatureConf[FeatureKey, AggregateKey, Expression]>; here Expression is the expression object of Spark or Flink;
- Merge the logic that shares the same aggregation dimension: Seq<FeatureConf[FeatureKey, AggregateKey, Expression]> → Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]>;
- Iterate over the groups to generate the execution plans: Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]> → SparkPlan & FlinkPlan (a sketch of these steps follows this list);
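A minimal Scala sketch of steps 2–4, assuming a hypothetical FeatureConf case class and a Spark DataFrame as input (the names PlanBuilder, buildPlans and toColumn are illustrative, not from the source):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.expr

// Hypothetical config record: one feature = feature name + aggregation key + rule text.
case class FeatureConf(featureKey: String, aggregateKey: String, ruleText: String)

object PlanBuilder {

  // Step 2: turn the rule text into a Spark Column, which wraps the parsed Expression.
  private def toColumn(conf: FeatureConf): Column =
    expr(conf.ruleText).alias(conf.featureKey)

  // Steps 3 and 4: group the configs by aggregation key, then emit one aggregation per group.
  def buildPlans(confs: Seq[FeatureConf], input: DataFrame): Seq[DataFrame] =
    confs.groupBy(_.aggregateKey).toSeq.map { case (aggKey, group) =>
      val aggColumns = group.map(toColumn)
      input.groupBy(aggKey).agg(aggColumns.head, aggColumns.tail: _*)
    }
}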
2. How to convert RuleText into an Expression in Spark & Flink
Spark:SparkSqlParser.parseExpression("count(*)")
Flink:ExpressionParser.parseExpression("count(*)")
3. How to support custom operators (UDFs) in Spark & Flink
Spark: spark.udf.register("len", (_: String).length)
Flink: tableEnv.registerFunction("log1", new Log)
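The Log function registered above is not shown in the source; a hypothetical implementation as a Flink Table API ScalarFunction (which only needs one or more public eval methods) could look like the sketch below. Likewise, the Median and StandardDeviation used in the Spark example are assumed to be user-defined aggregate functions whose implementations are not included here.

import org.apache.flink.table.functions.ScalarFunction

// Hypothetical implementation of the Log function registered above.
class Log extends ScalarFunction {
  def eval(x: Int): Double = math.log(x.toDouble)
}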
4. Code examples:
Spark:
package com.yang.spark.framework

import com.yang.spark.framework.udf.{Median, StandardDeviation}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/**
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object AggregateTestDataFrame {

  def main(args: Array[String]): Unit = {
    val source = Seq(
      ("100", "100"),
      ("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103")
    )
    val groupKeys = Array("id1")
    val functions = Array("id2" -> "max", "id2" -> "min", "id2" -> "std")

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .config(new SparkConf())
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Register custom operators: two UDAFs and a simple scalar UDF.
    spark.udf.register("median", new Median)
    spark.udf.register("std", new StandardDeviation)
    spark.udf.register("len", (_: String).length)

    val inputDF = spark.sparkContext.makeRDD(source).toDF("id1", "id2")

    // Window spec, referenced only by the commented-out aggregation variant below.
    val w = Window.partitionBy("name").orderBy(sum(when(expr("id2/2 > 50"), $"id2")).desc)

    inputDF
      //.groupBy(groupKeys.head, groupKeys.tail: _*)
      .groupBy("id1")
      //.agg(functions.head, functions.tail: _*)
      /*.agg(
        sum(when(expr("id2/2 > 50"), $"id2")).alias("fk1"),
        expr("id2/2 > 50").alias("fk2"),
        sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
        avg("price").over(w.rowsBetween(Window.currentRow, 4)),
        row_number().over(w.rowsBetween(Window.currentRow, 4))
      )*/
      .agg(
        expr("count(*) as fk1"),
        expr("sum(id2) as fk2"),            // aggregate over an existing column; inputDF has no cnt column
        expr("count(distinct id2) as fk3")  // count(distinct ...) is the valid SQL form
      )
      .show()
  }
}
Flink:
package com.yang.stream

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.types.Row

/**
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object TableApiTest {

  def main(args: Array[String]): Unit = {
    val host: String = "127.0.0.1"
    val port: Int = 8888

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val inputDStream = env.socketTextStream(host, port)
    //inputDStream.print("input").setParallelism(1)

    val dataStream = inputDStream.flatMap(_.split("\\s"))
    //dataStream.print("input stream").setParallelism(1)

    val table = tableEnv.fromDataStream(dataStream)

    /*tableEnv.registerFunction("log1", new Log)
    val table1 = table.addColumns("log1(10) as t1").select("t1")
    val value = tableEnv.toAppendStream[Row](table1)
    println(tableEnv.explain(table1))
    value.print("table1")*/

    //tableEnv.registerFunction("log1", new Log)
    val table1 = table.as("id")
      .addColumns("1 as cnt")
      // The tumbling window requires a rowtime (event-time) attribute to be declared on the source table.
      .window(Tumble over 5.minutes on 'rowtime as 'w)
      // The window alias must be part of the groupBy.
      .groupBy('id, 'w)
      //.aggregate("sum(cnt) as sum_cnt")
      //.select("id, sum(cnt) as sum_cnt, distinct(count(log1(cnt))) as count_cnt")
      //.select('id, 'cnt.sum as 'sum_cnt, 'cnt.count.distinct as 'count_cnt)
      .select(
        expr("id"),
        expr("count(*) as fk1"),
        expr("sum(cnt) as fk2"),
        expr("distinct(count(cnt)) as fk3")
      )

    val value = tableEnv.toRetractStream[Row](table1)
    println(table1.getSchema)
    println(tableEnv.explain(table1))
    value.print("table1")

    env.execute(this.getClass.getName)
  }

  // Helper that turns rule text into a Table API Expression.
  private final def expr(expr: String) = ExpressionParser.parseExpression(expr)
}