Spark-快速入门

5次阅读

共计 6319 个字符,预计需要花费 16 分钟才能阅读完成。

Spark

Spark 背景

什么是 Spark

官网:http://spark.apache.org

Spark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校 AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,2014 年 2 月成为 Apache 顶级项目。目前,Spark 生态系统已经发展成为一个包含多个子项目的集合,其中包含 SparkSQL、Spark Streaming、GraphX、MLlib 等子项目,Spark 是基于内存计算的大数据并行计算框架。Spark 基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将 Spark 部署在大量廉价硬件之上,形成集群。

Spark 与 Hadoop

Spark 是一个计算框架, 而 Hadoop 中包含计算框架 MapReduce 和分布式文件系统 HDFS,Hadoop 更广泛地说还包括在其生态系统上的其他系统.

为什么使用 Spark?

Hadoop 的 MapReduce 计算模型存在问题:
Hadoop 的 MapReduce 的核心是 Shuffle(洗牌). 在整个 Shuffle 的过程中, 至少产生 6 次 I / O 流. 基于 MapReduce 计算引擎通常会将结果输出到次盘上, 进行存储和容错. 另外, 当一些查询 (如:hive) 翻译到 MapReduce 任务是, 往往会产生多个 Stage, 而这些 Stage 有依赖底层文件系统来存储每一个 Stage 的输出结果, 而 I / O 的效率往往较低, 从而影响 MapReduce 的运行速度.

Spark 的特点

与 Hadoop 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过基于内存来高效处理数据流。

易用

Spark 支持 Java、Python 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 shell,可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法。

通用

Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

兼容性

Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器. 并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 和 Cassandra 等。这对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其内置的资源管理和调度框架,这样进一步降低了 Spark 的使用门槛,使得所有人都可以非常容易地部署和使用 Spark。此外,Spark 还提供了在 EC2 上部 Standalone 的 Spark 集群的工具。

Spark 的生态系统

  • Spark Core:

实现了 Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统 交互等模块。Spark Core 中还包含了对弹性分布式数据集 (resilient distributed dataset,简称 RDD) 的 API 定义。

  • Spark Streaming:

Spark Streaming 基于微批量方式的计算和处理, 可以用于处理实时的流数据. 它使用 DStream, 简单来说是一个弹性分布式数据集 (RDD) 系列, 处理实时数据. 数据可以从 Kafka,Flume,Kinesis 或 TCP 套接字等众多来源获取, 并且可以使用由高级函数(如 map,reduce,join 和 window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。

  • Spark SQL

SPark SQL 可以通过 JDBC API 将 Spark 数据集暴露出去, 而且还可以用传统的 BI 和可视化工具在 Spark 数据上执行类似 SQL 的查询, 用户哈可以用 Spark SQL 对不同格式的数据 (如 Json, Parque 以及数据库等) 执行 ETl, 将其转化, 然后暴露特定的查询.

  • Spark MLlib

MLlib 是一个可扩展的 Spark 机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。

  • Spark Graphx:

GraphX 是用于图计算和并行图计算的新的(alpha)Spark API。通过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了 Spark RDD。为了支持图计算,GraphX 暴露了一个基础操作符集合(如 subgraph,joinVertices 和 aggregateMessages)和一个经过优化的 Pregel API 变体。此外,GraphX 还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。

  • 集群管理器:

Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计 算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器 (cluster manager) 上运行,包括 Hadoop YARN、Apache Mesos,以及 Spark 自带的一个简易调度 器,叫作独立调度器。

Spark 得到了众多大数据公司的支持,这些公司包括 Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的 Spark 已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用 GraphX 构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯 Spark 集群达到 8000 台的规模,是当前已知的世界上最大的 Spark 集群。

Spark 的用户和用途

我们大致把 Spark 的用例分为两类:数据科学应用和数据处理应用。也就对应的有两种人群:数据科学家和工程师。

数据科学任务

主要是数据分析领域,数据科学家要负责分析数据并建模,具备 SQL、统计、预测建模 (机器学习) 等方面的经验,以及一定的使用 Python、Matlab 或 R 语言进行编程的能力。

数据处理应用

工程师定义为使用 Spark 开发 生产环境中的数据处理应用的软件开发者,通过对接 Spark 的 API 实现对处理的处理和转换等任务。

Spark 架构中的基本组件:

  • Driver: 运行 Application 的 main() 函数并创建 SparkContext
  • Worker: 从节点, 负责控制计算节点, 启动 Ex 而粗投入或 Driver
  • SparkContext: 整个应用的上下文, 监控应用的生命周期
  • SparkConf:负责存储配置信息。
  • Executor: 执行器, 在 worker node 上执行任务组件, 用于启动线程执行任务. 每个 Application 拥有独立的一组 Executors
  • ClusterManager: 在 standlone 模式中即为 Master(主节点), 控制整个集群. 监控 Worker. 在 Yarn 模式中为资源管理器.
  • RDD: 弹性分布式集合,spark 的基本计算单元,一组 RDD 可形成执行的有向无环图 RDD Graph
  • DAG Scheduler: 根据作业 (Job) 构建基于 Stage 的 DAG, 并交给 Stage 给 TaskScheduler
  • TaskScheduler:将任务(Task)分发给 Executor 执行
  • SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。SparkEnv 内创建并包含如下一些重要组件的引用。
  • MapOutPutTracker:负责 Shuffle 元信息的存储。
  • BroadcastManager:负责广播变量的控制与元信息的存储。
  • BlockManager:负责存储管理、创建和查找块。
  • MetricsSystem:监控运行时性能指标信息。

Spark 的整体流程:client 提交应用,Master 找到一个 Worker 启动 Driver,Driver 向 Master 或者向资源管理器申请资源, 之后将应用转化为 RDD Graph,再由 DAGScheduler 将 RDD Graph 转化为 Stage 的有向无环图提交给 TaskScheduler,由 TaskScheduler 提交任务给 Executor 执行。在任务执行的过程中,其他组件协同工作,确保整个应用顺利执行。

搭建 Spark 集群

Spark 的部署模式有 Local、Local-Cluster、Standalone、Yarn、Mesos,我们选择最具代表性的 Standalone 集群部署模式。安装 java 环境,Spark 自动会把 scala SDK 打包到 Spark 中无需安装 scala 环境

环境

linux: CentOS-7.5_x64
hadoop: hadoop-3.2.0
spark: spark-2.3.3
zookeeper: zookeeper-3.4.10

机器规划

主机名 IP 安装软件 运行进程
node-1 192.168.91.11 spark Master
node-2 192.168.91.12 spark,zookeeper Worker,QuorumPeerMain
node-3 192.168.91.13 spark,zookeeper Worker,QuorumPeerMain
node-4 192.168.91.14 spark,zookeeper Worker,QuorumPeerMain

配置 Spark 环境


# 下载对应的 Spark 安装包
$ wget http://mirrors.hust.edu.cn/apache/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz

# 解压缩
$ tar -zxvf spark-2.3.3-bin-hadoop2.7.tgz

# 进入 spark 解压目录
$ cd $SPARK_HOME

# 修改 Spark 的环境配置文件
$ cp conf/spark-env.sh.template spark-env.sh
$ vim conf/spark-env.sh

# 添加如下配置
export JAVA_HOME=/usr/java/jdk1.8.0_191

# 修改 slave 的配置
$ cp $SPARK_HOME/conf/slaves.template slaves
$ vi slaves

# 在该文件中添加子节点所在的位置(Worker 节点)node-2
node-3
node-4

# 将配置好的 spark 复制到其他机器上(node-2,node-3,node-4)
$ scp -r spark-2.3.2-bin-hadoop2.7 root@node-2:/xxx/xxx

# 启动 spark 集群
$ sbin/start-master.sh
$ sbin/start-slaves.sh

# 也可以是用这个脚本启动所有机器
$ sbin/start-all.sh

启动后执行 jps 命令,主节点上有 Master 进程,其他子节点上有 Work 进行,登录 Spark 管理界面查看集群状态(主节点):http://node-1:8080/

Spark 集群 HA

机器规划

主机名 IP 安装软件 运行进程
node-1 192.168.91.11 spark Master
node-2 192.168.91.12 spark,zookeeper Master,QuorumPeerMain
node-3 192.168.91.13 spark,zookeeper Worker,QuorumPeerMain
node-4 192.168.91.14 spark,zookeeper Worker,QuorumPeerMain

1. 安装配置 zk 集群,并启动 zk 集群 zookeeper 安装

2. 修改 spark 的配置文件添加如下配置

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node-2:2181,node-3:2181,node-4:2181 -Dspark.deploy.zookeeper.dir=/spark"

3. 修改所有节点的 slaves 文件改为(node-3,node-4)节点

4. 在 node1 上执行 sbin/start-all.sh,然后在 node-2 上启动第二个 Master(sbin/start-master.sh)

执行第一个 spark 程序

$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master-ip:7077 --executor-memory 1G --total-executor-cores 2 $SPARK_HOME/examples/jars/spark-examples_2.11-2.3.3.jar 100

spark Shell

spark-shell 是 Spark 自带的交互式 Shell 程序,方便用户进行交互式编程,用户可以在该命令行下用 scala 编写 spark 程序。

$SPARK_HOME/bin/spark-shell --master spark://node-1:7077 --executor-memory 2g --total-executor-cores 2

参数说明:

# 指定 Master 的地址
--master spark://node-1:7077

# 指定每个 worker 可用内存为 2G
--executor-memory 2g

# 指定整个集群使用的 cup 核数为 2 个
--total-executor-cores 2

注意

如果启动 spark shell 时没有指定 master 地址,但是也可以正常启动 spark shell 和执行 spark
shell 中的程序,其实是启动了 spark 的 local 模式,该模式仅在本机启动一个进程,没有与集群建立联系。
Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc。用户代码如果需要用到,则直接应用 sc 即可

spark shell 中编写 WordCount

在 spark shell 中用 scala 语言编写 spark 程序


# sc 是 SparkContext 对象,该对象时提交 spark 程序的入口
sc.textFile("file:///root/data/words.txt").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).saveAsTextFile("file:///root/data/output1")

# 从本地文件系统中读取数据
textFile("file:///root/data/words.txt")

# 读取每一行数据并切分
flatMap(_.split(" "))

# 将数据切分映射将单词和 1 构成元组
map((_,1))

# 按照 key 进行 reduce,并将 value 累加
reduceByKey(_+_)

# 将结果写入到指定位置
saveAsTextFile("file:///root/data/output1")

正文完
 0