乐趣区

关于spark:Spark-StreamingSpark第一代实时计算引擎

尽管 SparkStreaming 曾经进行更新,Spark 的重点也放到了 Structured Streaming,但因为 Spark 版本过低或者其余技术选型问题,可能还是会抉择 SparkStreaming。
SparkStreaming 对于工夫窗口,事件工夫尽管撑持较少,但还是能够满足局部的实时计算场景的,SparkStreaming 材料较多,这里也做一个简略介绍。

一. 什么是 Spark Streaming

Spark Streaming 在过后是为了与过后的 Apache Storm 竞争,也让 Spark 能够用于流式数据的解决。依据其官网文档介绍,Spark Streaming 有高吞吐量和容错能力强等特点。Spark Streaming 反对的数据输出源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简略的 TCP 套接字等等。数据输出后能够用 Spark 的高度形象原语如:map、reduce、join、window 等进行运算。而后果也能保留在很多中央,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完满交融。
当然 Storm 目前曾经慢慢淡出,Flink 开始大放异彩。

Spark 与 Storm 的比照

二、SparkStreaming 入门

Spark Streaming 是 Spark Core API 的扩大,它反对弹性的,高吞吐的,容错的实时数据流的解决。数据能够通过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也能够通过例如 mapreducejoinwindow 等的高级函数组成的简单算法解决。最终,解决后的数据能够输入到文件系统,数据库以及实时仪表盘中。事实上,你还能够在 data streams(数据流)上应用 [机器学习] 以及 [图计算] 算法。
在外部,它工作原理如下,Spark Streaming 接管实时输出数据流并将数据切分成多个 batch(批)数据,而后由 Spark 引擎解决它们以生成最终的 stream of results in batches(分批流后果)。

Spark Streaming 提供了一个名为 discretized streamDStream 的高级形象,它代表一个间断的数据流。DStream 能够从数据源的输出数据流创立,例如 Kafka,Flume 以及 Kinesis,或者在其余 DStream 上进行高层次的操作以创立。在外部,一个 DStream 是通过一系列的 [RDDs] 来示意。

本指南通知你如何应用 DStream 来编写一个 Spark Streaming 程序。你能够应用 Scala,Java 或者 Python(Spark 1.2 版本后引进)来编写 Spark Streaming 程序。

在 idea 中新建 maven 我的项目

引入依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

Project Structure —— Global Libraries —— 把 scala 增加到 add module

新建 Scala Class

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object Demo {

  // 屏蔽日志
  Logger.getLogger("org.apache")setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    //local 会有问题  起码两个线程  一个拿数据 一个计算
    //val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")

    // 工夫距离
    val ssc = new StreamingContext(conf,Seconds(1))

    // 接收数据 解决

    //socket  demo
    val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)


    val words: DStream[String] = value.flatMap(_.split(" "))

    val wordsTuple: DStream[(String, Int)] = words.map((_, 1))

    val wordcount: DStream[(String, Int)] = wordsTuple.reduceByKey(_ + _)

    // 触发 action
    wordcount.print()

    ssc.start()

    // 放弃流的运行  期待程序被终止
    ssc.awaitTermination()}

}

测试

下载一个 win10 用的 netcat

https://eternallybored.org/mi…

下载 netcat 1.12

解压 在目录下启动 cmd

输出

nc  -L -p 9999

开始输出单词 在 idea 中验证接管

原理

初始化 StreamingContext

为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必须要被创立进去,它是所有的 Spark Streaming 性能的主入口点。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName 参数是展现在集群 UI 界面上的应用程序的名称

master 是 local 或者 spark 集群的 url(mesos yarn)

本地测试能够用local[*] 留神要多于两个线程

Second(1)定义的是 batch interval 批处理距离 就是距离多久去拿一次数据

在定义一个 context 之后, 您必须执行以下操作。

  1. 通过创立输出 DStreams 来定义输出源。
  2. 通过利用转换和输入操作 DStreams 定义流计算(streaming computations)。
  3. 开始接管输出并且应用 streamingContext.start() 来解决数据。
  4. 应用 streamingContext.awaitTermination() 期待解决被终止(手动或者因为任何谬误)。
  5. 应用 streamingContext.stop() 来手动的进行解决。

须要记住的几点:

  • 一旦一个 context 曾经启动,将不会有新的数据流的计算能够被创立或者增加到它。
  • 一旦一个 context 曾经进行,它不会被重新启动。
  • 同一时间外在 JVM 中只有一个 StreamingContext 能够被激活。
  • 在 StreamingContext 上的 stop() 同样也进行了 SparkContext。为了只进行 StreamingContext,设置 stop() 的可选参数,名叫 stopSparkContext 为 false。
  • 一个 SparkContext 就能够被重用以创立多个 StreamingContexts,只有前一个 StreamingContext 在下一个 StreamingContext 被创立之前进行(不进行 SparkContext)。
Discretized Stream or DStream

Discretized Stream or DStream 是 Spark Streaming 提供的根本形象。它代表了一个间断的数据流。可能是数据源接管的流,也可能是转换后的流。

DStream 就是多个和工夫相干的一系列间断 RDD 的汇合,比方本例就是距离一秒的一堆 RDD 的汇合

DStream 也是有依赖关系的

flatMap 操作也是间接作用在 DStream 上的,就和作用于 RDD 一样 这样很好了解

咱们先来看数据源接管的流 这种叫做 Input DStreams 他会通过 Receivers 接收器去不同的数据源接收数据。

Spark Streaming 内置了两种数据源:

  • 根底的数据源:比方方才用的 socket 接管 还有 file systems
  • 高级的数据源:比方 kafka 还有 flume kinesis 等等

留神本地运行时,不要用 local 或者 local[1], 一个线程不够。放到集群上时调配给 SparkStreaming 的核数必须大于接收器的数量,留一个核去解决数据。

咱们也能够自定义数据源,那咱们就须要本人开发一个接收器。

Transformations

在咱们接管到 Dstreams 之后能够进行转换操作,常见转换如下:

Transformation(转换) Meaning(含意)
map(func) 利用函数 func 解决原 DStream 的每个元素,返回一个新的 DStream。
flatMap(func) 与 map 类似,然而每个输出项可用被映射为 0 个或者多个输入项。。
filter(func) 返回一个新的 DStream,它仅仅蕴含原 DStream 中函数 func 返回值为 true 的项。
repartition(numPartitions) 通过创立更多或者更少的 partition 以扭转这个 DStream 的并行级别(level of parallelism)。
union(otherStream) 返回一个新的 DStream,它蕴含源 DStream 和 otherDStream 的所有元素。
count() 通过 count 源 DStream 中每个 RDD 的元素数量,返回一个蕴含单元素(single-element)RDDs 的新 DStream。
reduce(func) 利用函数 func 汇集源 DStream 中每个 RDD 的元素,返回一个蕴含单元素(single-element)RDDs 的新 DStream。函数应该是相关联的,以使计算能够并行化。
countByValue() 在元素类型为 K 的 DStream 上,返回一个(K,long)pair 的新的 DStream,每个 key 的值是在原 DStream 的每个 RDD 中的次数。
reduceByKey(func, [_numTasks_]) 当在一个由 (K,V) pairs 组成的 DStream 上调用这个算子时,返回一个新的,由 (K,V) pairs 组成的 DStream,每一个 key 的值均由给定的 reduce 函数聚合起来。留神:在默认状况下,这个算子利用了 Spark 默认的并发工作数去分组。你能够用 numTasks 参数设置不同的工作数。
join(otherStream, [_numTasks_]) 当利用于两个 DStream(一个蕴含(K,V)对,一个蕴含 (K,W) 对),返回一个蕴含 (K, (V, W)) 对的新 DStream。
cogroup(otherStream, [_numTasks_]) 当利用于两个 DStream(一个蕴含(K,V)对,一个蕴含 (K,W) 对),返回一个蕴含 (K, Seq[V], Seq[W]) 的 tuples(元组)。
transform(func) 通过对源 DStream 的每个 RDD 利用 RDD-to-RDD 函数,创立一个新的 DStream。这个能够在 DStream 中的任何 RDD 操作中应用。
updateStateByKey(func) 返回一个新的 “ 状态 ” 的 DStream,其中每个 key 的状态通过在 key 的先前状态利用给定的函数和 key 的新 valyes 来更新。这能够用于保护每个 key 的任意状态数据。

这里咱们特地介绍一下 updateStateByKey

咱们如果须要对历史数据进行统计,可能须要去 kafka 里拿一下之前留存的数据,也能够用 updateStateByKey 这个办法。

// 保留状态  聚合雷同的单词
    val  wordcount = wordsTuple.updateStateByKey[Int](
      //updateFunction _
      (newValues: Seq[Int], runningCount: Option[Int])=> {val newCount = Some(newValues.sum + runningCount.getOrElse(0))
        newCount
      }
    )

比方方才的单词计数,咱们只能统计每一次发过来的音讯,然而如果心愿统计屡次音讯就须要用到这个,咱们要指定一个 checkpoint,就是从哪开始算。

// 减少成员变量
val checkpointDir = "./ckp"

// 在办法中退出 checkpoint
ssc.checkpoint(checkpointDir)
    val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    value.checkpoint(Seconds(4))// 官网倡议批次工夫的 1 - 5 倍

这时候咱们建设 StreamingContext 的办法就要扭转了 咱们把方才的创立过程提取成办法。

def creatingFunc():StreamingContext = {val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))

    ssc.checkpoint(checkpointDir)


    val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    value.checkpoint(Seconds(4))// 官网倡议批次工夫的 1 - 5 倍

    val words: DStream[String] = value.flatMap(_.split(" "))

    val wordsTuple: DStream[(String, Int)] = words.map((_, 1))


    // 保留状态  聚合雷同的单词
    val  wordcount = wordsTuple.updateStateByKey[Int](
      //updateFunction _
      (newValues: Seq[Int], runningCount: Option[Int])=> {val newCount = Some(newValues.sum + runningCount.getOrElse(0))
        newCount
      }
    )

    // 触发 action
    wordcount.print()
    ssc
  }

在 mian 函数中批改为:

def main(args: Array[String]): Unit = {val ssc = StreamingContext.getOrCreate(checkpointDir,creatingFunc _)
      ssc.start()
      // 放弃流的运行  期待程序被终止
      ssc.awaitTermination()}

这样就是,如果有 checkpoint,程序会在 checkpoint 中把程序加载回来(程序被保留为二进制),没有 checkpoint 的话才会创立。

将目录下的 checkpoint 删除,就能够将状态删除。

生产中 updateStateByKey 因为会将数据备份要谨慎应用,能够思考用 hbase,redis 等做代替。或者借助 kafka 做聚合解决。

// 如果不必 updatestateByKey  能够思考 redis
    wordsTuple.foreachRDD(rdd => {
      rdd.foreachPartition(i =>
        {//redis}
      )
    })
窗口操作

Spark Streaming 也反对 _windowed computations(窗口计算),它容许你在数据的一个滑动窗口上利用 transformation(转换)。

如上图显示,窗口在源 DStream 上 _slides(滑动),任何一个窗口操作都须要指定两个参数:

  • window length(窗口长度) – 窗口的持续时间。
  • sliding interval(滑动距离) – 执行窗口操作的距离。

比方计算过来 30 秒的词频:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些罕用的窗口操作如下所示,这些操作都须要用到上文提到的两个参数 – windowLength(窗口长度) 和 _slideInterval(滑动的工夫距离)_。

Transformation(转换) Meaning(含意)
window(windowLength, slideInterval) 返回一个新的 DStream,它是基于 source DStream 的窗口 batch 进行计算的。
countByWindow(windowLength, slideInterval) 返回 stream(流)中滑动窗口元素的数
reduceByWindow(func, windowLength, slideInterval) 返回一个新的单元素 stream(流),它通过在一个滑动距离的 stream 中应用 func 来聚合以创立。该函数应该是 associative(关联的)且 commutative(可替换的),以便它能够并行计算
reduceByKeyAndWindow(func, windowLength, slideInterval, [_numTasks_]) 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, V) pairs 的 Stream,其中的每个 key 的 values 是在滑动窗口上的 batch 应用给定的函数 func 来聚合产生的。Note(留神): 默认状况下,该操作应用 Spark 的默认并行任务数量(local model 是 2,在 cluster mode 中的数量通过 spark.default.parallelism 来确定)来做 grouping。您能够通过一个可选的 numTasks 参数来设置一个不同的 tasks(工作)数量。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [_numTasks_]) 上述 reduceByKeyAndWindow() 的更无效的一个版本,其中应用前一窗口的 reduce 值逐步计算每个窗口的 reduce 值。这是通过缩小进入滑动窗口的新数据,以及“inverse reducing(逆减)”来到窗口的旧数据来实现的。一个例子是当窗口滑动时”增加”和“减”keys 的数量。然而,它仅实用于“invertible reduce functions(可逆缩小函数)”,即具备相应“inverse reduce(反向缩小)”函数的 reduce 函数(作为参数 invFunc </ i>)。像在 reduceByKeyAndWindow 中的那样,reduce 工作的数量能够通过可选参数进行配置。请留神,针对该操作的应用必须启用 checkpointing.
countByValueAndWindow(windowLength, slideInterval, [_numTasks_]) 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, Long) pairs 的 DStream,其中每个 key 的 value 是它在一个滑动窗口之内的频次。像 code>reduceByKeyAndWindow 中的那样,reduce 工作的数量能够通过可选参数进行配置。
Join 操作

在 Spark Streaming 中能够执行不同类型的 join

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
// 也能够用窗口
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
DStreams 输入操作

输入操作容许将 DStream 的数据推送到内部零碎,如数据库或文件系统。

会触发所有变换的执行,相似 RDD 的 action 操作。有如下操作:

Output Operation Meaning
print() 在运行流应用程序的 driver 节点上的 DStream 中打印每批数据的前十个元素。这对于开发和调试很有用。
Python API 这在 Python API 中称为 pprint()
saveAsTextFiles(prefix, [_suffix_]) 将此 DStream 的内容另存为文本文件。每个批处理距离的文件名是依据 前缀 后缀_:“prefix-TIME_IN_MS[.suffix]”_ 生成的。
saveAsObjectFiles(prefix, [_suffix_]) 将此 DStream 的内容另存为序列化 Java 对象的 SequenceFiles。每个批处理距离的文件名是依据 前缀 后缀_:“prefix-TIME_IN_MS[.suffix]”_ 生成的。
Python API 这在 Python API 中是不可用的。
saveAsHadoopFiles(prefix, [_suffix_]) 将此 DStream 的内容另存为 Hadoop 文件。每个批处理距离的文件名是依据 前缀 后缀_:“prefix-TIME_IN_MS[.suffix]”_ 生成的。
Python API 这在 Python API 中是不可用的。
foreachRDD(func) 对从流中生成的每个 RDD 利用函数 func 的最通用的输入运算符。此性能应将每个 RDD 中的数据推送到内部零碎,例如将 RDD 保留到文件,或将其通过网络写入数据库。请留神,函数 func 在运行流应用程序的 driver 过程中执行,通常会在其中具备 RDD 动作,这将强制流式传输 RDD 的计算。
foreachRDD 设计模式应用

dstream.foreachRDD 容许将数据发送到内部零碎。

但咱们不要每次都创立一个连贯,解决方案如下:

缩小开销,分区摊派开销

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()}
}

更好的做法是用动态资源池:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

连贯 Kafka

Apache Kafka 是一个高性能的音讯零碎,由 Scala 写成。是由 Apache 软件基金会开发的一个开源音讯零碎我的项目。

Kafka 最后是由 LinkedIn 开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目标指标是为解决实时数据提供一个对立、高通量、低期待(低延时)的平台。

更多 kafka 相干请查看 Kafka 入门宝典(具体截图版)

Spark Streaming 2.4.4 兼容 kafka 0.10.0 或者更高的版本

Spark Streaming 在 2.3.0 版本之前是提供了对 kafka 0.8 和 0.10 的反对的,不过在 2.3.0 当前对 0.8 的反对勾销了。

Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.

spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
Broker Version 0.8.2.1 or higher 0.10.0 or higher
API Maturity Deprecated Stable
Language Support Scala, Java, Python Scala, Java
Receiver DStream Yes No
Direct DStream Yes Yes
SSL / TLS Support No Yes
Offset Commit API No Yes
Dynamic Topic Subscription No Yes
Receiver

这里简略介绍一下对 kafka0.8 的一种反对形式:基于 Receiver

依赖:

groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.12
 version = 2.4.4
import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

这种状况 程序停掉数据会失落,为了不失落本人又写了一份,这种是很多余的。

因为采纳了 kafka 高阶 api,偏移量 offset 不可控。

Direct

Kafka 0.10.0 版本当前, 采纳了更好的一种 Direct 形式,这种咱们须要本人保护偏移量 offset。

直连形式 并行度会更高 生产环境用的最多,0.8 版本须要在 zk 或者 redis 等中央本人保护偏移量。咱们应用 0.10 以上版本反对本人设置偏移量,咱们只须要本人将偏移量写回 kafka 就能够。

依赖

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.4

kafka 0.10 当前 能够将 offset 写回 kafka 咱们不须要本人保护 offset 了,具体代码如下:

val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(2))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      //latest  none   earliest
      "auto.offset.reset" -> "earliest",
      // 主动提交偏移量 false
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //val topics = Array("topicA", "topicB")
    val topics = Array("test_topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      // 与 kafka broker 不在一个节点上  用不同策略
      // 在一个节点用 PreferBrokers 策略  很少见
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      // 一般的 RDD 不能强转 HasOffsetRanges   但 kafkaRDD 有 with 这个个性 能够强转
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // 解决数据 计算逻辑
      rdd.foreachPartition { iter =>
        // 一次解决一个分区的数据  获取这个分区的偏移量
        // 计算完当前批改偏移量  要开启事务 相似数据库 connection -> conn.setAutoCommit(false) 各种操作  conn.commit(); conn.rollback()
        // 获取偏移量  如果要本人记录的话这个
        //val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // 解决数据
         iter.foreach(println)
      }
      //kafka 0.10 新个性  解决完数据后  将偏移量写回 kafka
      // some time later, after outputs have completed
      //kafka 有一个非凡的 topic  保留偏移量
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

更多 Flink,Kafka,Spark 等相干技术博文,科技资讯,欢送关注实时流式计算 公众号后盾回复“电子书”下载 300 页 Flink 实战电子书

退出移动版