Spark: Reading a Real-Time Socket Data Stream with Spark Streaming and Spark SQL


Spark Streaming is built on top of Spark Core's RDDs, and it introduces a new abstraction: the DStream (Discretized Stream), which represents a continuous stream of data. The DStream is Spark Streaming's stream-processing model: internally, Spark Streaming slices the input data into segments by time interval (for example, one second), turns each segment into a Spark RDD, and the sequence of segments forms the DStream; every operation on a DStream is ultimately carried out as operations on the underlying RDDs. Spark SQL is Spark's module for processing structured data. Its predecessor was Shark, a tool built on Hive; Shark replaced Hive's memory-management, physical-planning, and execution modules so that they could run on the Spark engine.
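To make the micro-batch model concrete before the worked example below, here is a minimal, self-contained sketch (class name, host, and port are illustrative only, not from the original post): each batch interval yields one RDD, and foreachRDD exposes that per-batch RDD directly.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("dstream.sketch");
        // A 1-second batch interval: the input stream is cut into one RDD per second
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        // A DStream operation is applied to every per-batch RDD under the hood
        lines.foreachRDD((rdd, time) -> System.out.println(time + ": " + rdd.count() + " lines"));
        ssc.start();
        ssc.awaitTermination();
    }
}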

(1) POM dependencies:
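The `${scala.version}` and `${spark.version}` placeholders below are assumed to be defined in the POM's `<properties>` section; to match the explicitly pinned artifacts (Scala binary version 2.11, Spark 2.3.1), they would presumably be:

<properties>
    <scala.version>2.11</scala.version>
    <spark.version>2.3.1</spark.version>
</properties>

(The Kafka and fastjson dependencies are declared but not used by this particular example.)

<dependencies>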

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.11</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.66</version>
</dependency>

</dependencies>

(2) Define the message object:

package com.pojo;

import java.io.Serializable;
import java.util.Date;

/**
 * Created by lj on 2022-07-13.
 */

public class WaterSensor implements Serializable {

public String id;
public long ts;
public int vc;

public WaterSensor(){}

public WaterSensor(String id,long ts,int vc){
    this.id = id;
    this.ts = ts;
    this.vc = vc;
}

public int getVc() {return vc;}

public void setVc(int vc) {this.vc = vc;}

public String getId() {return id;}

public void setId(String id) {this.id = id;}

public long getTs() {return ts;}

public void setTs(long ts) {this.ts = ts;}

}

Note that WaterSensor implements Serializable and follows JavaBean conventions (a public no-arg constructor plus getters and setters): the reflection-based createDataFrame call in step (4) needs the getters to infer the schema, and the objects must be serializable to be shipped with the job.

(3) Build the data producer:

package com.producers;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * Created by lj on 2022-07-12.
 */

public class Socket_Producer {

public static void main(String[] args) throws IOException {

    try {
        ServerSocket ss = new ServerSocket(9999);
        System.out.println("Starting server ...");
        Socket s = ss.accept();
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
        String response = "java,1,2";

        // Send one message every 30 seconds
        int i = 0;
        Random r = new Random();   // no explicit seed
        String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

        while (true) {
            response = lang[r.nextInt(lang.length)] + i + "," + i + "," + i + "\n";
            System.out.println(response);
            try {
                bw.write(response);
                bw.flush();
                i++;
            } catch (Exception ex) {
                System.out.println(ex.getMessage());
            }
            Thread.sleep(1000 * 30);
        }
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }
}

}

Note the trailing "\n" in each message: socketTextStream on the Spark side frames records by newline, so every write must end with one.

(4) Ingest the socket data source with Spark Streaming, compute with Spark SQL, and print the results:

package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-16.
 */

public class SparkSql_Socket1 {

private static String appName = "spark.streaming.demo";
private static String master = "local[*]";
private static String host = "localhost";
private static int port = 9999;

public static void main(String[] args) {
    // Initialize SparkConf
    SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

    // Create the JavaStreamingContext with a 1-minute batch interval
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

    // Receive data from the socket source
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

    // Convert each batch RDD of the DStream to a DataFrame and run a SQL query
    lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
        @Override
        public void call(JavaRDD<String> rdd, Time time) {
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

            // Convert the RDD to a DataFrame via reflection
            JavaRDD<WaterSensor> rowRDD = rdd.map(new Function<String, WaterSensor>() {
                @Override
                public WaterSensor call(String line) {
                    String[] cols = line.split(",");
                    WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                    return waterSensor;
                }
            });

            Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, WaterSensor.class);
            // Create a temporary view
            dataFrame.createOrReplaceTempView("log");
            Dataset<Row> result = spark.sql("select * from log");
            System.out.println("=========" + time + "=========");
            // Print the first 20 rows
            result.show();
        }
    });

    // Start the streaming job
    ssc.start();
    try {
        ssc.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ssc.close();
    }
}
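
    // The original post never shows JavaSparkSessionSingleton, which is
    // referenced inside foreachRDD above. Below is a minimal sketch, modeled
    // on the helper in the official Spark streaming + SQL example: lazily
    // create a single SparkSession per JVM. (Assumption: defining it as a
    // static nested class here stands in for whatever separate class the
    // original code actually used.)
    static class JavaSparkSessionSingleton {
        private static transient SparkSession instance = null;
        public static SparkSession getInstance(SparkConf sparkConf) {
            if (instance == null) {
                instance = SparkSession.builder().config(sparkConf).getOrCreate();
            }
            return instance;
        }
    }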

}

(5) Demo of the results:

The code sets a 1-minute batch interval, so a computation is triggered once per minute. To reproduce the run, start Socket_Producer first (it listens on port 9999), then launch SparkSql_Socket1; each batch prints its time marker and the query result via result.show().
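For quicker feedback while testing, the batch interval in step (4) can be shortened when constructing the context (a hedged tweak, not part of the original post):

    // e.g. trigger a micro-batch every 10 seconds instead of every minute
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));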
