第四篇 |Spark-Streaming 编程指南 (1) 对 Spark Streaming 执行机制、Transformations 与 Output Operations、Spark Streaming 数据源 (Sources)、Spark Streaming 数据汇(Sinks) 进行了探讨。本文将连续上篇内容,次要包含以下内容:
- 有状态的计算
- 基于工夫的窗口操作
- 长久化
- 检查点 Checkpoint
- 应用 DataFrames & SQL 解决流数据
有状态的计算
updateStateByKey
上一篇文章中介绍了常见的无状态的转换操作,比方在 WordCount 的例子中,输入的后果只与以后 batch interval 的数据无关,不会依赖于上一个 batch interval 的计算结果。spark Streaming 也提供了有状态的操作:updateStateByKey
,该算子会保护一个状态,同时进行信息更新。该操作会读取上一个 batch interval 的计算结果,而后将其后果作用到以后的 batch interval 数据统计中。其源码如下:
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {updateStateByKey(updateFunc, defaultPartitioner())
}
该算子只能在 key–value 对的 DStream 上应用,须要接管一个状态更新函数 updateFunc 作为参数。应用案例如下:
object StateWordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(StateWordCount.getClass.getSimpleName)
val ssc = new StreamingContext(conf, Seconds(5))
// 必须开启 checkpoint, 否则会报错
ssc.checkpoint("file:///e:/checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
// 状态更新函数
def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {var oldvalue = stateValue.getOrElse(0) // 获取状态值
// 遍历以后数据,并更新状态
for (newValue <- newValues) {oldvalue += newValue}
// 返回最新的状态
Option(oldvalue)
}
val count = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.updateStateByKey(updateFunc)
count.print()
ssc.start()
ssc.awaitTermination()}
}
尖叫提醒:下面的代码必须要开启 checkpoint,否则会报错:
Exception in thread “main” java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()
updateStateByKey 毛病
运行下面的代码会发现一个景象:即使没有数据源输出,Spark 也会为新的 batch interval 更新状态,即如果没有数据源输出,则会一直地输入之前的计算状态后果。
updateStateByKey 能够在指定的批次距离内返回之前的全副历史数据,包含新增的,扭转的和没有扭转的。因为 updateStateByKey 在应用的时候肯定要做 checkpoint,当数据量过大的时候,checkpoint 会占据宏大的数据量,会影响性能,效率不高。
mapwithState
mapwithState 是 Spark 提供的另外一个有状态的算子,该操作克服了 updateStateByKey 的毛病,从 Spark 1.5 开始引入。源码如下:
def mapWithState[StateType: ClassTag, MappedType: ClassTag](spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}
mapWithState 只返回发生变化的 key 的值,对于没有发生变化的 Key,则不返回。这样做能够只关怀那些曾经产生的变动的 key,对于没有数据输出,则不会返回那些没有变动的 key 的数据。这样的话,即便数据量很大,checkpint 也不会 updateBykey 那样,占用太多的存储,效率比拟高(生产环境中倡议应用)。
object StatefulNetworkWordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf()
.setAppName("StatefulNetworkWordCount")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("file:///e:/checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
/**
* word:以后 key 的值
* one:以后 key 对应的 value 值
* state:状态值
*/
val mappingFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
println(s">>> batchTime = $batchTime")
println(s">>> word = $word")
println(s">>> one = $one")
println(s">>> state = $state")
val output = (word, sum)
state.update(sum) // 更新以后 key 的状态值
Some(output) // 返回后果
}
// 通过 StateSpec.function 构建 StateSpec
val spec = StateSpec.function(mappingFunc)
val stateDstream = wordDstream.mapWithState(spec)
stateDstream.print()
ssc.start()
ssc.awaitTermination()}
}
基于工夫的窗口操作
Spark Streaming 提供了两种类型的窗口操作,别离是滚动窗口和滑动窗口。具体分析如下:
滚动窗口(Tumbling Windows)
滚动窗口的示意图如下:滚动窗口只须要传入一个固定的工夫距离,滚动窗口是不存在重叠的。
源码如下:
/**
* @param windowDuration: 窗口的长度; 必须是 batch interval 的整数倍.
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
滑动窗口(Sliding Windows)
滑动窗口的示意图如下:滑动窗口只须要传入两个参数,一个为窗口的长度,一个是滑动工夫距离。能够看出:滑动窗口是存在重叠的。
源码如下:
/**
* @param windowDuration 窗口长度; 必须是 batching interval 的整数倍
*
* @param slideDuration 滑动距离; 必须是 batching interval 的整数倍
*/
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {new WindowedDStream(this, windowDuration, slideDuration)
}
窗口操作
-
window(windowLength, slideInterval)
-
解释
基于源 DStream 产生的窗口化的批数据,计算失去一个新的 Dstream
-
源码
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {new WindowedDStream(this, windowDuration, slideDuration) }
-
-
countByWindow(windowLength, slideInterval)
- 解释
返回一个滑动窗口的元素个数
-
源码
/** * @param windowDuration window 长度,必须是 batch interval 的倍数 * @param slideDuration 滑动的工夫距离,必须是 batch interval 的倍数
*/
def countByWindow(
windowDuration: Duration,
slideDuration: Duration): DStream[Long] = ssc.withScope {this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
```
-
reduceByWindow(func, windowLength, slideInterval)
- 解释
返回一个单元素流。利用函数 func 汇集滑动工夫距离的流的元素创立这个单元素流。函数 func 必须满足结合律,从而能够反对并行计算
-
源码
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration ): DStream[T] = ssc.withScope {this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) }
-
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
- 解释
利用到一个 (K,V) 键值对组成的 DStream 上时,会返回一个由 (K,V) 键值对组成的新的 DStream。每一个 key 的值均由给定的 reduce 函数 (func 函数) 进行聚合计算。留神:在默认状况下,这个算子利用了 Spark 默认的并发工作数去分组。能够通过 numTasks 参数的设置来指定不同的工作数
-
源码
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration ): DStream[(K, V)] = ssc.withScope {reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) }
-
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
- 解释
更加高效的 reduceByKeyAndWindow,每个窗口的 reduce 值,是基于先前窗口的 reduce 值进行增量计算失去的;它会对进入滑动窗口的新数据进行 reduce 操作,并对来到窗口的老数据进行
逆向 reduce
操作。然而,只能用于可逆 reduce 函数
,即那些 reduce 函数都有一个对应的逆向 reduce 函数
(以 InvFunc 参数传入)留神:必须开启 checkpointing-
源码
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner, filterFunc: ((K, V)) => Boolean ): DStream[(K, V)] = ssc.withScope {val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None new ReducedWindowedDStream[K, V]( self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, windowDuration, slideDuration, partitioner ) }
-
countByValueAndWindow(windowLength, slideInterval, [numTasks])
-
解释
当利用到一个 (K,V) 键值对组成的 DStream 上,返回一个由 (K,V) 键值对组成的新的 DStream。每个 key 的对应的 value 值都是它们在滑动窗口中呈现的频率
-
源码
def countByValueAndWindow( windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism) (implicit ord: Ordering[T] = null) : DStream[(T, Long)] = ssc.withScope {this.map((_, 1L)).reduceByKeyAndWindow((x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, windowDuration, slideDuration, numPartitions, (x: (T, Long)) => x._2 != 0L ) }
-
应用案例
val lines = ssc.socketTextStream("localhost", 9999)
val count = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.reduceByKeyAndWindow((w1: Int, w2: Int) => w1 + w2, Seconds(30), Seconds(10))
.print()
// 滚动窗口
/* lines.window(Seconds(20))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()*/
长久化
长久化是晋升 Spark 利用性能的一种形式,在第二篇 |Spark core 编程指南一文中解说了 RDD 长久化的应用形式。其实,DStream 也是反对长久化的,同样是应用 persist()与 cache()办法,长久化通常在有状态的算子中应用,比方窗口操作,默认状况下,尽管没有显性地调用长久化办法,然而底层曾经帮用户做了长久化操作,通过上面的源码能够看出。
private[streaming]
class WindowedDStream[T: ClassTag](parent: DStream[T],
_windowDuration: Duration,
_slideDuration: Duration)
extends DStream[T](parent.ssc) {
// 省略代码...
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
}
留神:与 RDD 的长久化不同,DStream 的默认持久性级别将数据序列化在内存中,通过上面的源码能够看出:
/** 给定一个持打算级别 */
def persist(level: StorageLevel): DStream[T] = {if (this.isInitialized) {
throw new UnsupportedOperationException("Cannot change storage level of a DStream after streaming context has started")
}
this.storageLevel = level
this
}
/** 默认的长久化级别为(MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
def cache(): DStream[T] = persist()
从下面的源码能够看出 persist()与 cache()的次要区别是:
- cache()办法底层调用的是 persist()办法
-
persist()办法有两个重载的办法
- 无参数的 persist(),默认是内存
- perisist(level: StorageLevel), 能够抉择与 RDD 长久化雷同的长久化级别
检查点 Checkpoint
简介
流应用程序通常是 24/ 7 运行的,因而必须对与利用程序逻辑无关的故障(例如系统故障,JVM 解体等)具备弹性的容错能力。为此,Spark Streaming 须要将足够的信息 checkpoint
到容错存储系统(比方 HDFS),以便能够从故障中复原。检查点包含两种类型:
-
元数据检查点
元数据检查点能够保障从 Driver 程序失败中复原。即如果运行 drive 的节点失败时,能够查看最近的 checkpoin 数据获取最新的状态。典型的应用程序元数据包含:
- 配置:用于创立流应用程序的配置。
- DStream 操作:定义流应用程序的 DStream 操作。
- 未实现的 batch:以后运行 batch 对应的 job 在队列中排队,还没有计算到该 batch 的数据。
- 数据检查点
将生成的 RDD 保留到牢靠的存储中。在某些 有状态 转换中,须要合并多个批次中的数据,所以须要开启检查点。在此类转换中,生成的 RDD 依赖于先前批次的 RDD,这导致依赖链的长度随工夫一直减少。为了防止复原工夫无限度的减少(与依赖链成比例),有状态转换的两头 RDD 定期 checkpoint到牢靠的存储(例如 HDFS),以切断依赖链,性能相似于长久化,只须要从以后的状态复原,而不须要从新计算整个 lineage。
总而言之,从 Driver 程序故障中复原时,次要须要元数据检查点。而如果应用有状态转换,则须要数据或 RDD 检查点。
什么时候启用检查点
必须为具备以下类型的应用程序启用检查点:
- 应用了有状态转换转换操作
如果在应用程序中应用
updateStateByKey
或reduceByKeyAndWindow
,则必须提供检查点目录以容许定期进行 RDD 检查点。 - 从运行应用程序的 Driver 程序故障中复原
元数据检查点用于复原进度信息。
留神,没有前述状态转换的简略流应用程序能够在不启用检查点的状况下运行。在这种状况下,从驱动程序故障中复原也将是局部的(某些失落但未解决的数据可能会失落)。这通常是能够承受的,并且许多都以这种形式运行 Spark Streaming 应用程序。预计未来会改善对非 Hadoop 环境的反对。
如何配置检查点
能够通过具备容错的、牢靠的文件系统(例如 HDFS,S3 等)中设置目录来启用检查点,将检查点信息保留到该目录中。开启检查点,须要开启上面的两个配置:
- streamingContext.checkpoint(<dir>):配置检查点的目录,比方 HDFS 门路
- dstream.checkpoint(<duration>):检查点的频率
其中配置检查点的工夫距离是可选的。如果不设置,会依据 DStream 的类型抉择一个默认值。对于 MapWithStateDStream,默认的检查点距离是 batch interval 的 10 倍。对于其余的 DStream,默认的检查点距离是 10S,或者是 batch interval 的间隔时间。须要留神的是:checkpoint 的频率必须是 batch interval 的整数倍,否则会报错。
此外,如果要使应用程序从 Driver 程序故障中复原,则须要应用上面的形式创立 StreamingContext:
def createStreamingContext (conf: SparkConf,checkpointPath: String):
StreamingContext = {val ssc = new StreamingContext( <ConfInfo>)
// .... other code ...
ssc.checkPoint(checkpointDirectory)
ssc
}
#创立一个新的 StreamingContext 或者从最近的 checkpoint 获取
val context = StreamingContext.getOrCreate(checkpointDirectory,
createStreamingContext _)
#启动
context.start()
context.awaitTermination()
- 程序首次启动时,它将创立一个新的 StreamingContext,而后调用 start()。
- 失败后重新启动程序时,它将依据检查点目录中的检查点数据从新创立 StreamingContext。
留神:
RDD 的检查点须要将数据保留到牢靠存储上,由此带来一些老本开销。这可能会导致 RDD 取得检查点的那些批次的解决工夫减少。因而,须要设置一个正当的检查点的距离。在 batch interval 较小时 (例如 1 秒),每个 batch interval 都进行检查点可能会大大降低吞吐量。相同,检查点工夫距离太长会导致 lineage 和工作规模减少,这可能会产生不利影响。对于须要 RDD 检查点的有状态转换,默认距离为 batch interval 的倍数,至多应为 10 秒。能够应用 dstream.checkpoint(checkpointInterval) 进行配置。通常,DStream 的 5 -10 个 batch interval 的检查点距离是一个较好的抉择。
检查点和长久化之间的区别
-
长久化
- 当咱们将 RDD 放弃在 DISK_ONLY 存储级别时,RDD 将存储在一个地位,该 RDD 的后续应用将不会从新计算 lineage。
- 在调用 persist()之后,Spark 会记住 RDD 的 lineage,即便它没有调用它。
- 作业运行实现后,将革除缓存并销毁文件。
-
检查点
- 检查点将 RDD 存储在 HDFS 中,将会删除 lineage 血缘关系。
- 在实现作业运行后,与持打算不同,不会删除检查点文件。
- 当 checkpoint 一个 RDD 时,将导致双重计算。即该操作在实现理论的计算工作之前,首先会调用长久化办法,而后再将其写入检查点目录。
应用 DataFrames & SQL 解决流数据
在 Spark Streaming 利用中,能够轻松地对流数据应用 DataFrames 和 SQL 操作。应用案例如下:
object SqlStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf()
.setAppName(SqlStreaming.getClass.getSimpleName)
.setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.foreachRDD { rdd =>
// 调用 SparkSession 单例办法, 如果曾经创立了,则间接返回
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
val wordsDataFrame = rdd.toDF("word")
wordsDataFrame.show()
wordsDataFrame.createOrReplaceTempView("words")
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()}
ssc.start()
ssc.awaitTermination()}
}
/** SparkSession 单例 */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()}
instance
}
}
总结
本文是 Spark Streaming 编程指南的第二篇分享,次要包含有状态的计算、基于工夫的窗口操作、检查点等内容。下一篇将分享Spark MLLib 机器学习。
关注公众号 大数据技术与数仓,及时理解最新动静