本文已收录至 Github,举荐浏览 👉 Java 随想录
微信公众号:Java 随想录
之前说到了之后工作中会接触到 Spark 离线工作相干的内容,也事后学习了 Scala,所以这篇文章它来了。本篇文章会介绍 Spark 的相干概念以及原理,帮忙初学者疾速入门 Spark。
Spark 是什么
学习一个货色之前总要晓得这个货色是什么。
Spark 是一个开源的大数据处理引擎,它提供了一整套开发 API,包含流计算和机器学习。它反对批处理和流解决。
Spark 的一个显著特点是它可能在内存中进行迭代计算,从而放慢数据处理速度。只管 Spark 是用 Scala 开发的,但它也为 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。
Spark 组件
Spark 提供了 6 大组件:
- Spark Core
- Spark SQL
- Spark Streaming
- Spark MLlib
- Spark GraphX
- Spark Core
Spark Core 是 Spark 的根底,它提供了内存计算的能力,是分布式解决大数据集的根底。它将分布式数据抽象为弹性分布式数据集(RDD),并为运行在其上的下层组件提供 API。所有 Spark 的下层组件都建设在 Spark Core 的根底之上。
- Spark SQL
Spark SQL 是一个用于解决结构化数据的 Spark 组件。它容许应用 SQL 语句查问数据。Spark 反对多种数据源,包含 Hive 表、Parquet 和 JSON 等。
- Spark Streaming
Spark Streaming 是一个用于解决动静数据流的 Spark 组件。它可能开发出弱小的交互和数据查问程序。在 解决动静数据流时,流数据会被宰割成渺小的批处理,这些渺小批处理将会在 Spark Core 上按工夫程序疾速执行。
- Spark MLlib
Spark MLlib 是 Spark 的机器学习库。它提供了罕用的机器学习算法和实用程序,包含分类、回归、聚类、协同过滤、降维等。MLlib 还提供了一些底层优化原语和高层流水线 API,能够帮忙开发人员更快地创立和调试机器学习流水线。
- Spark GraphX
Spark GraphX 是 Spark 的图形计算库。它提供了一种分布式图形处理框架,能够帮忙开发人员更快地构建和剖析大型图形。
Spark 的劣势
Spark 有许多劣势,其中一些次要劣势包含:
- 速度:Spark 基于内存计算,可能比基于磁盘的计算快很多。对于迭代式算法和交互式数据挖掘工作,这种速度劣势尤为显著。
- 易用性:Spark 反对多种语言,包含 Java、Scala、Python 和 R。它提供了丰盛的内置 API,能够帮忙开发人员更快地构建和运行应用程序。
- 通用性:Spark 提供了多种组件,能够反对不同类型的计算工作,包含批处理、交互式查问、流解决、机器学习和图形处理等。
- 兼容性:Spark 能够与多种数据源集成,包含 Hadoop 分布式文件系统(HDFS)、Apache Cassandra、Apache HBase 和 Amazon S3 等。
- 容错性:Spark 提供了弹性分布式数据集(RDD)形象,能够帮忙开发人员更快地构建容错应用程序。
Word Count
上面是一个简略的 Word Count 的 Spark 程序:
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {def main (args:Array [String]): Unit = {//setMaster("local[9]") 示意在本地运行 Spark 程序,应用 9 个线程。local[*] 示意应用所有可用的处理器外围。// 这种模式通常用于本地测试和开发。val conf = new SparkConf ().setAppName ("Word Count").setMaster("local[9]");
val sc = new SparkContext (conf);
sc.setLogLevel("ERROR")
val data = List("Hello World", "Hello Spark")
val textFile = sc.parallelize(data)
val wordCounts = textFile.flatMap (line => line.split (" ")).map (word => (word, 1)).reduceByKey ((a, b) => a + b)
wordCounts.collect().foreach(println)
}
}
程序首先创立了一个 SparkConf 对象,用来设置应用程序名称和运行模式。而后,它创立了一个 SparkContext 对象,用来连贯到 Spark 集群。
接下来,程序创立了一个蕴含两个字符串的列表,并应用 parallelize 办法将其转换为一个 RDD。而后,它应用 flatMap 办法将每一行文本拆分成单词,并应用 map 办法将每个单词映射为一个键值对(key-value pair),其中键是单词,值是 1。
最初,程序应用 reduceByKey 办法将具备雷同键的键值对进行合并,并对它们的值进行求和。最终后果是一个蕴含每个单词及其呈现次数的 RDD。程序应用 collect 办法将后果收集到驱动程序,并应用 foreach 办法打印进去。
Spark 基本概念
Spark 的实践较多,所以先理解一下基本概念,有助于前面开展学习 Spark。
Application
用户编写的 Spark 应用程序。
如下,”Word Count” 就是该应用程序的名字。
import org.apache.spark.sql.SparkSession
object WordCount {def main(args: Array[String]) {
// 创立 SparkSession 对象,它是 Spark Application 的入口
val spark = SparkSession.builder.appName("Word Count").getOrCreate()
// 读取文本文件并创立 Dataset
val textFile = spark.read.textFile("hdfs://...")
// 应用 flatMap 转换将文本宰割为单词,并应用 reduceByKey 转换计算每个单词的数量
val counts = textFile.flatMap(line => line.split(" "))
.groupByKey(identity)
.count()
// 将后果保留到文本文件中
counts.write.text("hdfs://...")
// 进行 SparkSession
spark.stop()}
}
Driver
Driver 是运行 Spark Application 的过程,它负责创立 SparkSession 和 SparkContext 对象,并将代码转换为转换和操作操作。它还负责创立逻辑和物理打算,并与集群管理器协调调度工作。
简而言之,Spark Application 是应用 Spark API 编写的程序,而 Spark Driver 是负责运行该程序并与集群管理器协调的过程。
能够将 Driver 了解为运行 Spark Application main
办法的过程。
driver 的内存大小能够进行设置:
# 设置 driver 内存大小
driver-memory 1024m
Master 和 Worker
在 Spark 中,Master 是独立集群的控制者,而 Worker 是工作者。一个 Spark 独立集群须要启动一个 Master 和多个 Worker。Worker 就是物理节点,能够在下面启动 Executor 过程。
Executor
在每个 Worker 上为某利用启动的一个过程,该过程负责运行 Task,并且负责将数据存在内存或者磁盘上,每个工作都有各自独立的 Executor。Executor 是一个执行 Task 的容器。实际上它是一组计算资源 (cpu 外围、memory) 的汇合。
一个 Worker 节点能够有多个 Executor。一个 Executor 能够运行多个 Task。
executor 创立胜利后,在日志文件会显示如下信息:INFO Executor: Starting executor ID [executorId] on host [executorHostname]
Job
一个 Job 蕴含多个 RDD 及作用于相应 RDD 上的各种操作,每个 Action 的触发就会生成一个 job。用户提交的 Job 会提交给 DAGScheduler,Job 会被分解成 Stage,Stage 会被细化成 Task。
Task
被发送到 executor 上的工作单元。每个 Task 负责计算一个分区的数据。
Stage
在 Spark 中,一个作业(job)会被划分为多个阶段(stage)。同一个 Stage 能够有多个 Task 并行执行(task 数 = 分区数)。
阶段之间的划分是依据数据的依赖关系来确定的。当一个 RDD 的分区依赖于另一个 RDD 的分区时,这两个 RDD 就属于同一个阶段。当一个 RDD 的分区依赖于多个 RDD 的分区时,这些 RDD 就属于不同的阶段。
上图中,stage 示意一个能够顺滑实现的阶段,就是能够单机运行。曲线示意 Shuffle。
如果 stage 可能复用后面的 stage 的话,那么会显示灰色。
Stage 的划分
Stage 的划分,简略的说是以宽依赖来划分:
对于窄依赖,partition 的转换解决在 stage 中实现计算,不划分 (将窄依赖尽量放在在同一个 stage 中,能够实现流水线计算)。
对于宽依赖,因为有 shuffle 的存在,只能在父 RDD 解决实现后,能力开始接下来的计算,也就是说须要要划分 stage。
Spark 会依据 shuffle/ 宽依赖应用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把以后的 RDD 退出到以后的 stage/ 阶段中。
Spark 会依据 RDD 之间的依赖关系将 DAG 图划分为不同的阶段,对于窄依赖,因为 partition 依赖关系的确定性,partition 的转换解决就能够在同一个线程里实现,窄依赖就被 spark 划分到同一个 stage 中,而对于宽依赖,只能等父 RDD shuffle 解决实现后,下一个 stage 能力开始接下来的计算。
至于什么是窄依赖和宽依赖,上面马上就会提及。
窄依赖 & 宽依赖
- 窄依赖
父 RDD 的一个分区只会被子 RDD 的一个分区依赖。比方:map/filter 和 union,这种依赖称之为窄依赖。
窄依赖的多个分区能够并行计算,并且窄依赖的一个分区的数据如果失落只须要从新计算对应的分区的数据就能够了。
- 宽依赖
指子 RDD 的分区依赖于父 RDD 的所有分区,这是因为 shuffle 类操作,称之为宽依赖。
对于宽依赖,必须等到上一阶段计算实现能力计算下一阶段。
Shuffle
在 Spark 中,shuffle 是指在不同阶段之间重新分配数据的过程。它通常产生在须要对数据进行聚合或分组操作的时候,例如 reduceByKey 或 groupByKey 等操作。
在 shuffle 过程中,Spark 会将数据依照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就能够独立地解决属于它本人分区的数据。
RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最根本的数据抽象,它代表一个不可变、可分区、外面的元素可并行计算的汇合。
RDD 的 Partition 是指数据集的分区。它是数据集中元素的汇合,这些元素被分区到集群的节点上,能够并行操作。对于 RDD 来说,每个分片都会被一个计算工作解决,并决定并行计算的粒度。用户能够在创立 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采纳默认值。默认值就是程序所调配到的 CPU Core 的数目。
一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。
RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会造成相似于流水线一样的前后依赖关系。在局部分区数据失落时,Spark 能够通过这个依赖关系从新计算失落的分区数据,而不是对 RDD 的所有分区进行从新计算。
DAG
有向无环图,其实说白了就是 RDD 之间的依赖关系图。
- 开始:通过 SparkContext 创立的 RDD;
- 完结 :触发 Action,一旦触发 Action 就造成了一个残缺的 DAG( 有几个 Action,就有几个 DAG)。
Spark 执行流程
Spark 的执行流程大抵如下:
- 构建 Spark Application 的运行环境(启动 SparkContext),SparkContext 向资源管理器(能够是 Standalone、Mesos 或 YARN)注册并申请运行 Executor 资源。
- 资源管理器为 Executor 分配资源并启动 Executor 过程,Executor 运行状况将随着“心跳”发送到资源管理器上。
- SparkContext 构建 DAG 图,将 DAG 图分解成多个 Stage,并把每个 Stage 的 TaskSet(工作集)发送给 Task Scheduler (任务调度器)。
- Executor 向 SparkContext 申请 Task,Task Scheduler 将 Task 发放给 Executor,同时,SparkContext 将利用程序代码发放给 Executor。
- Task 在 Executor 上运行,把执行后果反馈给 Task Scheduler,而后再反馈给 DAG Scheduler。
- 当一个阶段实现后,Spark 会依据数据依赖关系将后果传输给下一个阶段,并开始执行下一个阶段的工作。
- 最初,当所有阶段都实现后,Spark 会将最终后果返回给驱动程序,并实现作业的执行。
Spark 运行模式
Spark 反对多种运行模式,包含本地模式、独立模式、Mesos 模式、YARN 模式和 Kubernetes 模式。
- 本地模式:在本地模式下,Spark 应用程序会在单个机器上运行,不须要连贯到集群。这种模式实用于开发和测试,但不适用于生产环境。
- 独立模式:在独立模式下,Spark 应用程序会连贯到一个独立的 Spark 集群,并在集群中运行。这种模式实用于小型集群,但不反对动静资源分配。
- Mesos 模式:在 Mesos 模式下,Spark 应用程序会连贯到一个 Apache Mesos 集群,并在集群中运行。这种模式反对动静资源分配和细粒度资源共享,目前国内应用较少。
- YARN 模式:在 YARN 模式下,Spark 应用程序会连贯到一个 Apache Hadoop YARN 集群,并在集群中运行。这种模式反对动静资源分配和与其余 Hadoop 生态系统组件的集成,Spark 在 Yarn 模式下是不须要 Master 和 Worker 的。
- Kubernetes 模式:在 Kubernetes 模式下,Spark 应用程序会连贯到一个 Kubernetes 集群,并在集群中运行。这种模式反对动静资源分配和容器化部署。
RDD
RDD 的概念在 Spark 中非常重要,下面只是简略的介绍了一下,上面具体的对 RDD 开展介绍。
RDD 是“Resilient Distributed Dataset”的缩写,从全称就能够理解到 RDD 的一些典型个性:
- Resilient(弹性):RDD 之间会造成有向无环图(DAG),如果 RDD 失落了或者生效了,能够从父 RDD 从新计算失去。即容错性。
- Distributed(分布式):RDD 的数据是以逻辑分区的模式散布在集群的不同节点的。
- Dataset(数据集):即 RDD 存储的数据记录,能够从内部数据生成 RDD,例如 Json 文件,CSV 文件,文本文件,数据库等。
RDD 外面的数据集会被逻辑分成若干个分区,这些分区是散布在集群的不同节点的,基于这样的个性,RDD 能力在集群不同节点并行计算。
RDD 个性
- 内存计算
Spark RDD 运算数据是在内存中进行的,在内存足够的状况下,不会把两头后果存储在磁盘,所以计算速度十分高效。
- 惰性求值
所有的转换操作都是惰性的,也就是说不会立刻执行工作,只是把对数据的转换操作记录下来而已。只有碰到 action 操作才会被真正的执行。
- 容错性
Spark RDD 具备容错个性,在 RDD 生效或者数据失落的时候,能够依据 DAG 从父 RDD 从新把数据集计算出来,以达到数据容错的成果。
- 不变性
RDD 是过程平安的,因为 RDD 是不可批改的。它能够在任何工夫点被创立和查问,使得缓存,共享,备份都非常简单。在计算过程中,是 RDD 的不可批改个性保障了数据的一致性。
- 长久化
能够调用 cache 或者 persist 函数,把 RDD 缓存在内存、磁盘,下次应用的时候不须要从新计算而是间接应用。
RDD 操作
RDD 反对两种操作:
- 转换操作(Transformation)
- 口头操作(Actions)
转换操作(Transformation)
转换操作以 RDD 做为输出参数,而后输入一个或者多个 RDD。转换操作不会批改输出 RDD。Map()
、Filter()
这些都属于转换操作。
转换操作是惰性求值操作,只有在碰到口头操作(Actions)的时候,转换操作才会真正履行。转换操作分两种:窄依赖 和宽依赖(上文提到过)。
上面是一些常见的转换操作:
转换操作 | 形容 |
---|---|
map | 将函数利用于 RDD 中的每个元素,并返回一个新的 RDD |
filter | 返回一个新的 RDD,其中蕴含满足给定谓词的元素 |
flatMap | 将函数利用于 RDD 中的每个元素,并将返回的迭代器展平为一个新的 RDD |
union | 返回一个新的 RDD,其中蕴含两个 RDD 的元素 |
distinct | 返回一个新的 RDD,其中蕴含原始 RDD 中不同的元素 |
groupByKey | 将键值对 RDD 中具备雷同键的元素分组到一起,并返回一个新的 RDD |
reduceByKey | 将键值对 RDD 中具备雷同键的元素聚合到一起,并返回一个新的 RDD |
sortByKey | 返回一个新的键值对 RDD,其中元素依照键排序 |
口头操作(Action)
Action 是数据执行局部,其通过执行 count,reduce,collect 等办法真正执行数据的计算局部。
Action 操作 | 形容 |
---|---|
reduce | 通过函数聚合 RDD 中的所有元素 |
collect | 将 RDD 中的所有元素返回到驱动程序 |
count | 返回 RDD 中的元素个数 |
first | 返回 RDD 中的第一个元素 |
take | 返回 RDD 中的前 n 个元素 |
takeOrdered | 返回 RDD 中的前 n 个元素,依照天然程序或指定的程序排序 |
saveAsTextFile | 将 RDD 中的元素保留到文本文件中 |
foreach | 将函数利用于 RDD 中的每个元素 |
RDD 的创立形式
创立 RDD 有 3 种不同形式:
- 从内部存储系统
- 从其余 RDD
- 由一个曾经存在的 Scala 汇合创立
从内部存储系统
由内部存储系统的数据集创立,包含本地的文件系统,还有所有 Hadoop
反对的数据集,比方 HDFS、Cassandra、HBase
等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
从其余 RDD
通过已有的 RDD 通过算子转换生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))
由一个曾经存在的 Scala 汇合创立
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
其实makeRDD
办法底层调用了 parallelize
办法:
RDD 缓存机制
RDD 缓存是在内存存储 RDD 计算结果的一种优化技术。把两头后果缓存起来以便在须要的时候重复使用,这样能力无效加重计算压力,晋升运算性能。
要长久化一个 RDD,只有调用其 cache()
或者 persist()
办法即可。在该 RDD 第一次被计算出来时,就会间接缓存在每个节点中。而且 Spark 的长久化机制还是主动容错的,如果长久化的 RDD 的任何 partition 失落了,那么 Spark 会主动通过其源 RDD,应用 transformation 操作从新计算该 partition。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache // 缓存 / 长久化
rdd2.sortBy(_._2,false).collect// 触发 action, 会去读取 HDFS 的文件,rdd2 会真正执行长久化
rdd2.sortBy(_._2,false).collect// 触发 action, 会去读缓存中的数据, 执行速度会比之前快, 因为 rdd2 曾经长久化到内存中了
须要留神的是,在触发 action 的时候,才会去执行长久化。
cache()
和 persist()
的区别在于,cache()
是 persist()
的一种简化形式,cache()
的底层就是调用的 persist()
的无参版本,就是调用 persist(MEMORY_ONLY)
,将数据长久化到内存中。如果须要从内存中去除缓存,那么能够应用unpersist()
办法。
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()
存储级别
RDD 存储级别次要有以下几种。
级别 | 应用空间 | CPU 工夫 | 是否在内存中 | 是否在磁盘上 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | 应用未序列化的 Java 对象格局,将数据保留在内存中。如果内存不够寄存所有的数据,则数据可能就不会进行长久化。 |
MEMORY_ONLY_2 | 高 | 低 | 是 | 否 | 数据存 2 份 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 根本含意同 MEMORY_ONLY。惟一的区别是,会将 RDD 中的数据进行序列化。这种形式更加节俭内存 |
MEMORY_ONLY_SER_2 | 低 | 高 | 是 | 否 | 数据序列化,数据存 2 份 |
MEMORY_AND_DISK | 高 | 中等 | 局部 | 局部 | 如果数据在内存中放不下,则溢写到磁盘 |
MEMORY_AND_DISK_2 | 高 | 中等 | 局部 | 局部 | 数据存 2 份 |
MEMORY_AND_DISK_SER | 低 | 高 | 局部 | 局部 | 根本含意同 MEMORY_AND_DISK。惟一的区别是,会将 RDD 中的数据进行序列化 |
MEMORY_AND_DISK_SER_2 | 低 | 高 | 局部 | 局部 | 数据存 2 份 |
DISK_ONLY | 低 | 高 | 否 | 是 | 应用未序列化的 Java 对象格局,将数据全副写入磁盘文件中。 |
DISK_ONLY_2 | 低 | 高 | 否 | 是 | 数据存 2 份 |
OFF_HEAP | 这个目前是试验型选项,相似 MEMORY_ONLY_SER,然而数据是存储在堆外内存的。 |
对于上述任意一种长久化策略,如果加上后缀_2,代表的是将每个长久化的数据,都复制一份正本,并将正本保留到其余节点上。这种基于正本的长久化机制次要用于进行容错。如果某个节点挂掉了,节点的内存或磁盘中的长久化数据失落了,那么后续对 RDD 计算时还能够应用该数据在其余节点上的正本。如果没有正本的话,就只能将这些数据从源头处从新计算一遍了。
RDD 的血缘关系
血缘关系是指 RDD 之间的依赖关系。当你对一个 RDD 执行转换操作时,Spark 会生成一个新的 RDD,并记录这两个 RDD 之间的依赖关系。这种依赖关系就是血缘关系。
血缘关系能够帮忙 Spark 在产生故障时复原数据。当一个分区失落时,Spark 能够依据血缘关系从新计算失落的分区,而不须要从头开始从新计算整个 RDD。
血缘关系还能够帮忙 Spark 优化计算过程。Spark 能够依据血缘关系合并多个间断的窄依赖转换,缩小数据传输和通信开销。
咱们能够执行 toDebugString
打印 RDD 的依赖关系。
上面是一个简略的例子:
val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
println(filteredData.toDebugString)
在这个例子中,咱们首先创立了一个蕴含 5 个元素的 RDD,并对它执行了两个转换操作:map
和 filter
。而后,咱们应用 toDebugString
办法打印了最终 RDD 的血缘关系。
运行这段代码后,你会看到相似上面的输入:
(2) MapPartitionsRDD[2] at filter at <console>:26 []
| MapPartitionsRDD[1] at map at <console>:24 []
| ParallelCollectionRDD[0] at parallelize at <console>:22 []
这个输入示意最终的 RDD 是通过两个转换操作(map
和 filter
)从原始的 ParallelCollectionRDD
转换而来的。
CheckPoint
CheckPoint 能够将 RDD 从其依赖关系中抽出来,保留到牢靠的存储系统(例如 HDFS,S3 等),即它能够将数据和元数据保留到查看指向目录中。因而,在程序产生解体的时候,Spark 能够复原此数据,并从进行的任何中央开始。
CheckPoint 分为两类:
- 高可用 CheckPoint:容错性优先。这种类型的检查点可确保数据永恒存储,如存储在 HDFS 或其余分布式文件系统上。这也意味着数据通常会在网络中复制,这会升高检查点的运行速度。
- 本地 CheckPoint:性能优先。RDD 长久保留到执行程序中的本地文件系统。因而,数据写得更快,但本地文件系统也不是齐全牢靠的,一旦数据失落,工作将无奈复原。
开发人员能够应用 RDD.checkpoint()
办法来设置检查点。在应用检查点之前,必须应用 SparkContext.setCheckpointDir(directory: String)
办法设置检查点目录。
上面是一个简略的例子:
import org.apache.spark.{SparkConf, SparkContext}
object CheckpointExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")
val sc = new SparkContext(conf)
// 设置 checkpoint 目录
sc.setCheckpointDir("/tmp/checkpoint")
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
// 对 RDD 进行 checkpoint
filteredData.checkpoint()
// 触发 checkpoint
filteredData.count()}
}
RDD 的检查点机制就好比 Hadoop 将两头计算值存储到磁盘,即便计算中呈现了故障,咱们也能够轻松地从中复原。通过对 RDD 启动检查点机制能够实现容错和高可用。
Persist 与 CheckPoint 的区别
- 地位:Persist 和 Cache 只能保留在本地的磁盘和内存中(或者堆外内存–试验中),而 Checkpoint 能够保留数据到 HDFS 这类牢靠的存储上。
- 生命周期:Cache 和 Persist 的 RDD 会在程序完结后会被革除或者手动调用 unpersist 办法,而 Checkpoint 的 RDD 在程序完结后仍然存在,不会被删除。CheckPoint 将 RDD 长久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是始终存在的,也就是说能够被下一个 driver 应用,而 Persist 不能被其余 dirver 应用。
Spark-Submit
具体参数阐明
参数名 | 参数阐明 |
---|---|
—master | master 的地址,提交工作到哪里执行,例如 spark://host:port, yarn, local。具体指可参考上面对于 Master_URL 的列表 |
—deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
—class | 应用程序的主类,仅针对 java 或 scala 利用 |
—name | 应用程序的名称 |
—jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将蕴含在 driver 和 executor 的 classpath 下 |
—packages | 蕴含在 driver 和 executor 的 classpath 中的 jar 的 maven 坐标 |
—exclude-packages | 为了防止抵触 而指定不蕴含的 package |
—repositories | 近程 repository |
—conf PROP=VALUE | 指定 spark 配置属性的值,例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m” |
—properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
—driver-memory | Driver 内存,默认 1G |
—driver-java-options | 传给 driver 的额定的 Java 选项 |
—driver-library-path | 传给 driver 的额定的库门路 |
—driver-class-path | 传给 driver 的额定的类门路 |
—driver-cores | Driver 的核数,默认是 1。在 yarn 或者 standalone 下应用 |
—executor-memory | 每个 executor 的内存,默认是 1G |
—total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下应用 |
—num-executors | 启动的 executor 数量。默认为 2。在 yarn 下应用 |
—executor-core | 每个 executor 的核数。在 yarn 或者 standalone 下应用 |
Master_URL 的值
Master URL | 含意 |
---|---|
local | 应用 1 个 worker 线程在本地运行 Spark 应用程序 |
local[K] | 应用 K 个 worker 线程在本地运行 Spark 应用程序 |
local | 应用所有残余 worker 线程在本地运行 Spark 应用程序 |
spark://HOST:PORT | 连贯到 Spark Standalone 集群,以便在该集群上运行 Spark 应用程序 |
mesos://HOST:PORT | 连贯到 Mesos 集群,以便在该集群上运行 Spark 应用程序 |
yarn-client | 以 client 形式连贯到 YARN 集群,集群的定位由环境变量 HADOOP_CONF_DIR 定义,该形式 driver 在 client 运行。 |
yarn-cluster | 以 cluster 形式连贯到 YARN 集群,集群的定位由环境变量 HADOOP_CONF_DIR 定义,该形式 driver 也在集群中运行。 |
Spark 共享变量
个别状况下,当一个传递给 Spark 操作 (例如 map 和 reduce) 的函数在近程节点下面运行时,Spark 操作实际上操作的是这个函数所用变量的一个独立正本。这些变量被复制到每台机器上,并且这些变量在近程机器上的所有更新都不会传递回驱动程序 。通常跨工作的读写变量是低效的,所以,Spark 提供了两种共享变量: 播送变量(broadcast variable)和 累加器(accumulator)。
播送变量
播送变量 容许程序员缓存一个只读的变量在每台机器下面,而不是每个工作保留一份拷贝。说白了其实就是共享变量。
如果 Executor 端用到了 Driver 的变量,如果不应用播送变量在 Executor 有多少 task 就有多少 Driver 端的变量正本。如果应用播送变量在每个 Executor 中只有一份 Driver 端的变量正本。
一个播送变量能够通过调用 SparkContext.broadcast(v)
办法从一个初始变量 v 中创立。播送变量是 v 的一个包装变量,它的值能够通过 value 办法拜访,上面的代码阐明了这个过程:
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
// 创立一个播送变量
val factor = sc.broadcast(2)
// 应用播送变量
val result = data.map(x => x * factor.value)
result.collect().foreach(println)
}
}
播送变量创立当前,咱们就可能在集群的任何函数中应用它来代替变量 v,这样咱们就不须要再次传递变量 v 到每个节点上。另外,为了保障所有的节点失去播送变量具备雷同的值,对象 v 不能在播送之后被批改。
累加器
累加器是一种只能通过关联操作进行“加”操作的变量,因而它可能高效的利用于并行操作中。它们可能用来实现 counters 和 sums。
一个累加器能够通过调用 SparkContext.accumulator(v)
办法从一个初始变量 v 中创立。运行在集群上的工作能够通过 add 办法或者应用 +=
操作来给它加值。然而,它们无奈读取这个值。只有驱动程序能够应用 value 办法来读取累加器的值。
示例代码如下:
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorExample {def main(args: Array[String]) {val conf = new SparkConf().setAppName("AccumulatorExample")
val sc = new SparkContext(conf)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value) // 输入 10
}
}
这个示例中,咱们创立了一个名为 My Accumulator
的累加器,并应用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
来对其进行累加。最初,咱们应用 println(accum.value)
来输入累加器的值,后果为 10
。
咱们能够利用子类 AccumulatorParam 创立本人的累加器类型。AccumulatorParam 接口有两个办法:zero 办法为你的数据类型提供一个“0 值”(zero value);addInPlace 办法计算两个值的和。例如,假如咱们有一个 Vector 类代表数学上的向量,咱们可能如下定义累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {def zero(initialValue: Vector): Vector = {Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {v1 += v2}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
Spark SQL
Spark 为结构化数据处理引入了一个称为 Spark SQL 的编程模块。它提供了一个称为 DataFrame 的编程形象,并且能够充当分布式 SQL 查问引擎。
Spark SQL 的个性
- 集成
无缝地将 SQL 查问与 Spark 程序混合。Spark SQL 容许将结构化数据作为 Spark 中的分布式数据集 (RDD) 进行查问,在 Python,Scala 和 Java 中集成了 API。这种严密的集成使得能够轻松地运行 SQL 查问以及简单的剖析算法。
- Hive 兼容性
在现有仓库上运行未修改的 Hive 查问。Spark SQL 重用了 Hive 前端和 MetaStore,提供与现有 Hive 数据,查问和 UDF 的齐全兼容性。只需将其与 Hive 一起装置即可。
- 规范连贯
通过 JDBC 或 ODBC 连贯。Spark SQL 包含具备行业标准 JDBC 和 ODBC 连贯的服务器模式。
- 可扩展性
对于交互式查问和长查问应用雷同的引擎。Spark SQL 利用 RDD 模型来反对中查问容错,使其可能扩大到大型作业。不要放心为历史数据应用不同的引擎。
Spark SQL 数据类型
Spark SQL 反对多种数据类型,包含数字类型、字符串类型、二进制类型、布尔类型、日期工夫类型和区间类型等。
数字类型包含:
ByteType
:代表一个字节的整数,范畴是 -128 到 127¹²。ShortType
:代表两个字节的整数,范畴是 -32768 到 32767¹²。IntegerType
:代表四个字节的整数,范畴是 -2147483648 到 2147483647¹²。LongType
:代表八个字节的整数,范畴是 -9223372036854775808 到 9223372036854775807¹²。FloatType
:代表四字节的单精度浮点数¹²。DoubleType
:代表八字节的双精度浮点数¹²。DecimalType
:代表任意精度的十进制数据,通过外部的 java.math.BigDecimal 反对。BigDecimal 由一个任意精度的整型非标度值和一个 32 位整数组成¹²。
字符串类型包含:
StringType
:代表字符字符串值。
二进制类型包含:
BinaryType
:代表字节序列值。
布尔类型包含:
BooleanType
:代表布尔值。
日期工夫类型包含:
TimestampType
:代表蕴含字段年、月、日、时、分、秒的值,与会话本地时区相干。工夫戳值示意相对工夫点。DateType
:代表蕴含字段年、月和日的值,不带时区。
区间类型包含:
YearMonthIntervalType (startField, endField)
:示意由以下字段组成的间断子集组成的年月距离:MONTH(月份),YEAR(年份)。DayTimeIntervalType (startField, endField)
:示意由以下字段组成的间断子集组成的日工夫距离:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天)。
复合类型包含:
ArrayType (elementType, containsNull)
:代表由 elementType 类型元素组成的序列值。containsNull 用来指明 ArrayType 中的值是否有 null 值。MapType (keyType, valueType, valueContainsNull)
:示意包含一组键值对的值。通过 keyType 示意 key 数据的类型,通过 valueType 示意 value 数据的类型。valueContainsNull 用来指明 MapType 中的值是否有 null 值。StructType (fields)
:示意一个领有 StructFields (fields) 序列构造的值。StructField (name, dataType, nullable)
:代表 StructType 中的一个字段,字段的名字通过 name 指定,dataType 指定 field 的数据类型,nullable 示意字段的值是否有 null 值。
DataFrame
DataFrame 是 Spark 中用于解决结构化数据的一种数据结构。它相似于关系数据库中的表,具备行和列。每一列都有一个名称和一个类型,每一行都是一条记录。
DataFrame 反对多种数据源,包含结构化数据文件、Hive 表、内部数据库和现有的 RDD。它提供了丰盛的操作,包含筛选、聚合、分组、排序等。
DataFrame 的长处在于它提供了一种高级的形象,使得用户能够应用相似于 SQL 的语言进行数据处理,而无需关怀底层的实现细节。此外,Spark 会主动对 DataFrame 进行优化,以进步查问性能。
上面是一个应用 DataFrame 的代码例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._
val data = Seq(("Alice", 25),
("Bob", 30),
("Charlie", 35)
)
val df = data.toDF("name", "age")
df.show()
在这个示例中,咱们首先创立了一个 SparkSession
对象,而后应用 toDF
办法将一个序列转换为 DataFrame。最初,咱们应用 show
办法来显示 DataFrame 的内容。
创立 DataFrame
在 Scala 中,能够通过以下几种形式创立 DataFrame:
- 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
df.show()
- 从内部数据源读取。例如,从 JSON 文件中读取数据并创立 DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val df = spark.read.json("path/to/json/file")
df.show()
- 通过编程形式创立。例如,应用
createDataFrame
办法:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val schema = StructType(
List(StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
)
)
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd, schema)
df.show()
DSL & SQL
在 Spark 中,能够应用两种形式对 DataFrame 进行查问:DSL(Domain-Specific Language)和 SQL。
DSL 是一种特定畛域语言,它提供了一组用于操作 DataFrame 的办法。例如,上面是一个应用 DSL 进行查问的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.select("name", "age")
.filter($"age" > 25)
.show()
SQL 是一种结构化查询语言,它用于治理关系数据库系统。在 Spark 中,能够应用 SQL 对 DataFrame 进行查问。例如,上面是一个应用 SQL 进行查问的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 25").show()
DSL 和 SQL 的区别在于语法和格调。DSL 应用办法调用链来构建查问,而 SQL 应用申明式语言来形容查问。抉择哪种形式取决于集体爱好和应用场景。
Spark SQL 数据源
Spark SQL 反对多种数据源,包含 Parquet、JSON、CSV、JDBC、Hive 等。
上面是示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()
// Parquet
val df = spark.read.parquet("path/to/parquet/file")
// JSON
val df = spark.read.json("path/to/json/file")
// CSV
val df = spark.read.option("header", "true").csv("path/to/csv/file")
// JDBC
val df = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://host:port/database")
.option("dbtable", "table")
.option("user", "username")
.option("password", "password")
.load()
df.show()
load & save
在 Spark 中,load
函数用于从内部数据源读取数据并创立 DataFrame,而 save
函数用于将 DataFrame 保留到内部数据源。
上面是从 Parquet 文件中读取数据并创立 DataFrame 的示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
val df = spark.read.load("path/to/parquet/file")
df.show()
上面是将 DataFrame 保留到 Parquet 文件的示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
import spark.implicits._
val df = Seq(("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.write.save("path/to/parquet/file")
函数
Spark SQL 提供了丰盛的内置函数,包含数学函数、字符串函数、日期工夫函数、聚合函数等。你能够在 Spark SQL 的官网文档中查看所有可用的内置函数。
此外,Spark SQL 还反对自定义函数(User-Defined Function,UDF),能够让用户编写本人的函数并在查问中应用。
上面是一个应用 SQL 语法编写自定义函数的示例代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._
val df = Seq(("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
val square = udf((x: Int) => x * x)
spark.udf.register("square", square)
spark.sql("SELECT name, square(age) FROM people").show()
在这个示例中,咱们首先定义了一个名为 square
的自定义函数,它承受一个整数参数并返回它的平方。而后,咱们应用 createOrReplaceTempView
办法创立一个长期视图,并应用 udf.register
办法注册自定义函数。最初,咱们应用 spark.sql
办法执行 SQL 查问,并在查问中调用自定义函数。
DataSet
DataSet 是 Spark 1.6 版本中引入的一种新的数据结构,它提供了 RDD 的强类型和 DataFrame 的查问优化能力。
创立 DataSet
在 Scala 中,能够通过以下几种形式创立 DataSet:
- 从现有的 RDD 转换而来。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds = rdd.toDS()
ds.show()
- 从内部数据源读取。例如,从 JSON 文件中读取数据并创立 DataSet:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Long)
val ds = spark.read.json("path/to/json/file").as[Person]
ds.show()
- 通过编程形式创立。例如,应用
createDataset
办法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)
ds.show()
DataSet 和 DataFrame 区别
DataSet 和 DataFrame 都是 Spark 中用于解决结构化数据的数据结构。它们都提供了丰盛的操作,包含筛选、聚合、分组、排序等。
它们之间的次要区别在于类型安全性。DataFrame 是一种弱类型的数据结构,它的列只有在运行时能力确定类型。这意味着,在编译时无奈检测到类型谬误,只有在运行时才会抛出异样。
而 DataSet 是一种强类型的数据结构,它的类型在编译时就曾经确定。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行谬误的类型转换,编译器就会报错。
此外,DataSet 还提供了一些额定的操作,例如 map、flatMap、reduce 等。
RDD & DataFrame & Dataset 转化
RDD、DataFrame、Dataset 三者有许多共性,有各自实用的场景经常须要在三者之间转换。
- DataFrame/Dataset 转 RDD
val rdd1=testDF.rdd
val rdd2=testDS.rdd
- RDD 转 DataFrame
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable // 定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
能够留神到,定义每一行的类型(case class)时,曾经给出了字段名和类型,前面只有往 case class 外面增加值即可。
- Dataset 转 DataFrame
import spark.implicits._
val testDF = testDS.toDF
- DataFrame 转 Dataset
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable // 定义字段名和类型
val testDS = testDF.as[Coltest]
这种办法就是在给出每一列的类型后,应用 as
办法,转成 Dataset,这在数据类型在 DataFrame 须要针对各个字段解决时极为不便。
留神 :
在应用一些非凡的操作时,肯定要加上 import spark.implicits._
不然 toDF
、toDS
无奈应用。
Spark Streaming
Spark Streaming 的工作原理是将实时数据流拆分为小批量数据,并应用 Spark 引擎对这些小批量数据进行解决。这种微批处理(Micro-Batch Processing)的形式使得 Spark Streaming 可能以近乎实时的提早解决大规模的数据流。
上面是一个简略的 Spark Streaming 示例代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
咱们首先创立了一个 StreamingContext
对象,并指定了批处理距离为 1 秒。而后,咱们应用 socketTextStream
办法从套接字源创立了一个 DStream。接下来,咱们对 DStream 进行了一系列操作,包含 flatMap、map 和 reduceByKey。最初,咱们应用 print
办法打印出单词计数的后果。
Spark Streaming 优缺点
Spark Streaming 作为一种实时流解决框架,具备以下长处:
- 高性能:Spark Streaming 基于 Spark 引擎,可能疾速解决大规模的数据流。
- 易用性:Spark Streaming 提供了丰盛的 API,能够让开发人员疾速构建实时流解决利用。
- 容错性:Spark Streaming 具备良好的容错性,可能在节点故障时主动复原。
- 集成性:Spark Streaming 可能与 Spark 生态系统中的其余组件(如 Spark SQL、MLlib 等)无缝集成。
然而,Spark Streaming 也有一些毛病:
- 提早:因为 Spark Streaming 基于微批处理模型,因而它的提早绝对较高。对于须要极低提早的利用场景,Spark Streaming 可能不是最佳抉择。
- 复杂性:Spark Streaming 的配置和调优绝对简单,须要肯定的教训和技能。
DStream
DStream(离散化流)是 Spark Streaming 中用于示意实时数据流的一种形象。它由一系列间断的 RDD 组成,每个 RDD 蕴含一段时间内收集到的数据。
在 Spark Streaming 中,能够通过以下几种形式创立 DStream:
- 从输出源创立。例如,从套接字源创立 DStream:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()
- 通过转换操作创立。例如,对现有的 DStream 进行 map 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()
ssc.start()
ssc.awaitTermination()
- 通过连贯操作创立。例如,对两个 DStream 进行 union 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines1 = ssc.socketTextStream("localhost", 9999)
val lines2 = ssc.socketTextStream("localhost", 9998)
val lines = lines1.union(lines2)
lines.print()
ssc.start()
ssc.awaitTermination()
总结: 简略来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说实质上都能够了解成 RDD。
窗口函数
在 Spark Streaming 中,窗口函数用于对 DStream 中的数据进行窗口化解决。它容许你对一段时间内的数据进行聚合操作。
Spark Streaming 提供了多种窗口函数,包含:
window
:返回一个新的 DStream,它蕴含了原始 DStream 中指定窗口大小和滑动距离的数据。countByWindow
:返回一个新的单元素 DStream,它蕴含了原始 DStream 中指定窗口大小和滑动距离的元素个数。reduceByWindow
:返回一个新的 DStream,它蕴含了原始 DStream 中指定窗口大小和滑动距离的元素通过 reduce 函数解决后的后果。reduceByKeyAndWindow
:相似于reduceByWindow
,然而在进行 reduce 操作之前会先依照 key 进行分组。
上面是一个应用窗口函数的示例代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Window Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
在这个示例中,咱们首先创立了一个 DStream,并对其进行了一系列转换操作。而后,咱们应用 reduceByKeyAndWindow
函数对 DStream 进行窗口化解决,指定了窗口大小为 30 秒,滑动距离为 10 秒。最初,咱们应用 print
办法打印出单词计数的后果。
输入操作
Spark Streaming 容许 DStream 的数据输入到内部零碎,如数据库或文件系统,输入的数据能够被内部零碎所应用,该操作相似于 RDD 的输入操作。Spark Streaming 反对以下输入操作:
print()
: 打印 DStream 中每个 RDD 的前 10 个元素到控制台。saveAsTextFiles(prefix, [suffix])
: 将此 DStream 中每个 RDD 的所有元素以文本文件的模式保留。每个批次的数据都会保留在一个独自的目录中,目录名为:prefix-TIME_IN_MS[.suffix]
。saveAsObjectFiles(prefix, [suffix])
: 将此 DStream 中每个 RDD 的所有元素以 Java 对象序列化的模式保留。每个批次的数据都会保留在一个独自的目录中,目录名为:prefix-TIME_IN_MS[.suffix]
。saveAsHadoopFiles(prefix, [suffix])
: 将此 DStream 中每个 RDD 的所有元素以 Hadoop 文件(SequenceFile 等)的模式保留。每个批次的数据都会保留在一个独自的目录中,目录名为:prefix-TIME_IN_MS[.suffix]
。foreachRDD(func)
: 最通用的输入操作,将函数 func 利用于 DStream 中生成的每个 RDD。通过此函数,能够将数据写入任何反对写入操作的数据源。
Structured Streaming
Structured Streaming 是 Spark 2.0 版本中引入的一种新的流解决引擎。它基于 Spark SQL 引擎,提供了一种申明式的 API 来解决结构化数据流。
与 Spark Streaming 相比,Structured Streaming 具备以下长处:
- 易用性:Structured Streaming 提供了与 Spark SQL 雷同的 API,能够让开发人员疾速构建流解决利用。
- 高性能:Structured Streaming 基于 Spark SQL 引擎,可能疾速解决大规模的数据流。
- 容错性:Structured Streaming 具备良好的容错性,可能在节点故障时主动复原。
- 端到端一致性:Structured Streaming 提供了端到端一致性保障,可能确保数据不失落、不反复。
上面是一个简略的 Structured Streaming 示例代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
在这个示例中,咱们首先创立了一个 SparkSession
对象。而后,咱们应用 readStream
办法从套接字源创立了一个 DataFrame。接下来,咱们对 DataFrame 进行了一系列操作,包含 flatMap、groupBy 和 count。最初,咱们应用 writeStream
办法将后果输入到控制台。
Structured Streaming 同样反对 DSL 和 SQL 语法。
DSL 语法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
SQL 语法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
lines.createOrReplaceTempView("lines")
val wordCounts = spark.sql(
"""
|SELECT value, COUNT(*) as count
|FROM (| SELECT explode(split(value, ' ')) as value
| FROM lines
|)
|GROUP BY value
""".stripMargin)
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
Source
Structured Streaming 反对多种输出源,包含文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。上面是一个应用 Scala 语言从 Kafka 中读取数据的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 订阅一个主题
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Output
Structured Streaming 反对多种输入形式,包含控制台输入、内存输入、文件输入、数据源输入等。上面是将数据写入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 从 socket 中读取数据
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将数据写入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
output mode
每当后果表更新时,咱们都心愿将更改后的后果行写入内部接收器。
Output mode 指定了数据写入输入接收器的形式。Structured Streaming 反对以下三种 output mode:
Output Mode | 形容 |
---|---|
Append | 只将流 DataFrame/Dataset 中的新行写入接收器。 |
Complete | 每当有更新时,将流 DataFrame/Dataset 中的所有行写入接收器。 |
Update | 每当有更新时,只将流 DataFrame/Dataset 中更新的行写入接收器。 |
output sink
Output sink 指定了数据写入的地位。Structured Streaming 反对多种输入接收器,包含文件接收器、Kafka 接收器、Foreach 接收器、控制台接收器和内存接收器等。上面是一些应用 Scala 语言将数据写入到不同输入接收器中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 从 socket 中读取数据
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将数据写入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
// 将数据写入到 Kafka 中
//selectExpr 是一个 DataFrame 的转换操作,它容许你应用 SQL 表达式来抉择 DataFrame 中的列。//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 示意抉择 key 和 value 列,并将它们的类型转换为字符串类型。// 这是因为 Kafka 接收器要求数据必须是字符串类型或二进制类型。lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 将数据写入到控制台中
lines.writeStream
.format("console")
.start()
// 将数据写入到内存中
lines.writeStream
.format("memory")
.queryName("tableName")
.start()
PV,UV 统计
上面是用 Structured Streaming 实现 PV,UV 统计的例子,咱们来感触实战下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object PVUVExample {def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()
import spark.implicits._
// 假如咱们有一个蕴含用户 ID 和拜访的 URL 的输出流
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val data = lines.as[String].map(line => {val parts = line.split(",")
(parts(0), parts(1))
}).toDF("user", "url")
// 计算 PV
val pv = data.groupBy("url").count().withColumnRenamed("count", "pv")
val pvQuery = pv.writeStream.outputMode("complete").format("console").start()
// 计算 UV
val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")
val uvQuery = uv.writeStream.outputMode("complete").format("console").start()
pvQuery.awaitTermination()
uvQuery.awaitTermination()}
}
这段代码演示了如何应用 Structured Streaming 对数据进行 PV 和 UV 统计。它首先从一个 socket 源读取数据,而后应用 groupBy
和count
对数据进行 PV 统计,最初应用 dropDuplicates
、groupBy
和count
对数据进行 UV 统计。
假如咱们在本地启动了一个 socket 服务器,并向其发送以下数据:
user1,http://example.com/page1
user2,http://example.com/page1
user1,http://example.com/page2
user3,http://example.com/page1
user2,http://example.com/page2
user3,http://example.com/page2
那么程序将输入以下后果:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| pv|
+--------------------+---+
|http://example.co...| 3|
|http://example.co...| 3|
+--------------------+---+
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| uv|
+--------------------+---+
|http://example.co...| 2|
|http://example.co...| 3|
+--------------------+---+
总结
总之,Spark 是一个弱小的大数据处理框架,它具备高性能、易用性和灵活性等长处。心愿本文可能帮忙你入门 Spark,并在理论利用中施展它的弱小性能。如果你想深刻学习 Spark,能够参考官网文档和相干书籍,也能够退出 Spark 社区,与其余开发人员交流经验。
本篇文章就到这里,感激浏览,如果本篇博客有任何谬误和倡议,欢送给我留言斧正。文章继续更新,能够关注公众号第一工夫浏览。