Big Data Series Spark Study Notes: Spark Streaming

1. Spark Streaming

  • Spark Streaming is a real-time computation framework built on top of Spark Core; it can consume data from many sources and process that data.
  • The most basic abstraction in Spark Streaming is the DStream (discretized stream), which is essentially a continuous series of RDDs; a DStream is really just a wrapper around RDDs.
  • A DStream can be thought of as an RDD factory: every RDD it produces carries the same processing logic, and only the data each RDD reads differs.
  • Within one batch interval, a DStream produces exactly one RDD.
  • A DStream is therefore like a "template": the operations defined on it are applied to the RDD generated for each time interval, and that is the basis for building each RDD's DAG (see the sketch after this list).
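
To make "one RDD per batch interval" concrete, here is a minimal sketch (the host, port and batch interval are illustrative assumptions, not taken from these notes): foreachRDD exposes the single RDD that the DStream generates for each 3-second batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val conf = new SparkConf().setAppName("DStreamAsRddTemplate").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))

// Every batch interval the DStream produces exactly one RDD; foreachRDD hands it to us together with the batch time.
ssc.socketTextStream("localhost", 9999).foreachRDD { (rdd, time) =>
  println(s"Batch at $time produced one RDD with ${rdd.getNumPartitions} partitions")
}

ssc.start()
ssc.awaitTermination()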

2. Popular real-time computing engines

Engine            Throughput   Language   Processing speed         Ecosystem
Storm             Low          Clojure    Very fast (sub-second)   Alibaba (JStorm)
Flink             High         Scala      Fast (sub-second)        Little adoption in China
Spark Streaming   Very high    Scala      Fast (milliseconds)      Mature ecosystem

3. Processing network data with Spark Streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Create the StreamingContext. At least two threads are needed: one to receive data and one to process it.
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
// Start the streaming computation
ssc.start()
// Block until the computation is stopped
ssc.awaitTermination()
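
Note that awaitTermination only blocks the driver thread; it does not stop the job. A minimal sketch of two standard ways to shut the streaming job down gracefully (where you invoke them is deployment-specific and assumed here):

// Option 1: let Spark stop gracefully when the JVM receives a shutdown signal
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

// Option 2: stop programmatically from another thread once some external condition is met;
// stopGracefully = true lets in-flight batches finish before the context is torn down
ssc.stop(stopSparkContext = true, stopGracefully = true)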

4. Two ways Spark Streaming can receive data from Kafka

  • Receiver

    • Offsets are maintained by ZooKeeper
    • Uses Kafka's high-level (consumer) API
    • Simple to program
    • Low efficiency (a WAL is enabled to guarantee data safety; see the sketch after this list)
    • The Receiver approach was removed entirely in the Kafka 0.10 integration
    • Generally not used in production
  • Direct

    • Offsets are maintained manually by us
    • High efficiency (Spark Streaming reads directly from the Kafka partitions)
    • Programming is more complex
    • This is the approach generally used in production
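
As referenced in the Receiver bullets above, a minimal sketch of what enabling the WAL looks like (the checkpoint path is an illustrative assumption; the config key is the standard Spark one):

val conf = new SparkConf()
  .setAppName("ReceiverWithWAL")
  .setMaster("local[2]")
  // Persist received data to a write-ahead log before acknowledging it
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Milliseconds(3000))
// The WAL lives under the checkpoint directory, so checkpointing must be enabled as well
ssc.checkpoint("hdfs://uplooking01:9000/spark/checkpoint")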

5. Integrating Spark Streaming with Kafka

  • Integrating with Kafka via the Receiver approach (not recommended for production; removed in the 0.10 integration)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    // Create the StreamingContext. At least two threads are needed: one to receive data and one to process it.
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
    val groupId = "myid"
    val topics = Map("hadoop" -> 3)
    val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    receiverDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  • The Direct approach (used in production; an offset-tracking sketch follows the code below)

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    // Create the StreamingContext. The Direct approach has no dedicated receiver thread, but local[2] still leaves room for parallel processing.
    val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
    val topics = Set("hadoop")
    val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
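
    Since Direct mode leaves offset management to us, the usual pattern is to read the offset ranges of each batch and persist them somewhere durable. A minimal sketch that only prints them (register it before ssc.start(); where the offsets are actually stored, e.g. ZooKeeper or a database, is left open):

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    inputDS.foreachRDD { rdd =>
      // The RDD produced by createDirectStream carries the Kafka offset ranges it consumed;
      // a real job would save these so the stream can resume from them after a restart.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
      }
    }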

6. Real-time stream processing architecture

1. Generating logs (simulating access logs of users visiting a web application)

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Random;

public class GenerateAccessLog {

    public static void main(String[] args) throws IOException, InterruptedException {
        // Prepare the value pools to draw random fields from
        int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
        String[] requesTypes = {"GET", "POST"};
        String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};
        // Prepared but not used in the log line below
        String[] courseNames = {"bigdata", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
        String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
        FileWriter fw = new FileWriter(args[0]);
        PrintWriter printWriter = new PrintWriter(fw);
        while (true) {
            // Thread.sleep(1000);
            // Generate the individual fields of one log line
            String date = new Date().toLocaleString();
            String method = requesTypes[getRandomNum(0, requesTypes.length)];
            String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
            String HTTPVERSION = "HTTP/1.1";
            String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
            String reference = references[getRandomNum(0, references.length)];
            String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
            printWriter.println(rowLog);
            printWriter.flush();
        }
    }

    // Returns a random int in [start, end)
    public static int getRandomNum(int start, int end) {
        int i = new Random().nextInt(end - start) + start;
        return i;
    }
}
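
To run the generator, compile it and pass the output file path as its only argument; using /logs/access.log matches the path the Flume exec source below tails (any other path would require adjusting the Flume config accordingly).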

2. Collecting the web application server's log data with Flume via Avro

  • Collect the output of a command (tailing the access log) into an Avro sink
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'f1'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

#define sources
f1.sources.r1.type = exec
f1.sources.r1.command =tail -F /logs/access.log

#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

#define sink (ship the logs to uplooking03)
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444

#bind sources and sink to channel 
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
  • Collect from Avro and print to the console
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'f2'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = logger

#bind sources and sink to channel 
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
  • Collect from Avro into Kafka (see the start-up note after this config)
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'f2'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1
f2.sinks.k2.batchSize = 2

#bind sources and sink to channel 
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
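
As noted above, once the configs are in place the Avro-receiving agent (f2) on uplooking03 should be started before the exec agent (f1), otherwise f1 has nowhere to send its events. Assuming the configs are saved as f1.conf and f2.conf (the file names are illustrative), each agent is launched with the standard command, e.g. flume-ng agent --conf conf --conf-file f2.conf --name f2 -Dflume.root.logger=INFO,console.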