乐趣区

关于数据挖掘:特征抽取并转换成Spark或Flink执行计划的思路

一、特色抽取并转换成 Spark 或 Flink 执行打算的思路

1、执行过程形象:

  1. 读取并解析配置文件:Text → Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]>;其中 feature key:特色名,aggreagate key:聚合的主键,rule text:特色的计算逻辑,即算子链路;
  2. 将 RuleText 转换成 Expression:Seq<FeatureConf[FeatureKey, AggregateKey, RuleText]> → Seq<FeatureConf[FeatureKey, AggregateKey, Expression]>;其中 Expression 为 Spark 或 Flink 中的 Expression 表达式对象;
  3. 合并雷同聚合维度的逻辑:Seq<FeatureConf[FeatureKey, AggregateKey, Expression]> → Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]>;
  4. 遍历生成执行打算:Seq<AggregateKey, FeatureConf[FeatureKey, _, Expression]> → SparkPlan & FlinkPlan;

2、Spark & Flink 中将 RuleText 转换成 Expression 办法

Spark:SparkSqlParser.parseExpression(“count(*)”)
Flink:ExpressionParser.parseExpression(“count(*)”)

3、Spark & Flink 中如何反对自定义算子

Spark:spark.udf.register(“len”, (_:String).length)
Flink:env.registerFunction(“log1”, new Log)

4、代码示例:

Spark:

package com.yang.spark.framework
 
import com.yang.spark.framework.udf.{Median, StandardDeviation}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
 
/**
  *
  * @author yangfan
  * @since 2021/5/24
  * @version 1.0.0
  */
object AggregateTestDataFrame {def main(args: Array[String]): Unit = {
 
 
    val source = Seq(("100", "100"),
      ("100", "101"),
      ("100", "105"),
      ("101", "102"),
      ("102", "103")
    )
     
    val groupKeys = Array("id1")
     
    val functions = Array("id2" -> "max", "id2" -> "min", "id2" -> "std")
 
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .config(new SparkConf())
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
 
    spark.udf.register("median", new Median)
    spark.udf.register("std", new StandardDeviation)
    spark.udf.register("len", (_: String).length)
 
    val inputDF = spark.sparkContext.makeRDD(source).toDF("id1", "id2")
 
    val w = Window.partitionBy("name").orderBy(sum(when(expr("id2/2 > 50"), $"id2")).desc)
    inputDF
      //.groupBy(groupKeys.head, groupKeys.tail:_*)
      .groupBy("id1")
      //.agg(functions.head, functions.tail:_*)
      /*.agg(sum(when(expr("id2/2 > 50"), $"id2")).alias("fk1"),
        expr("id2/2 > 50").alias("fk2"),
        sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
        avg("price").over(w.rowsBetween(Window.currentRow, 4)),
        row_number().over(w.rowsBetween(Window.currentRow, 4))
      )*/
      .agg(expr("count(*) as fk1"),
        expr("sum(cnt) as fk2"),
        expr("distinct(count(cnt)) as fk3")
      )
      .show()}
}

Flink:

package com.yang.stream
 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.types.Row
 
/**
  *
  * @author yangfan
  * @since 2021/5/24
  * @version 1.0.0
  */
object TableApiTest {def main(args: Array[String]): Unit = {
    val host: String = "127.0.0.1"
    val port: Int = 8888
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
 
    val inputDStream = env.socketTextStream(host, port)
    //inputDStream.print("input").setParallelism(1)
 
    val dataStream = inputDStream.flatMap(_.split("\\s"))
    //dataStream.print("input stream").setParallelism(1)
 
    val table = tableEnv.fromDataStream(dataStream)
 
    /*tableEnv.registerFunction("log1", new Log)
    val table1 = table.addColumns("log1(10) as t1").select("t1")
    val value = tableEnv.toAppendStream[Row](table1)
    println(tableEnv.explain(table1))
    value.print("table1")*/
 
    //tableEnv.registerFunction("log1", new Log)
    val table1 = table.as("id")
      .addColumns("1 as cnt")
      .window(Tumble over 5.minutes on 'rowtime as'w)
      .groupBy('id)
      //.aggregate("sum(cnt) as sum_cnt")
      //.select("id, sum(cnt) as sum_cnt, distinct(count(log1(cnt))) as count_cnt")
      //.select('id,'cnt.sum as 'sum_cnt,'cnt.count.distinct as 'count_cnt)
      .select(expr("id"),
        expr("count(*) as fk1"),
        expr("sum(cnt) as fk2"),
        expr("distinct(count(cnt)) as fk3")
      )
    val value = tableEnv.toRetractStream[Row](table1)
    println(table1.getSchema)
    println(tableEnv.explain(table1))
    value.print("table1")
 
    env.execute(this.getClass.getName)
  }
 
  private final def expr(expr: String) = ExpressionParser.parseExpression(expr)
}
退出移动版