尽管 SparkStreaming 曾经进行更新,Spark 的重点也放到了 Structured Streaming,但因为 Spark 版本过低或者其余技术选型问题,可能还是会抉择 SparkStreaming。
SparkStreaming 对于工夫窗口,事件工夫尽管撑持较少,但还是能够满足局部的实时计算场景的,SparkStreaming 材料较多,这里也做一个简略介绍。
一. 什么是 Spark Streaming
Spark Streaming 在过后是为了与过后的 Apache Storm 竞争,也让 Spark 能够用于流式数据的解决。依据其官网文档介绍,Spark Streaming 有高吞吐量和容错能力强等特点。Spark Streaming 反对的数据输出源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简略的 TCP 套接字等等。数据输出后能够用 Spark 的高度形象原语如:map、reduce、join、window 等进行运算。而后果也能保留在很多中央,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完满交融。
当然 Storm 目前曾经慢慢淡出,Flink 开始大放异彩。
Spark 与 Storm 的比照
二、SparkStreaming 入门
Spark Streaming 是 Spark Core API 的扩大,它反对弹性的,高吞吐的,容错的实时数据流的解决。数据能够通过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也能够通过例如 map
,reduce
,join
,window
等的高级函数组成的简单算法解决。最终,解决后的数据能够输入到文件系统,数据库以及实时仪表盘中。事实上,你还能够在 data streams(数据流)上应用 [机器学习] 以及 [图计算] 算法。
在外部,它工作原理如下,Spark Streaming 接管实时输出数据流并将数据切分成多个 batch(批)数据,而后由 Spark 引擎解决它们以生成最终的 stream of results in batches(分批流后果)。
Spark Streaming 提供了一个名为 discretized stream 或 DStream 的高级形象,它代表一个间断的数据流。DStream 能够从数据源的输出数据流创立,例如 Kafka,Flume 以及 Kinesis,或者在其余 DStream 上进行高层次的操作以创立。在外部,一个 DStream 是通过一系列的 [RDDs] 来示意。
本指南通知你如何应用 DStream 来编写一个 Spark Streaming 程序。你能够应用 Scala,Java 或者 Python(Spark 1.2 版本后引进)来编写 Spark Streaming 程序。
在 idea 中新建 maven 我的项目
引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.4</version>
</dependency>
Project Structure —— Global Libraries —— 把 scala 增加到 add module
新建 Scala Class
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Demo {
// 屏蔽日志
Logger.getLogger("org.apache")setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
//local 会有问题 起码两个线程 一个拿数据 一个计算
//val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
// 工夫距离
val ssc = new StreamingContext(conf,Seconds(1))
// 接收数据 解决
//socket demo
val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = value.flatMap(_.split(" "))
val wordsTuple: DStream[(String, Int)] = words.map((_, 1))
val wordcount: DStream[(String, Int)] = wordsTuple.reduceByKey(_ + _)
// 触发 action
wordcount.print()
ssc.start()
// 放弃流的运行 期待程序被终止
ssc.awaitTermination()}
}
测试
下载一个 win10 用的 netcat
https://eternallybored.org/mi…
下载 netcat 1.12
解压 在目录下启动 cmd
输出
nc -L -p 9999
开始输出单词 在 idea 中验证接管
原理
初始化 StreamingContext
为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必须要被创立进去,它是所有的 Spark Streaming 性能的主入口点。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
参数是展现在集群 UI 界面上的应用程序的名称
master
是 local 或者 spark 集群的 url(mesos yarn)
本地测试能够用local[*] 留神要多于两个线程
Second(1)定义的是 batch interval 批处理距离 就是距离多久去拿一次数据
在定义一个 context 之后, 您必须执行以下操作。
- 通过创立输出 DStreams 来定义输出源。
- 通过利用转换和输入操作 DStreams 定义流计算(streaming computations)。
- 开始接管输出并且应用
streamingContext.start()
来解决数据。 - 应用
streamingContext.awaitTermination()
期待解决被终止(手动或者因为任何谬误)。 - 应用
streamingContext.stop()
来手动的进行解决。
须要记住的几点:
- 一旦一个 context 曾经启动,将不会有新的数据流的计算能够被创立或者增加到它。
- 一旦一个 context 曾经进行,它不会被重新启动。
- 同一时间外在 JVM 中只有一个 StreamingContext 能够被激活。
- 在 StreamingContext 上的 stop() 同样也进行了 SparkContext。为了只进行 StreamingContext,设置
stop()
的可选参数,名叫stopSparkContext
为 false。 - 一个 SparkContext 就能够被重用以创立多个 StreamingContexts,只有前一个 StreamingContext 在下一个 StreamingContext 被创立之前进行(不进行 SparkContext)。
Discretized Stream or DStream
Discretized Stream or DStream 是 Spark Streaming 提供的根本形象。它代表了一个间断的数据流。可能是数据源接管的流,也可能是转换后的流。
DStream 就是多个和工夫相干的一系列间断 RDD 的汇合,比方本例就是距离一秒的一堆 RDD 的汇合
DStream 也是有依赖关系的
flatMap 操作也是间接作用在 DStream 上的,就和作用于 RDD 一样 这样很好了解
咱们先来看数据源接管的流 这种叫做 Input DStreams 他会通过 Receivers 接收器去不同的数据源接收数据。
Spark Streaming 内置了两种数据源:
- 根底的数据源:比方方才用的 socket 接管 还有 file systems
- 高级的数据源:比方 kafka 还有 flume kinesis 等等
留神本地运行时,不要用 local 或者 local[1], 一个线程不够。放到集群上时调配给 SparkStreaming 的核数必须大于接收器的数量,留一个核去解决数据。
咱们也能够自定义数据源,那咱们就须要本人开发一个接收器。
Transformations
在咱们接管到 Dstreams 之后能够进行转换操作,常见转换如下:
Transformation(转换) | Meaning(含意) |
---|---|
map(func) | 利用函数 func 解决原 DStream 的每个元素,返回一个新的 DStream。 |
flatMap(func) | 与 map 类似,然而每个输出项可用被映射为 0 个或者多个输入项。。 |
filter(func) | 返回一个新的 DStream,它仅仅蕴含原 DStream 中函数 func 返回值为 true 的项。 |
repartition(numPartitions) | 通过创立更多或者更少的 partition 以扭转这个 DStream 的并行级别(level of parallelism)。 |
union(otherStream) | 返回一个新的 DStream,它蕴含源 DStream 和 otherDStream 的所有元素。 |
count() | 通过 count 源 DStream 中每个 RDD 的元素数量,返回一个蕴含单元素(single-element)RDDs 的新 DStream。 |
reduce(func) | 利用函数 func 汇集源 DStream 中每个 RDD 的元素,返回一个蕴含单元素(single-element)RDDs 的新 DStream。函数应该是相关联的,以使计算能够并行化。 |
countByValue() | 在元素类型为 K 的 DStream 上,返回一个(K,long)pair 的新的 DStream,每个 key 的值是在原 DStream 的每个 RDD 中的次数。 |
reduceByKey(func, [_numTasks_]) | 当在一个由 (K,V) pairs 组成的 DStream 上调用这个算子时,返回一个新的,由 (K,V) pairs 组成的 DStream,每一个 key 的值均由给定的 reduce 函数聚合起来。留神:在默认状况下,这个算子利用了 Spark 默认的并发工作数去分组。你能够用 numTasks 参数设置不同的工作数。 |
join(otherStream, [_numTasks_]) | 当利用于两个 DStream(一个蕴含(K,V)对,一个蕴含 (K,W) 对),返回一个蕴含 (K, (V, W)) 对的新 DStream。 |
cogroup(otherStream, [_numTasks_]) | 当利用于两个 DStream(一个蕴含(K,V)对,一个蕴含 (K,W) 对),返回一个蕴含 (K, Seq[V], Seq[W]) 的 tuples(元组)。 |
transform(func) | 通过对源 DStream 的每个 RDD 利用 RDD-to-RDD 函数,创立一个新的 DStream。这个能够在 DStream 中的任何 RDD 操作中应用。 |
updateStateByKey(func) | 返回一个新的 “ 状态 ” 的 DStream,其中每个 key 的状态通过在 key 的先前状态利用给定的函数和 key 的新 valyes 来更新。这能够用于保护每个 key 的任意状态数据。 |
这里咱们特地介绍一下 updateStateByKey
咱们如果须要对历史数据进行统计,可能须要去 kafka 里拿一下之前留存的数据,也能够用 updateStateByKey 这个办法。
// 保留状态 聚合雷同的单词
val wordcount = wordsTuple.updateStateByKey[Int](
//updateFunction _
(newValues: Seq[Int], runningCount: Option[Int])=> {val newCount = Some(newValues.sum + runningCount.getOrElse(0))
newCount
}
)
比方方才的单词计数,咱们只能统计每一次发过来的音讯,然而如果心愿统计屡次音讯就须要用到这个,咱们要指定一个 checkpoint,就是从哪开始算。
// 减少成员变量
val checkpointDir = "./ckp"
// 在办法中退出 checkpoint
ssc.checkpoint(checkpointDir)
val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
value.checkpoint(Seconds(4))// 官网倡议批次工夫的 1 - 5 倍
这时候咱们建设 StreamingContext 的办法就要扭转了 咱们把方才的创立过程提取成办法。
def creatingFunc():StreamingContext = {val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint(checkpointDir)
val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
value.checkpoint(Seconds(4))// 官网倡议批次工夫的 1 - 5 倍
val words: DStream[String] = value.flatMap(_.split(" "))
val wordsTuple: DStream[(String, Int)] = words.map((_, 1))
// 保留状态 聚合雷同的单词
val wordcount = wordsTuple.updateStateByKey[Int](
//updateFunction _
(newValues: Seq[Int], runningCount: Option[Int])=> {val newCount = Some(newValues.sum + runningCount.getOrElse(0))
newCount
}
)
// 触发 action
wordcount.print()
ssc
}
在 mian 函数中批改为:
def main(args: Array[String]): Unit = {val ssc = StreamingContext.getOrCreate(checkpointDir,creatingFunc _)
ssc.start()
// 放弃流的运行 期待程序被终止
ssc.awaitTermination()}
这样就是,如果有 checkpoint,程序会在 checkpoint 中把程序加载回来(程序被保留为二进制),没有 checkpoint 的话才会创立。
将目录下的 checkpoint 删除,就能够将状态删除。
生产中 updateStateByKey 因为会将数据备份要谨慎应用,能够思考用 hbase,redis 等做代替。或者借助 kafka 做聚合解决。
// 如果不必 updatestateByKey 能够思考 redis
wordsTuple.foreachRDD(rdd => {
rdd.foreachPartition(i =>
{//redis}
)
})
窗口操作
Spark Streaming 也反对 _windowed computations(窗口计算),它容许你在数据的一个滑动窗口上利用 transformation(转换)。
如上图显示,窗口在源 DStream 上 _slides(滑动),任何一个窗口操作都须要指定两个参数:
- window length(窗口长度) – 窗口的持续时间。
- sliding interval(滑动距离) – 执行窗口操作的距离。
比方计算过来 30 秒的词频:
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些罕用的窗口操作如下所示,这些操作都须要用到上文提到的两个参数 – windowLength(窗口长度) 和 _slideInterval(滑动的工夫距离)_。
Transformation(转换) | Meaning(含意) |
---|---|
window(windowLength, slideInterval) | 返回一个新的 DStream,它是基于 source DStream 的窗口 batch 进行计算的。 |
countByWindow(windowLength, slideInterval) | 返回 stream(流)中滑动窗口元素的数 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素 stream(流),它通过在一个滑动距离的 stream 中应用 func 来聚合以创立。该函数应该是 associative(关联的)且 commutative(可替换的),以便它能够并行计算 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [_numTasks_]) | 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, V) pairs 的 Stream,其中的每个 key 的 values 是在滑动窗口上的 batch 应用给定的函数 func 来聚合产生的。Note(留神): 默认状况下,该操作应用 Spark 的默认并行任务数量(local model 是 2,在 cluster mode 中的数量通过 spark.default.parallelism 来确定)来做 grouping。您能够通过一个可选的 numTasks 参数来设置一个不同的 tasks(工作)数量。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [_numTasks_]) | 上述 reduceByKeyAndWindow() 的更无效的一个版本,其中应用前一窗口的 reduce 值逐步计算每个窗口的 reduce 值。这是通过缩小进入滑动窗口的新数据,以及“inverse reducing(逆减)”来到窗口的旧数据来实现的。一个例子是当窗口滑动时”增加”和“减”keys 的数量。然而,它仅实用于“invertible reduce functions(可逆缩小函数)”,即具备相应“inverse reduce(反向缩小)”函数的 reduce 函数(作为参数 invFunc </ i>)。像在 reduceByKeyAndWindow 中的那样,reduce 工作的数量能够通过可选参数进行配置。请留神,针对该操作的应用必须启用 checkpointing. |
countByValueAndWindow(windowLength, slideInterval, [_numTasks_]) | 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, Long) pairs 的 DStream,其中每个 key 的 value 是它在一个滑动窗口之内的频次。像 code>reduceByKeyAndWindow 中的那样,reduce 工作的数量能够通过可选参数进行配置。 |
Join 操作
在 Spark Streaming 中能够执行不同类型的 join
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
// 也能够用窗口
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
DStreams 输入操作
输入操作容许将 DStream 的数据推送到内部零碎,如数据库或文件系统。
会触发所有变换的执行,相似 RDD 的 action 操作。有如下操作:
Output Operation | Meaning |
---|---|
print() | 在运行流应用程序的 driver 节点上的 DStream 中打印每批数据的前十个元素。这对于开发和调试很有用。 |
Python API 这在 Python API 中称为 pprint()。 | |
saveAsTextFiles(prefix, [_suffix_]) | 将此 DStream 的内容另存为文本文件。每个批处理距离的文件名是依据 前缀 和 后缀_:“prefix-TIME_IN_MS[.suffix]”_ 生成的。 |
saveAsObjectFiles(prefix, [_suffix_]) | 将此 DStream 的内容另存为序列化 Java 对象的 SequenceFiles 。每个批处理距离的文件名是依据 前缀 和 后缀_:“prefix-TIME_IN_MS[.suffix]”_ 生成的。 |
Python API 这在 Python API 中是不可用的。 | |
saveAsHadoopFiles(prefix, [_suffix_]) | 将此 DStream 的内容另存为 Hadoop 文件。每个批处理距离的文件名是依据 前缀 和 后缀_:“prefix-TIME_IN_MS[.suffix]”_ 生成的。 |
Python API 这在 Python API 中是不可用的。 | |
foreachRDD(func) | 对从流中生成的每个 RDD 利用函数 func 的最通用的输入运算符。此性能应将每个 RDD 中的数据推送到内部零碎,例如将 RDD 保留到文件,或将其通过网络写入数据库。请留神,函数 func 在运行流应用程序的 driver 过程中执行,通常会在其中具备 RDD 动作,这将强制流式传输 RDD 的计算。 |
foreachRDD 设计模式应用
dstream.foreachRDD 容许将数据发送到内部零碎。
但咱们不要每次都创立一个连贯,解决方案如下:
缩小开销,分区摊派开销
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()}
}
更好的做法是用动态资源池:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
连贯 Kafka
Apache Kafka 是一个高性能的音讯零碎,由 Scala 写成。是由 Apache 软件基金会开发的一个开源音讯零碎我的项目。
Kafka 最后是由 LinkedIn 开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目标指标是为解决实时数据提供一个对立、高通量、低期待(低延时)的平台。
更多 kafka 相干请查看 Kafka 入门宝典(具体截图版)
Spark Streaming 2.4.4 兼容 kafka 0.10.0 或者更高的版本
Spark Streaming 在 2.3.0 版本之前是提供了对 kafka 0.8 和 0.10 的反对的,不过在 2.3.0 当前对 0.8 的反对勾销了。
Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 | |
---|---|---|
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
API Maturity | Deprecated | Stable |
Language Support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit API | No | Yes |
Dynamic Topic Subscription | No | Yes |
Receiver
这里简略介绍一下对 kafka0.8 的一种反对形式:基于 Receiver
依赖:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.12
version = 2.4.4
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
这种状况 程序停掉数据会失落,为了不失落本人又写了一份,这种是很多余的。
因为采纳了 kafka 高阶 api,偏移量 offset 不可控。
Direct
Kafka 0.10.0 版本当前, 采纳了更好的一种 Direct 形式,这种咱们须要本人保护偏移量 offset。
直连形式 并行度会更高 生产环境用的最多,0.8 版本须要在 zk 或者 redis 等中央本人保护偏移量。咱们应用 0.10 以上版本反对本人设置偏移量,咱们只须要本人将偏移量写回 kafka 就能够。
依赖
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.4
kafka 0.10 当前 能够将 offset 写回 kafka 咱们不须要本人保护 offset 了,具体代码如下:
val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
val ssc = new StreamingContext(conf,Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
//latest none earliest
"auto.offset.reset" -> "earliest",
// 主动提交偏移量 false
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//val topics = Array("topicA", "topicB")
val topics = Array("test_topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
// 与 kafka broker 不在一个节点上 用不同策略
// 在一个节点用 PreferBrokers 策略 很少见
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD(rdd => {
// 一般的 RDD 不能强转 HasOffsetRanges 但 kafkaRDD 有 with 这个个性 能够强转
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 解决数据 计算逻辑
rdd.foreachPartition { iter =>
// 一次解决一个分区的数据 获取这个分区的偏移量
// 计算完当前批改偏移量 要开启事务 相似数据库 connection -> conn.setAutoCommit(false) 各种操作 conn.commit(); conn.rollback()
// 获取偏移量 如果要本人记录的话这个
//val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
//println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
// 解决数据
iter.foreach(println)
}
//kafka 0.10 新个性 解决完数据后 将偏移量写回 kafka
// some time later, after outputs have completed
//kafka 有一个非凡的 topic 保留偏移量
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
更多 Flink,Kafka,Spark 等相干技术博文,科技资讯,欢送关注实时流式计算 公众号后盾回复“电子书”下载 300 页 Flink 实战电子书