关于大数据:图解大数据-流式数据处理Spark-Streaming

42次阅读

共计 9554 个字符,预计需要花费 24 分钟才能阅读完成。

作者:韩信子 @ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/179
申明:版权所有,转载请分割平台与作者并注明出处

1.Spark Streaming 解读

1)Spark Streaming 简介

Spark Streaming 是 Spark 外围 API 的一个扩大,能够实现实时数据的可拓展,高吞吐量,容错机制的实时流解决框架。

Spark Streaming 反对的数据输出源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简略的 TCP 套接字等等。数据输出后能够用 Spark 的高度形象原语如:map、reduce、join、window 等进行运算。而后果也能保留在很多中央,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完满交融。

(1)流数据特点

  • 数据始终在变动
  • 数据无奈回退
  • 数据始终源源不断涌进

(2)DStream 概念

和 Spark 基于 RDD 的概念很类似,Spark Streaming 应用离散化流 (discretized stream) 作为形象示意,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在外部,每个工夫区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因而得名“离散化”)。

(3)DStream 造成步骤

  • 针对某个时间段切分的小数据块进行 RDD DAG 构建。
  • 间断工夫内产生的一连串小的数据进行切片解决别离构建 RDD DAG,造成 DStream。

定义一个 RDD 解决逻辑,数据依照工夫切片,每次流入的数据都不一样,然而 RDD 的 DAG 逻辑是一样的,即依照工夫划分成一个个 batch,用同一个逻辑解决。

DStream 能够从各种输出源创立,比方 Flume、Kafka 或者 HDFS。创立进去的 DStream 反对两种操作,一种是转化操作(transformation),会生成一个新的 DStream,另一种是输入操作(output operation),能够把数据写入内部零碎中。DStream 提供了许多与 RDD 所反对的操作相相似的操作反对,还减少了与工夫相干的新操作,比方滑动窗口。

2)Spark Streaming 特点

Spark Streaming 有下述一些特点:

  • 易用:Spark Streaming 反对 Java、Python、Scala 等编程语言,能够像编写离线程序一样编写实时计算的程序求照的器。
  • 容错 :Spark Streaming 在没有额定代码和配置的状况下,能够复原失落的数据。对于实时计算来说,容错性至关重要。首先要明确一下 Spak 中 RDD 的容错机制,即每一个 RDD 都是个不可变的分布式可重算的数据集,它记录着确定性的操作继承关系(lineage),所以只有输出数据是可容错的,那么任意一个 RDD 的分区(Partition) 出错或不可用,都能够应用原始输出数据通过转换操作从新计算失去。
  • 易整合到 Spark 体系中:Spark Streaming 能够在 Spark 上运行,并且还容许重复使用雷同的代码进行批处理。也就是说,实时处理能够与离线解决相结合,实现交互式的查问操作。

3)Spark Streaming 架构

大家晓得 Spark 的工作机制如下:

而 SparkStreaming 架构由三个模块组成:

在上图中几个外围的角色和性能别离是:

  • Master:记录 Dstream 之间的依赖关系或者血缘关系,并负责任务调度以生成新的 RD
  • Worker:

    • ①从网络接收数据并存储到内存中
    • ②执行 RDD 计算
  • Client:负责向 Spark Streaming 中灌入数据(flume kafka)

4)Spark Streaming 作业提交

(1)相干组件

Spark Sreaming 的作业提交蕴含的组件和性能别离为:

  • Network Input Tracker:跟踪每一个网络 received 数据,并且将其映射到相应的 Input Dstream 上
  • Job Scheduler:周期性的拜访 Dstream Graph 并生成 Spark Job,将其交给 Job Manager 执行
  • Job Manager:获取工作队列,并执行 Spark 工作

(2)具体流程

具体的作业提交流程如下:

要传入的数据会编排成 block id(元数据)的模式,再加上 RDD 的逻辑,就生产了 job scheduler,通过 job manager 造成 job queue,以队列模式有序执行。真正的数据是以 block 模式传入 worker,由 worker 上的 executor 通过元数据信息 Block ID 去 HDFS 上拉取对应的 block 数据进行执行。

Network Input Tracker 传入的并不是真正的数据,而是 Block IDs,相当于获取的是元数据,数据是通过 worker 进行承受的,也就是说 Master 上不论真正数据的承受状况,Master 上只是可能拿到数据 block 的 id,至于这些 block 做什么操作,是会放到 Job Manager 去,依照程序执行。

5)SparkStreaming 工作原理

Discretized Stream 是 Spark Streaming 的根底形象,代表持续性的数据流和通过各种 Spark 原语操作后的后果数据流。在外部实现上,DStream 是一系列间断的 RDD 来示意。每个 RDD 含有一段时间距离内的数据。

简略来说,SparkStreaming 承受实时的数据流,把数据依照指定的时间段切成一片片小的数据块(SparkStreaming 将每个小的数据块当作 RDD 来解决),而后把数据块传给 Spark Engine 解决,最终失去一批批的后果。

  • 每一批数据,在 Spark 内核中对应一个 RDD 实例
  • DStream 能够看作一组 RDDs,是继续的 RDD 序列

对于 Streaming 来说,它的单位是 DStream,而对于 SparkCore,它的单位是 RDD。针对 Spark 开发,就是开发 RDD 的 DAG 图,而针对 SparkStreaming,就是开发 DStream。

DStream 代表间断的一组 RDD,每个 RDD 都蕴含特定工夫距离的数据。DStream 外部的操作,能够间接映射到外部 RDD 进行,相当于 DStream 是在 RDD 上减少一个工夫的维度失去的。RDD 是 DStream 最小的一个数据单元。DStream 中对数据的操作也是依照 RDD 为单位来进行的。

简略来了解,SparkStreaming 对于流数据的处理速度是秒级别,无奈达到 Storm 的毫秒级别,因而也能够将 Streaming 看作是微批处理。

2.DStream 详解

大家在上文中频繁看到 Dstream 的外围概念,上面咱们对其做一些开展解说。

整体上看,Spark Streaming 的解决思路:将间断的数据长久化、离散化,而后进行批量处。

对下面这句话进行剖析:

  • 数据长久化:接管到的数据暂存,不便数据出错进行回滚
  • 离散化:按工夫分片,造成处理单元
  • 分片解决:采纳 RDD 模式将数据分批解决
  • DStream 相当于对 RDD 的再次封装,它提供了转化操作和输入操作两种操作方法

1)DStream 创立注意事项

Spark Streaming 原生反对一些不同的数据源。一些“外围”数据源曾经被打包到 Spark Streaming 的 Maven 工件中,而其余的一些则能够通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的工作的模式运行,因而会占据调配给利用的 CPU 外围。

此外,咱们还须要有可用的 CPU 外围来解决数据。这意味着如果要运行多个接收器,就必须至多有和接收器数目雷同的外围数,还要加上用来实现计算所须要的外围数。例如,如果咱们想要在流计算利用中运行 10 个接收器,那么至多须要为利用调配 11 个 CPU 外围。所以如果在本地模式运行,不要应用 local 或者 local。

2)DStream 转换

(1)TransFormation 算子与输入

DStream 上的原语与 RDD 的相似,分为 Transformations(转换)和 Output Operations(输入)两种,此外转换操作中还有一些比拟非凡的原语,如:updateStateByKey()、transform()以及各种 Window 相干的原语。

① TransFormation

  • Spark 反对 RDD 进行各种转换,因为 Dstream 是由 RDD 组成的,Spark Streaming 提供了一个能够在 DStream 上应用的转换汇合,这些汇合和 RDD 上可用的转换相似;
  • 转换利用到 Dstream 的每个 RDD;
  • Spark Streaming 提供了 reduce 和 count 这样的算子,但不会间接触发 Dstream 计算;
  • 罕用算子:Map、flatMap、join、reduceByKey;

② Output

  • Print:控制台输入;
  • saveAsObjectFile、saveAsTextFile、saveAsHadoopFiles:将一批数据输入到 Hadoop 文件系统中,用批量数据的开始工夫戳来命名;
  • forEachRDD:容许用户对 Stream 的每一批量数据对应的 RDD 自身做任意操作;

DStream = [rdd1, rdd2, …, rddn]
RDD 两类算子:transformation、action
DStream 两类算子:transformation、output

(2)无状态转换

无状态转化操作就是把简略的 RDD 转化操作利用到每个批次上,也就是转化 DStream 中的每一个 RDD。局部无状态转化操作列在了下表中。留神,针对键值对的 DStream 转化操作 (比方 reduceByKey()) 要增加 import StreamingContext.\_能力在 Scala 中应用。

函数名称 目标 Scala 示例 用来操作 DStream[T] 的用户自定义函数的函数签名
map () 对 DStream 中的每个元素利用给定函数,返回由各元素输入的元素组成的 DStream ds.map(x => x + 1) f : (T) -> U
flatMap() 对 DStream 中的每个元素利用给定函数,返回由各元素输入的迭代器组成的 DStream ds.flatMap(x => x.split (“”) ) f : T -> Iterable [U]
filter() 返回由给定 DStream 中通过筛选的元素组成的 DStream ds.filter(x => x! = 1) f : T -> Boolean
repartition() 扭转 DStream 的分区数 ds.repartition(10) N / A
reduceByKey() 将每个批次中键雷同的记录归约 ds.reduceByKey((x, y) => x + y) f : T , T -> T
groupByKey() 将每个批次中的记录依据键分组 ds.groupByKey() N / A

须要留神的是,只管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在外部是由许多 RDD(批次)组成,且无状态转化操作是别离利用到每个 RDD 上的。例如,reduceByKey()会归约每个工夫区间中的数据,但不会归约不同区间之间的数据。

无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个工夫区间内。例如,键 值对 DStream 领有和 RDD 一样的与连贯相干的转化操作,也就是 cogroup()、join()、leftOuterJoin() 等。咱们能够在 DStream 上应用这些操作,这样就对每个批次别离执行了对应的 RDD 操作。

咱们还能够像在惯例的 Spark 中一样应用 DStream 的 union() 操作将它和另一个 DStream 的内容合并起来,也能够应用 StreamingContext.union()来合并多个流。

(3)有状态转换

① UpdateStateByKey(全局统计量)

UpdateStateByKey 原语用于记录历史记录,有时,咱们须要在 DStream 中跨批次保护状态 (例如流计算中累加 wordcount)。针对这种状况,updateStateByKey() 为咱们提供了对一个状态变量的拜访,用于键值对模式的 DStream。

给定一个由 (键,事件) 对形成的 DStream,并传递一个指定如何依据新的事件更新每个键对应状态的函数,它能够构建出一个新的 DStream,其外部数据为(键,状态) 对。

  • updateStateByKey() 的后果会是一个新的 DStream,其外部的 RDD 序列是由每个工夫区间对应的 (键,状态) 对组成的。
  • updateStateByKey 操作使得咱们能够在用新信息进行更新时放弃任意的状态。

为应用这个性能,你须要做上面两步:

  • 定义状态,状态能够是一个任意的数据类型。
  • 定义状态更新函数,用此函数说明如何应用之前的状态和来自输出流的新值对状态进行更新。应用 updateStateByKey 须要对检查点目录进行配置,会应用检查点来保留状态。

如果要应用 updateStateByKey 算子,就必须设置一个 checkpoint 目录,开启 checkpoint 机制,这样的话能力把每个 key 对应的 state 除了在内存中有,在磁盘上也 checkpoint 一份。因为要长期保留一份 key 的 state 的话,那么 spark streaming 是要求必须用 checkpoint 的,以防止内存数据的失落。

次要解决:比如说在双十一统计一天销量和成交金额,这些计算须要全量汇总,对数据进行累加,就须要防止数据在内存中失落,造成不精确

② Window Operations

Window Operations 有点相似于 Storm 中的 State,能够设置窗口的大小和滑动窗口的距离来动静的获取以后 Steaming 的容许状态。

基于窗口的操作会在一个比 StreamingContext 的批次距离更长的工夫范畴内,通过整合多个批次(在窗口内的批次)的后果,计算出整个窗口的后果。

简略来说,Streaming 的 Window Operations 是 Spark 提供的一组窗口操作,通过滑动窗口的技术,对大规模数据的增量更新进行统计分析,即定时进行一段时间内的数据处理。

所有基于窗口的操作都须要两个参数,别离为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次距离的整数倍。

窗口时长管制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次距离的源 DStream,要创立一个最近 30 秒的工夫窗口(即最近 3 个批次),就该当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次距离相等,用来管制对新的 DStream 进行计算的距离。如果源 DStream 批次距离为 10 秒,并且咱们只心愿每两个批次计算一次窗口后果,就应该把滑动步长设置为 20 秒。

  • 窗口总长度(window length)// Reduce last 30 seconds of data, every 10 seconds
  • 滑动工夫距离(slide interval)va1 windowedWordCounts = pairs.reduceByKeyAndWindow(_+_, Seconds(30), Seconds(10))

滑动窗口的长度必须是滑动工夫距离的整数倍。因为 RDD 是 DStream 上最小的数据单元不可切分。如果不是整数倍,会呈现一个 RDD 被切分的状况,程序会报错。

3)DStream Graph

DStream Graph 是一系列 transformation 操作的形象,例如:

c = a.join(b), d = c.filter() 时,它们的 DAG 逻辑关系是 a /b → c,c → d,但在 Spark Streaming 在进行物理记录时却是反向的 a/b ← c, c ← d,目标是为了追溯。

(1)DStreamGraph

  • ①找代码输入
  • ②依据输入再往前追溯依赖关系

Dstream 之间的转换所造成的的依赖关系全副保留在 DStreamGraph 中,DStreamGraph 对于前期生成 RDD Graph 至关重要。
DStreamGraph 有点像简洁版的 DAG scheduler,负责依据某个工夫距离生成一序列 JobSet,以及依照依赖关系序列化

代码是始终在跑的,每隔肯定工夫就会造成一个 RDD。

(2)DStream 与 RDD 比照与了解

DStream.map(RDD => RDD.map)

  • 工夫维度:batchinterval 为工夫距离一直的生成 Job 实例并在集群上运行。
  • 空间维度:代表 RDD 依赖关系形成的具体的业务逻辑的解决步骤,用 DStreamGraph 示意

随看工夫的流逝,基于 Dstream Graph 一直的生成 RDD Graph 也就是 DAG 的形式产生 Job,并通过 Jobscheduler 的线程池提交给 SparkCluster 一直的执行。

每个工夫距离会积攒肯定的数据,这些数据能够看成由 event 组成(假如以 kafka 或者 Flume 为例),工夫距离是固定的,在工夫距离内的数据就是固定的。也就是 RDD 是由一个工夫距离内所有数据形成。工夫维度的不同,导致每次解决的数据量及内容不同。

3.Spark Streaming 利用代码示例

咱们先来看一看一个简略的 Spark Streaming 程序的样子。如果咱们想要计算从一个监听 TCP socket 的数据服务器接管到的文本数据(text data)中的字数,咱们能够依照如下步骤进行:

① 首先, 咱们导入 StreamingContext, 这是所有流性能的次要入口点。咱们创立了一个带有 2 个执行线程和间歇工夫为 1 秒的本地 StreamingContext。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 创立一个具备两个工作线程(working thread)并且批次距离为 1 秒的本地 StreamingContext .
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

② 应用该 context,咱们创立一个代表从 TCP 源流数据的 DStream,指定主机名(例如 localhost)和端口(例如 9999)。

# 创立一个将要连贯到 hostname:port 的 DStream,如 localhost:9999 
lines = ssc.socketTextStream("localhost", 9999)

③ 上述 lines DStream 示意将要从数据服务器接管到的数据流。在这个离散流(DStream)中的每一条记录都是一行文本(text)。接下来,咱们心愿通过空格字符拆分这些数据,把每一行切分为单词。

# 将每一行拆分成单词
words = lines.flatMap(lambda line: line.split(" "))

④ flatMap 是一种一对多的 DStream 操作,它会通过在源 DStream 中依据每个记录生成多个新纪录的模式创立一个新的 DStream。在这种状况下,每一行都将被拆分成多个单词和代表单词 DStream 的单词流。下一步,咱们想要计算这些单词:

# 计算每一个 batch(批次)中的每一个 word(单词)pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# 在控制台打印出在这个 DStream 中生成的每个 RDD 的前十个元素
wordCounts.print()

上述单词 DStream 进行了进一步的映射(一对一的转换)为一个 (word, 1) paris 的 DStream,这个 DStream 而后被 reduce 来取得数据中每个批次的单词频率。最初,wordCounts.print() 将会打印一些每秒生成的统计后果。

⑤ 留神当这些行被执行的时候,Spark Streaming 仅仅设置了计算,只有在启动时才会执行,并没有开始真正地解决。为了在所有的转换都曾经设置好之后开始解决,咱们在最初调用:

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

该局部残缺的代码能够在 Spark Streaming 示例 NetworkWordCount 中找到。

如果你曾经 下载 并且 构建 Spark, 您能够应用如下形式来运行该示例. 你首先须要运行 Netcat(一个在大多数类 Unix 零碎中的小工具)作为咱们应用的数据服务器。

$ nc -lk 9999

而后,在另一个不同的终端,你能够通过执行如下命令来运行该示例:

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999

而后,在运行在 netcat 服务器上的终端输出的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像上面这样:

# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world

# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...

4. 参考资料

  • 数据迷信工具速查 | Spark 使用指南(RDD 版) http://www.showmeai.tech/article-detail/106
  • 数据迷信工具速查 | Spark 使用指南(SQL 版) http://www.showmeai.tech/article-detail/107

ShowMeAI 相干文章举荐

  • 图解大数据 | 导论:大数据生态与利用
  • 图解大数据 | 分布式平台:Hadoop 与 Map-reduce 详解
  • 图解大数据 | 实操案例:Hadoop 零碎搭建与环境配置
  • 图解大数据 | 实操案例:利用 map-reduce 进行大数据统计
  • 图解大数据 | 实操案例:Hive 搭建与利用案例
  • 图解大数据 | 海量数据库与查问:Hive 与 HBase 详解
  • 图解大数据 | 大数据分析开掘框架:Spark 初步
  • 图解大数据 | Spark 操作:基于 RDD 的大数据处理剖析
  • 图解大数据 | Spark 操作:基于 Dataframe 与 SQL 的大数据处理剖析
  • 图解大数据 | 综合案例:应用 spark 剖析美国新冠肺炎疫情数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘批发交易数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘音乐专辑数据
  • 图解大数据 | 流式数据处理:Spark Streaming
  • 图解大数据 | Spark 机器学习(上)- 工作流与特色工程
  • 图解大数据 | Spark 机器学习(下)- 建模与超参调优
  • 图解大数据 | Spark GraphFrames:基于图的数据分析开掘

ShowMeAI 系列教程举荐

  • 图解 Python 编程:从入门到精通系列教程
  • 图解数据分析:从入门到精通系列教程
  • 图解 AI 数学根底:从入门到精通系列教程
  • 图解大数据技术:从入门到精通系列教程
  • 图解机器学习算法:从入门到精通系列教程

正文完
 0