关于spark:第四篇Spark-Streaming编程指南1

5次阅读

共计 17056 个字符,预计需要花费 43 分钟才能阅读完成。

Spark Streaming 是构建在 Spark Core 根底之上的流解决框架,是 Spark 十分重要的组成部分。Spark Streaming 于 2013 年 2 月在 Spark0.7.0 版本中引入,倒退至今曾经成为了在企业中宽泛应用的流解决平台。在 2016 年 7 月,Spark2.0 版本中引入了 Structured Streaming,并在 Spark2.2 版本中达到了生产级别,Structured Streaming 是构建在 Spark SQL 之上的流解决引擎,用户能够应用 DataSet/DataFreame API 进行流解决,目前 Structured Streaming 在不同的版本中倒退速度很快。值得注意的是,本文不会对 Structured Streaming 做过多解说,次要针对 Spark Streaming 进行探讨,包含以下内容:

  • Spark Streaming 介绍
  • Transformations 与 Output Operations
  • Spark Streaming 数据源(Sources)
  • Spark Streaming 数据汇(Sinks)

Spark Streaming 介绍

什么是 DStream

Spark Streaming 是构建在 Spark Core 的 RDD 根底之上的,与此同时 Spark Streaming 引入了一个新的概念:DStream(Discretized Stream,离散化数据流),示意连续不断的数据流。DStream 形象是 Spark Streaming 的流解决模型,在外部实现上,Spark Streaming 会对输出数据依照工夫距离(如 1 秒)分段,每一段数据转换为 Spark 中的 RDD,这些分段就是 Dstream,并且对 DStream 的操作都最终转变为对相应的 RDD 的操作。如下图所示:

如上图,这些底层的 RDD 转换操作是由 Spark 引擎来实现的,DStream 的操作屏蔽了许多底层的细节,为用户提供了比拟方便使用的高级 API。

计算模型

在 Flink 中,批处理是流解决的特例,所以 Flink 是人造的流解决引擎。而 Spark Streaming 则不然,Spark Streaming 认为流解决是批处理的特例,即 Spark Streaming 并不是纯实时的流解决引擎,在其外部应用的是 microBatch 模型,行将流解决看做是在较小工夫距离内 (batch interval) 的一些列的批处理。对于工夫距离的设定,须要联合具体的业务提早需要,能够实现秒级或者分钟级的距离。

Spark Streaming 会将每个短时间距离内接管的数据存储在集群中,而后对其作用一系列的算子操作(map,reduce, groupBy 等)。执行过程见下图:

如上图:Spark Streaming 会将输出的数据流宰割成一个个小的 batch,每一个 batch 都代表这一些列的 RDD,而后将这些 batch 存储在内存中。通过启动 Spark 作业来解决这些 batch 数据,从而实现一个流解决利用。

Spark Streaming 的工作机制

概览

  • 在 Spark Streaming 中,会有一个组件 Receiver,作为一个长期运行的 task 跑在一个 Executor 上
  • 每个 Receiver 都会负责一个 input DStream(比方从文件中读取数据的文件流,比方套接字流,或者从 Kafka 中读取的一个输出流等等)
  • Spark Streaming 通过 input DStream 与内部数据源进行连贯,读取相干数据

执行细节

  • 1. 启动 StreamingContext
  • 2.StreamingContext 启动 receiver,该 receiver 会始终运行在 Executor 的 task 中。用于连续不断地接收数据源,有两种次要的 reciver,一种是牢靠的 reciver,当数据被接管并且存储到 spark,发送回执确认,另一种是不牢靠的 reciver,对于数据源不发送回执确认。接管的数据会被缓存到 work 节点内存中,也会被复制到其余 executor 的所在的节点内存中,用于容错解决。
  • 3.Streaming context 周期触发 job(依据 batch-interval 工夫距离)进行数据处理。
  • 4. 将数据输入。

Spark Streaming 编程步骤

通过下面的剖析,对 Spark Streaming 有了初步的意识。那么该如何编写一个 Spark Streaming 应用程序呢?一个 Spark Streaming 个别包含一下几个步骤:

  • 1. 创立StreamingContext
  • 2. 创立输出 DStream 来定义输出源
  • 3. 通过对 DStream 利用转换操作和输入操作来定义解决逻辑
  • 4. 用 streamingContext.start()来开始接收数据和解决流程
  • 5.streamingContext.awaitTermination()办法来期待解决完结
  object StartSparkStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("Streaming")
      // 1. 创立 StreamingContext
      val ssc = new StreamingContext(conf, Seconds(5))
      Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
      Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
      // 2. 创立 DStream
      val lines = ssc.socketTextStream("localhost", 9999)
      // 3. 定义流计算解决逻辑
      val count = lines.flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      // 4. 输入后果
      count.print()
      // 5. 启动
      ssc.start()
      // 6. 期待执行
      ssc.awaitTermination()}
  }

Transformations 与 Output Operations

DStream 是不可变的,这意味着不能间接扭转它们的内容,而是通过对 DStream 进行一系列转换 (Transformation) 来实现预期的利用程序逻辑。每次转换都会创立一个新的 DStream,该 DStream 示意来自父 DStream 的转换后的数据。DStream 转换是惰性 (lazy) 的,这象征只有执行 output 操作之后,才会去执行转换操作,这些触发执行的操作称之为output operation

Transformations

Spark Streaming 提供了丰盛的 transformation 操作,这些 transformation 又分为了 有状态的 transformation 无状态的 transformation。除此之外,Spark Streaming 也提供了一些 window 操作,值得注意的是 window 操作也是有状态的。具体细节如下:

无状态的 transformation

无状态的 transformation 是指每一个 micro-batch 的解决是互相独立的,即以后的计算结果不受之前计算结果的影响,Spark Streaming 的大部分算子都是无状态的,比方常见的 map(),flatMap(),reduceByKey()等等。

  • map(func)

对源 DStream 的每个元素,采纳 func 函数进行转换,失去一个新的 Dstream

    /** Return a new DStream by applying a function to all elements of this DStream. */
    def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {new MappedDStream(this, context.sparkContext.clean(mapFunc))
    }
  • flatMap(func)

与 map 类似,然而每个输出项可用被映射为 0 个或者多个输入项

  /**
   * Return a new DStream by applying a function to all elements of this DStream,
   * and then flattening the results
   */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }
  • filter(func)

返回一个新的 DStream,仅蕴含源 DStream 中满足函数 func 的项

  /** Return a new DStream containing only the elements that satisfy a predicate. */
  def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {new FilteredDStream(this, context.sparkContext.clean(filterFunc))
  }
  • repartition(numPartitions)

通过创立更多或者更少的分区扭转 DStream 的并行水平

/**
   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
   * returned DStream has exactly numPartitions partitions.
   */
  def repartition(numPartitions: Int): DStream[T] = ssc.withScope {this.transform(_.repartition(numPartitions))
  }
  • reduce(func)

利用函数 func 汇集源 DStream 中每个 RDD 的元素,返回一个蕴含单元素 RDDs 的新 DStream

  /**
   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
   * of this DStream.
   */
  def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
  }
  • count()

统计源 DStream 中每个 RDD 的元素数量

/**
   * Return a new DStream in which each RDD has a single element generated by counting each RDD
   * of this DStream.
   */
  def count(): DStream[Long] = ssc.withScope {this.map(_ => (null, 1L))
        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
        .reduceByKey(_ + _)
        .map(_._2)
  }
  • union(otherStream)

返回一个新的 DStream,蕴含源 DStream 和其余 DStream 的元素

/**
   * Return a new DStream by unifying data of another DStream with this DStream.
   * @param that Another DStream having the same slideDuration as this DStream.
   */
  def union(that: DStream[T]): DStream[T] = ssc.withScope {new UnionDStream[T](Array(this, that))
  }
  • countByValue()

利用于元素类型为 K 的 DStream 上,返回一个(K,V)键值对类型的新 DStream,每个键的值是在原 DStream 的每个 RDD 中的呈现次数, 比方lines.flatMap(_.split(" ")).countByValue().print(), 对于输出:spark spark flink, 将输入:(spark,2),(flink,1), 即依照元素值进行分组,而后统计每个分组的元素个数。

从源码能够看出:底层实现为 map((_,1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions),即先按以后的元素映射为一个 tuple,其中 key 即为以后元素的值,而后再依照 key 做汇总。

/**
   * Return a new DStream in which each RDD contains the counts of each distinct value in
   * each RDD of this DStream. Hash partitioning is used to generate
   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
   * `numPartitions` not specified).
   */
  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
      : DStream[(T, Long)] = ssc.withScope {this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
  }
  • reduceByKey(func, [numTasks])

当在一个由 (K,V) 键值对组成的 DStream 上执行该操作时,返回一个新的由 (K,V) 键值对组成的 DStream,每一个 key 的值均由给定的 recuce 函数(func)汇集起来

比方:lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).print()

对于输出:spark spark flink,将输入:(spark,2),(flink,1)

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the associative and commutative reduce function. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {reduceByKey(reduceFunc, defaultPartitioner())
  }
  • join(otherStream, [numTasks])

当利用于两个 DStream(一个蕴含(K,V)键值对, 一个蕴含 (K,W) 键值对),返回一个蕴含 (K, (V, W)) 键值对的新 Dstream

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {join[W](other, defaultPartitioner())
  }
  • cogroup(otherStream, [numTasks])

当利用于两个 DStream(一个蕴含(K,V)键值对, 一个蕴含 (K,W) 键值对),返回一个蕴含 (K, Seq[V], Seq[W]) 的元组

// 输出:spark
// 输入:(spark,(CompactBuffer(1),CompactBuffer(1)))
val DS1 = lines.flatMap(_.split(" ")).map((_,1))
val DS2 = lines.flatMap(_.split(" ")).map((_,1))
DS1.cogroup(DS2).print()
  /**
   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number
   * of partitions.
   */
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {cogroup(other, defaultPartitioner())
  }
  • transform(func)

通过对源 DStream 的每个 RDD 利用 RDD-to-RDD 函数,创立一个新的 DStream。反对在新的 DStream 中做任何 RDD 操作

// 输出:spark spark flink
// 输入:(spark,2)、(flink,1)
val lines = ssc.socketTextStream("localhost", 9999)
val resultDStream = lines.transform(rdd => {rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
})
resultDStream.print()
  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {val cleanedF = context.sparkContext.clean(transformFunc, false)
    transform((r: RDD[T], _: Time) => cleanedF(r))
  }

有状态的 transformation

有状态的 transformation 是指每个 micro-batch 的解决不是互相独立的,即以后的 micro-batch 解决依赖于之前的 micro-batch 计算结果。常见的有状态的 transformation 次要有 countByValueAndWindow, reduceByKeyAndWindow , mapWithState, updateStateByKey 等等。其实所有的基于 window 的操作都是有状态的,因为追踪整个窗口内的数据。

对于有状态的 transformation 和 Window Operations,参见下文。

Output Operations

应用 Output operations 能够将 DStream 写入多内部存储设备或打印到控制台。上文提到,Spark Streaming 的 transformation 是 lazy 的,因而须要 Output Operation 进行触发计算,其性能相似于 RDD 的 action 操作。具体详见下文 Spark Streaming 数据汇(Sinks)。

Spark Streaming 数据源

Spark Streaming 的目标是成为一个通用的流解决框架,为了实现这一指标,Spark Streaming 应用 Receiver 来集成各种各样的数据源。然而,对于有些数据源 (如 kafka),Spark Streaming 反对应用Direct 的形式去接收数据,这种形式比 Receiver 形式性能要好。

基于 Receiver 的形式

Receiver 的作用是从数据源收集数据,而后将数据传送给 Spark Streaming。基本原理是:随着数据的一直到来,在绝对应的 batch interval 工夫距离内,这些数据会被收集并且打包成 block,只有等到 batch interval 工夫实现了,收集的数据 block 会被发送给 spark 进行解决。

如上图:当 Spark Streaming 启动时,receiver 开始收集数据。在 t0 的 batch interval 完结时 (即收集完了该时间段内的数据),收集到的 block #0 会被发送到 Spark 进行解决。在 t2 时刻,Spark 会解决 t1 的 batch interval 的数据 block,与此同时会不停地收集 t2 的 batch interval 对应的 block#2

常见的基于 Receiver 的数据源包含:Kafka, Kinesis, Flume,Twitter。除此之外,用户也能够通过继承 Receiver抽象类,实现 onStart()onStop()两个办法,进行自定义 Receiver。本文不会对基于 Receiver 的数据源做过多探讨,次要针对基于 Direct 的 Kafka 数据源进行具体解释。

基于 Direct 的形式

Spark 1.3 中引入了这种新的无 Receiver 的 Direct 办法,以确保更强的端到端保障。该办法不是应用 Receiver 来接收数据,而是定期查问 Kafka 每个 topic+partition 中的最新偏移量,并相应地定义要在每个批次中解决的偏移量范畴。启动用于解决数据的作业时,Kafka 的简略 consumer API 用于读取 Kafka 定义的偏移量范畴(相似于从文件系统读取文件)。请留神,此性能是在 Scala 和 Java API 的 Spark 1.3 引入的,在 Python API 的 Spark 1.4 中引入的。

基于 Direct 的形式具备以下长处:

  • 简化并行读取

如果要读取多个 partition,不须要创立多个输出 DStream 而后对他们进行 union 操作。Spark 会创立跟 Kafka partition 一样多的 RDD partition,并且会并行从 kafka 中读取数据。所以在 kafka partition 和 RDD partition 之间,有一一对应的关系。

  • 高性能

如果要保证数据零失落,在基于 Receiver 的形式中,须要开启 WAL 机制。这种形式其实效率很低,因为数据理论被复制了两份,kafka 本人自身就有高牢靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 Direct 的形式,不依赖于 Receiver,不须要开启 WAL 机制,只有 kafka 中做了数据的复制,那么就能够通过 kafka 的正本进行复原。

  • Exactly-once 语义

基于 Receiver 的形式,应用 kafka 的高阶 API 来在 Zookeeper 中保留生产过的 offset。这是生产 kafka 数据的传统形式。这种形式配合 WAL 机制,能够保证数据零失落的高可靠性,然而却无奈保障 Exactly-once 语义(Spark 和 Zookeeper 之间可能是不同步的)。基于 Direct 的形式,应用 kafka 的简略 API,Spark Streaming 本人就负责追踪生产的 offset,并保留在 checkpoint 中。Spark 本人肯定是同步的,因而能够保证数据时生产一次且仅生产一次。

Spark Streaming 集成 kafka

应用形式

应用 KafkaUtils 增加 Kafka 数据源,源码如下:

  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]
    ): InputDStream[ConsumerRecord[K, V]] = {val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
    createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
  }

具体参数解释:

  • K:Kafka 音讯 key 的类型
  • V:Kafka 音讯 value 的类型
  • ssc:StreamingContext
  • locationStrategy: LocationStrategy,依据 Executor 中的主题的分区来调度 consumer,即尽可能地让 consumer 凑近 leader partition。该配置能够晋升性能,但对于 location 的抉择只是一种参考,并不是相对的。能够抉择如下形式:

    • PreferBrokers:Spark 和 Kafka 运行在同一个节点上,能够应用此种形式
    • PreferConsistent:大部分状况应用此形式,它将统一地在所有 Executor 之间调配分区
    • PreferFixed:将特定的主题分区搁置到特定的主机上,在数据负载不平衡时应用

留神:少数状况下应用 PreferConsisten,其余两种形式只是在特定的场景应用。这种配置只是一种参考,具体的状况还是会依据集群的资源主动调整。

  • consumerStrategy:生产策略,次要有上面三种形式:

    • Subscribe:订阅指定主题名称的主题汇合
    • SubscribePattern:通过正则匹配,订阅相匹配的主题数据
    • Assign:订阅一个主题 + 分区的汇合

留神:大多数状况下应用 Subscribe 形式。

应用案例

object TolerateWCTest {def createContext(checkpointDirectory: String): StreamingContext = {val sparkConf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")
      // 每秒钟从 kafka 分区中读取的 records 数量, 默认 not set
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") //
      //Driver 为了获取每个 leader 分区的最近 offsets,间断进行重试的次数,// 默认是 1,示意最多重试 2 次,仅仅实用于 new Kafka direct stream API
      .set("spark.streaming.kafka.maxRetries", "2")
      .setAppName("TolerateWCTest")

    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint(checkpointDirectory)
    val topic = Array("testkafkasource2")
    val kafkaParam = Map[String, Object](
      "bootstrap.servers" -> "kms-1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group0",
      "auto.offset.reset" -> "latest", // 默认 latest,"enable.auto.commit" -> (false: java.lang.Boolean)) // 默认 true,false: 手动提交

    val lines = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParam))

    val words = lines.flatMap(_.value().split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.reduceByKey(_ + _)

    stateDstream.cache()
    // 参照 batch interval 设置,// 不得低于 batch interval,否则会报错,// 设为 batch interval 的 2 倍
    stateDstream.checkpoint(Seconds(6))

    // 把 DStream 保留到 MySQL 数据库中
    stateDstream.foreachRDD(rdd =>
      rdd.foreachPartition { record =>
        var conn: Connection = null
        var stmt: PreparedStatement = null
        // 给每个 partition,获取一个连贯
        conn = ConnectionPool.getConnection
        // 遍历 partition 中的数据,应用一个连贯,插入数据库

        while (record.hasNext) {val wordcounts = record.next()
          val sql = "insert into wctbl(word,count) values (?,?)"
          stmt = conn.prepareStatement(sql);
          stmt.setString(1, wordcounts._1.trim)
          stmt.setInt(2, wordcounts._2.toInt)
          stmt.executeUpdate()}
        // 用完当前,将连贯还回去
        ConnectionPool.returnConnection(conn)
      })
    ssc
  }

  def main(args: Array[String]) {

    val checkpointDirectory = "hdfs://kms-1:8020/docheckpoint"

    val ssc = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => createContext(checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()}
}

Spark Streaming 数据汇(Sinks)

Output Operation 介绍

Spark Streaming 提供了上面内置的 Output Operation,如下:

  • print()

打印数据数据到规范输入,如果不传递参数,默认打印前 10 个元素

  • saveAsTextFiles(prefix, [suffix])

将 DStream 内容存储到文件系统,每个 batch interval 的文件名称为 `prefix-TIME_IN_MS[.suffix]

  • saveAsObjectFiles(prefix, [suffix])

将 DStream 的内容保留为序列化的 java 对象的 SequenceFile,每个 batch interval 的文件名称为prefix-TIME_IN_MS[.suffix],Python API 不反对此办法。

  • saveAsHadoopFiles(prefix, [suffix])

将 DStream 内容保留为 Hadoop 文件,每个 batch interval 的文件名称为prefix-TIME_IN_MS[.suffix],Python API 不反对此办法。

  • foreachRDD(func)

通用的数据输入算子,func 函数将每个 RDD 的数据输入到内部存储设备,比方将 RDD 写入到文件或者数据库。

 /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {val cleanedF = context.sparkContext.clean(foreachFunc, false)
    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    foreachRDD(foreachFunc, displayInnerRDDOps = true)
  }

  private def foreachRDD(foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}

foreachRDD是一个十分重要的操作,用户能够应用它将解决的数据输入到内部存储设备。对于 foreachRDD 的应用,须要特点别留神一些细节问题。具体分析如下:

如果将数据写入到 MySQL,须要获取连贯 Connection。用户可能不经意的在 Spark Driver 中创立一个连贯对象,而后在 Work 中应用它将数据写入外部设备,代码如下:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // ①留神:该段代码在 driver 上执行
  rdd.foreach { record =>
    connection.send(record) // ②留神:该段代码在 worker 上执行
  }
}

尖叫提醒:下面的应用形式是谬误的,因为须要将 connection 对象进行序列化,而后发送到 driver 节点,而这种 connection 对象是不能被序列化,所以不能跨节点传输。下面代码会报序列化谬误,正确的应用形式是在 worker 节点创立 connection,即在 rdd.foreach 外部创立 connection。形式如下:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()}
}

下面的形式解决了不能序列化的问题,然而会为每个 RDD 的 record 创立一个 connection,通常创立一个 connection 对象是会存在肯定性能开销的,所以频繁创立和销毁 connection 对象会造成整体的吞吐量升高。一个比拟好的做法是将 rdd.foreach 替换为`rdd.foreachPartition `, 这样就不必频繁为每个 record 创立 connection,而是为 RDD 的 partition 创立 connection,大大减少了创立 connection 带来的开销。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()}
}

其实下面的应用形式还能够进一步优化,能够通过在多个 RDD 或者批数据间重用连贯对象。用户能够保护一个动态的连贯对象池,重复使用池中的对象将多批次的 RDD 推送到内部零碎,以进一步节俭开销:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  
  }
}

应用案例

  • 模仿数据库连接池
/**
 * 简易版的连接池
 */
public class ConnectionPool {

    // 动态的 Connection 队列
    private static LinkedList<Connection> connectionQueue;

    /**
     * 加载驱动
     */
    static {
        try {Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {e.printStackTrace();
        }
    }

    /**
     * 获取连贯,多线程拜访并发管制
     *
     * @return
     */
    public synchronized static Connection getConnection() {
        try {if (connectionQueue == null) {connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 10; i++) {
                    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/wordcount", "root",
                            "123qwe");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {e.printStackTrace();
        }
        return connectionQueue.poll();}

    /**
     * 用完之后,返回一个连贯
     */
    public static void returnConnection(Connection conn) {connectionQueue.push(conn);
    }

}
  • 实时统计写入 MySQL
object WordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // 存储到 MySQL
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        var conn: Connection = null
        var stmt: PreparedStatement = null
        // 给每个 partition,获取一个连贯
        conn = ConnectionPool.getConnection
        // 遍历 partition 中的数据,应用一个连贯,插入数据库
        while (partition.hasNext) {val wordcounts = partition.next()
          val sql = "insert into wctbl(word,count) values (?,?)"
          stmt = conn.prepareStatement(sql);
          stmt.setString(1, wordcounts._1.trim)
          stmt.setInt(2, wordcounts._2.toInt)
          stmt.executeUpdate()}
        // 用完当前,将连贯还回去
        ConnectionPool.returnConnection(conn)
      }
    }
    ssc.start()
    ssc.awaitTermination()}
}

总结

因为篇幅限度,本文次要对 Spark Streaming 执行机制、Transformations 与 Output Operations、Spark Streaming 数据源 (Sources)、Spark Streaming 数据汇(Sinks) 进行了探讨。下一篇将分享 基于工夫的窗口操作 有状态的计算 检查点 Checkpoint性能调优 等内容。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

正文完
 0