共计 2788 个字符,预计需要花费 7 分钟才能阅读完成。
Spark SQL 中用户自定义函数,用法和 Spark SQL 中的内置函数相似;是 saprk SQL 中内置函数无奈满足要求,用户依据业务需要自定义的函数。
首先定义一个 UDF 函数:
package com.udf;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import scala.collection.mutable.WrappedArray;
/**
* Created by lj on 2022-07-25.
*/
public class TestUDF implements UDF1<String, String> {
@Override
public String call(String s) throws Exception {return s+"_udf";}
}
应用 UDF 函数:
package com.examples;
import com.pojo.WaterSensor;
import com.udf.TestUDF;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* Created by lj on 2022-07-25.
*/
public class SparkSql_Socket_UDF {
private static String appName = "spark.streaming.demo";
private static String master = "local[*]";
private static String host = "localhost";
private static int port = 9999;
public static void main(String[] args) {
// 初始化 sparkConf
SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
// 取得 JavaStreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));
/**
* 设置日志的级别:防止日志反复
*/
ssc.sparkContext().setLogLevel("ERROR");
// 从 socket 源获取数据
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
private static final long serialVersionUID = 1L;
public WaterSensor call(String s) throws Exception {String[] cols = s.split(",");
WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
return waterSensor;
}
}).window(Durations.minutes(6), Durations.minutes(9)); // 指定窗口大小 和 滑动频率 必须是批处理工夫的整数倍
mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
@Override
public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
spark.udf().register("TestUDF", new TestUDF(), DataTypes.StringType);
Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
// 创立长期表
dataFrame.createOrReplaceTempView("log");
Dataset<Row> result = spark.sql("select *,TestUDF(id) as udftest from log");
System.out.println("=========" + time + "=========");
// 输入前 20 条数据
result.show();}
});
// 开始作业
ssc.start();
try {ssc.awaitTermination();
} catch (Exception e) {e.printStackTrace();
} finally {ssc.close();
}
}
}
代码阐明:
利用成果展现:
正文完