I. Approach for extracting features and converting them into Spark or Flink execution plans

1. Abstraction of the execution process (a minimal sketch follows the list below):

  1. Read and parse the configuration file: Text → Seq[FeatureConf[FeatureKey, AggregateKey, RuleText]], where FeatureKey is the feature name, AggregateKey is the primary key to aggregate on, and RuleText is the feature's computation logic, i.e. the operator chain;
  2. Convert each RuleText into an Expression: Seq[FeatureConf[FeatureKey, AggregateKey, RuleText]] → Seq[FeatureConf[FeatureKey, AggregateKey, Expression]], where Expression is the expression object of Spark or Flink;
  3. Merge the logic of features that share the same aggregation dimension: Seq[FeatureConf[FeatureKey, AggregateKey, Expression]] → Seq[(AggregateKey, Seq[FeatureConf[FeatureKey, _, Expression]])];
  4. Iterate over the merged groups and generate the execution plans: Seq[(AggregateKey, Seq[FeatureConf[FeatureKey, _, Expression]])] → SparkPlan & FlinkPlan;
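
A minimal Scala sketch of these four steps, assuming a Spark target. FeatureConf, parseConf, toExpressions, groupByAggregateKey and buildPlans are illustrative names rather than classes from an existing framework, and the tab-separated config format is an assumption:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions

// Hypothetical model of one feature definition (illustrative only).
case class FeatureConf[R](featureKey: String, aggregateKey: String, rule: R)

object FeaturePlanBuilder {

  // 1. Parse the config text; assumes one tab-separated definition per line.
  def parseConf(text: String): Seq[FeatureConf[String]] =
    text.split("\n").filter(_.trim.nonEmpty).map { line =>
      val Array(featureKey, aggregateKey, ruleText) = line.split("\t")
      FeatureConf(featureKey, aggregateKey, ruleText)
    }

  // 2. Turn each RuleText into an engine expression (here Spark's expr -> Column).
  def toExpressions(confs: Seq[FeatureConf[String]]): Seq[FeatureConf[Column]] =
    confs.map(c => FeatureConf(c.featureKey, c.aggregateKey, functions.expr(c.rule)))

  // 3. Group the features that share the same aggregation key.
  def groupByAggregateKey(confs: Seq[FeatureConf[Column]]): Seq[(String, Seq[FeatureConf[Column]])] =
    confs.groupBy(_.aggregateKey).toSeq

  // 4. One groupBy/agg per aggregation key; the resulting DataFrames carry the plans.
  def buildPlans(input: DataFrame,
                 grouped: Seq[(String, Seq[FeatureConf[Column]])]): Seq[DataFrame] =
    grouped.map { case (aggKey, features) =>
      val cols = features.map(f => f.rule.as(f.featureKey))
      input.groupBy(aggKey).agg(cols.head, cols.tail: _*)
    }
}

The Flink path is symmetric: step 2 parses the rule text with ExpressionParser.parseExpression, and step 4 feeds the resulting expressions into Table.groupBy/select instead of DataFrame.groupBy/agg.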

2. How to convert RuleText into an Expression in Spark & Flink

Spark: SparkSqlParser.parseExpression("count(*)")
Flink: ExpressionParser.parseExpression("count(*)")
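
A hedged sketch of how the parsed result is consumed on the Spark side. The SparkSqlParser constructor differs between Spark versions, so the session's own parser is used here, and wrapping the Catalyst Expression in a Column relies on an internal constructor that has been stable but is not a public contract:

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions

val spark = SparkSession.builder().master("local[*]").appName("parse-demo").getOrCreate()

// Parse the rule text into a Catalyst Expression with the session's SQL parser...
val parsed = spark.sessionState.sqlParser.parseExpression("count(*)")

// ...and wrap it in a Column so it can be passed to agg()/select().
val asColumn = new Column(parsed)

// Equivalent shortcut that parses and wraps in one call.
val sameColumn = functions.expr("count(*)")

On the Flink side, ExpressionParser.parseExpression returns an org.apache.flink.table.expressions.Expression that can be handed directly to Table.select or Table.addColumns, which is exactly what the expr helper in the Flink example below does.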

3. How to support custom operators (UDFs) in Spark & Flink

Spark: spark.udf.register("len", (_: String).length)
Flink: tableEnv.registerFunction("log1", new Log)
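
The Median, StandardDeviation and Log functions used here and in the examples below are user-defined, and their implementations are not part of this post. A minimal sketch of what the Flink Log scalar function might look like (the eval signature and the natural-log semantics are assumptions):

import org.apache.flink.table.functions.ScalarFunction

// Hypothetical implementation of the custom "log1" operator registered above.
class Log extends ScalarFunction {
  // Flink resolves UDF calls against eval methods by reflection.
  def eval(x: Int): Double = math.log(x.toDouble)
}

Once registered, a custom operator can appear inside RuleText just like a built-in, e.g. log1(cnt) in the commented-out select of the Flink example below.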

4. Code examples:

Spark:

package com.yang.spark.framework

import com.yang.spark.framework.udf.{Median, StandardDeviation}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/**
 * Aggregates features over a DataFrame from parsed rule-text expressions.
 *
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object AggregateTestDataFrame {

  def main(args: Array[String]): Unit = {

    // Test data: (id1, id2) pairs.
    val source = Seq(
      ("100", "100"),
      ("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103")
    )

    val groupKeys = Array("id1")

    val functions = Array("id2" -> "max", "id2" -> "min", "id2" -> "std")

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .config(new SparkConf())
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Custom aggregate functions (Median, StandardDeviation) and a plain UDF.
    spark.udf.register("median", new Median)
    spark.udf.register("std", new StandardDeviation)
    spark.udf.register("len", (_: String).length)

    val inputDF = spark.sparkContext.makeRDD(source).toDF("id1", "id2")

    // Window spec used only by the commented-out aggregation variant below.
    val w = Window.partitionBy("id1").orderBy(sum(when(expr("id2/2 > 50"), $"id2")).desc)

    inputDF
      // Constant count column so the parsed rules below resolve (mirrors the Flink example).
      .withColumn("cnt", lit(1))
      //.groupBy(groupKeys.head, groupKeys.tail: _*)
      .groupBy("id1")
      //.agg(functions.head, functions.tail: _*)
      /*.agg(
        sum(when(expr("id2/2 > 50"), $"id2")).alias("fk1"),
        expr("id2/2 > 50").alias("fk2"),
        sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
        avg("price").over(w.rowsBetween(Window.currentRow, 4)),
        row_number().over(w.rowsBetween(Window.currentRow, 4))
      )*/
      // Each feature is just a parsed rule-text expression.
      .agg(
        expr("count(*) as fk1"),
        expr("sum(cnt) as fk2"),
        expr("count(distinct cnt) as fk3")
      )
      .show()
  }
}

Flink:

package com.yang.stream

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.types.Row

/**
 * Aggregates features over a socket stream from parsed rule-text expressions.
 *
 * @author yangfan
 * @since 2021/5/24
 * @version 1.0.0
 */
object TableApiTest {

  def main(args: Array[String]): Unit = {
    val host: String = "127.0.0.1"
    val port: Int = 8888

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val inputDStream = env.socketTextStream(host, port)
    //inputDStream.print("input").setParallelism(1)

    val dataStream = inputDStream.flatMap(_.split("\\s"))
    //dataStream.print("input stream").setParallelism(1)

    val table = tableEnv.fromDataStream(dataStream)

    /*tableEnv.registerFunction("log1", new Log)
    val table1 = table.addColumns("log1(10) as t1").select("t1")
    val value = tableEnv.toAppendStream[Row](table1)
    println(tableEnv.explain(table1))
    value.print("table1")*/

    //tableEnv.registerFunction("log1", new Log)
    val table1 = table.as("id")
      // Constant count column, so the parsed rules below can aggregate over it.
      .addColumns("1 as cnt")
      // Note: 'rowtime must be declared as an event-time attribute when the
      // table is created, otherwise this tumbling window will not resolve.
      .window(Tumble over 5.minutes on 'rowtime as 'w)
      // Group-window aggregations must also group on the window alias.
      .groupBy('w, 'id)
      //.aggregate("sum(cnt) as sum_cnt")
      //.select("id, sum(cnt) as sum_cnt, distinct(count(log1(cnt))) as count_cnt")
      //.select('id, 'cnt.sum as 'sum_cnt, 'cnt.count.distinct as 'count_cnt)
      // Each feature is just a parsed rule-text expression.
      .select(
        expr("id"),
        expr("count(*) as fk1"),
        expr("sum(cnt) as fk2"),
        expr("distinct(count(cnt)) as fk3")
      )
    val value = tableEnv.toRetractStream[Row](table1)
    println(table1.getSchema)
    println(tableEnv.explain(table1))
    value.print("table1")

    env.execute(this.getClass.getName)
  }

  // Parses a rule-text string into a Flink Table API Expression.
  private final def expr(expr: String) = ExpressionParser.parseExpression(expr)
}