乐趣区

关于spark:第一篇Spark概览

Apache Spark 最后在 2009 年诞生于美国加州大学伯克利分校的 APM 实验室,并于 2010 年开源,现在是 Apache 软件基金会下的顶级开源我的项目之一。Spark 的指标是设计一种编程模型,可能疾速地进行数据分析。Spark 提供了内存计算,缩小了 IO 开销。另外 Spark 是基于 Scala 编写的,提供了交互式的编程体验。通过 10 年的倒退,Spark 成为了煊赫一时的大数据处理平台,目前最新的版本是 Spark3.0。本文次要是对 Spark 进行一个总体概览式的介绍, 后续内容会对具体的细节进行展开讨论。本文的次要内容包含:

  • [x] Spark 的关注度剖析
  • [x] Spark 的特点
  • [x] Spark 的一些重要概念
  • [x] Spark 组件概览
  • [x] Spark 运行架构概览
  • [x] Spark 编程初体验

Spark 的关注热度剖析

详情

下图展现了近 1 年内在国内对于 Spark、Hadoop 及 Flink 的搜寻趋势

近 1 年内寰球对于 Spark、Hadoop 及 Flink 的搜寻趋势,如下:

近 1 年国内对于 Spark、Hadoop 及 Flink 的搜寻热度区域散布状况(按 Flink 搜寻热度降序排列):

近 1 年寰球对于 Spark、Hadoop 及 Flink 的搜寻热度区域散布状况(按 Flink 搜寻热度降序排列):

剖析

从下面的 4 幅图能够看出,近一年无论是在国内还是寰球,对于 Spark 的搜寻热度始终是比 Hadoop 和 Flink 要高。近年来 Flink 倒退迅猛,其在国内有阿里的背书,Flink 人造的流解决特点使其成为了开发流式利用的首选框架。能够看出,尽管 Flink 在国内很火,然而放眼寰球,热度依然不迭 Spark。所以学习并把握 Spark 技术依然是一个不错的抉择,技术有很多的相似性,如果你曾经把握了 Spark,再去学习 Flink 的话,置信你会有种似曾相识的感觉。

Spark 的特点

  • 速度快

    Apache Spark 应用 DAG 调度程序、查问优化器和物理执行引擎,为批处理和流解决提供了高性能。

  • 易于应用

    反对应用 Java,Scala,Python,R 和 SQL 疾速编写应用程序。Spark 提供了 80 多个高级操作算子,可轻松构建并行应用程序。

  • 通用性

    Spark 提供了十分丰盛的生态栈,包含 SQL 查问、流式计算、机器学习和图计算等组件,这些组件能够无缝整合在一个利用中,通过一站部署,能够应答多种简单的计算场景

  • 运行模式多样

    Spark 能够应用 Standalone 模式运行,也能够运行在 Hadoop,Apache Mesos,Kubernetes 等环境中运行。并且能够拜访 HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive 等多种数据源中的数据。

Spark 的一些重要概念

  • RDD

    弹性分布式数据集(Resilient Distributed Dataset),是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型

  • DAG

    有向无环图(Directed Acyclic Graph), 反映 RDD 之间的依赖关系

  • Application

    用户编写的 Spark 程序,由 driver program 和 executors 组成

  • Application jar
    用户编写的应用程序 JAR 包
  • Driver program
    用程序 main()函数的过程,能够创立 SparkContext
  • Cluster manager
    集群管理器,属于一个内部服务,用于资源申请调配(如:standalone manager, Mesos, YARN)
  • Deploy mode

    部署模式,决定 Driver 过程在哪里运行。如果是 cluster 模式,会由框架自身在集群外部某台机器上启动 Driver 过程。如果是 client 模式,会在提交程序的机器上启动 Driver 过程

  • Worker node

    集群中运行应用程序的节点 Executor 运行在 Worknode 节点上的一个过程,负责运行具体的工作,并为应用程序存储数据

  • Task
    运行在 executor 中的工作单元
  • Job
    一个 job 蕴含多个 RDD 及一些列的运行在 RDD 之上的算子操作,job 须要通过 action 操作进行触发(比方 save、collect 等)
  • Stage
    每一个作业会被分成由一些列 task 组成的 stage,stage 之间会相互依赖

Spark 组件概览

Spark 生态系统次要包含 Spark Core、SparkSQL、SparkStreaming、MLlib 和 GraphX 等组件,具体如下图所示:

  • Spark Core

    Spark core 是 Spark 的外围,蕴含了 Spark 的基本功能,如内存计算、任务调度、部署模式、存储管理等。SparkCore 提供了基于 RDD 的 API 是其余高级 API 的根底,次要性能是实现批处理。

  • Spark SQL

    Spark SQL 次要是为了解决结构化和半结构化数据而设计的,SparkSQL 容许用户在 Spark 程序中应用 SQL、DataFrame 和 DataSetAPI 查问结构化数据,反对 Java、Scala、Python 和 R 语言。因为 DataFrame API 提供了对立的拜访各种数据源的形式 (包含 Hive、Avro、Parquet、ORC 和 JDBC),用户能够通过雷同的形式连贯任何数据源。另外,Spark SQL 能够应用 hive 的元数据,从而实现了与 Hive 的完满集成,用户能够将 Hive 的作业间接运行在 Spark 上。Spark SQL 能够通过spark-sql 的 shell 命令拜访。

  • SparkStreaming

    SparkStreaming 是 Spark 很重要的一个模块,可实现实时数据流的可伸缩,高吞吐量,容错流解决。在外部,其工作形式是将实时输出的数据流拆分为一系列的 micro batch,而后由 Spark 引擎进行解决。SparkStreaming 反对多种数据源,如 kafka、Flume 和 TCP 套接字等

  • MLlib

    MLlib 是 Spark 提供的一个机器学习库,用户能够应用 Spark API 构建一个机器学习利用,Spark 尤其善于迭代计算,性能是 Hadoop 的 100 倍。该 lib 蕴含了常见机器学习算法,比方逻辑回归、反对向量机、分类、聚类、回归、随机森林、协同过滤、主成分剖析等。

  • GraphX

    GraphX 是 Spark 中用于图计算的 API,可认为是 Pregel 在 Spark 上的重写及优化,GraphX 性能良好,领有丰盛的性能和运算符,能在海量数据上自若地运行简单的图算法。GraphX 内置了许多图算法,比方驰名的 PageRank 算法。

Spark 运行架构概览

从整体来看,Spark 利用架构包含以下几个次要局部:

  • Driver program
  • Master node
  • Work node
  • Executor
  • Tasks
  • SparkContext

Standalone 模式下,运行架构如下图所示:

Driver program

Driver program 是 Spark 应用程序的 main()函数 (创立 SparkContext 和 Spark 会话)。运行 Driver 过程的节点称之为 Driver node,Driver 过程与集群管理器(Cluster Manager) 进行通信,向 Executor 发送调度的 task。

Cluster Manager

称之为集群管理器,次要用于治理集群。常见的集群管理器包含 YARN、Mesos 和 Standalone,Standalone 集群管理器包含两个长期运行的后盾过程,其中一个是在 Master 节点,另外一个是在 Work 节点。在后续集群部署模式篇,将具体探讨这一部分的内容,此处先有有一个大抵印象即可。

Worker node

相熟 Hadoop 的敌人应该晓得,Hadoop 包含 namenode 和 datanode 节点。Spark 也相似,Spark 将运行具体任务的节点称之为 Worker node。该节点会向 Master 节点汇报以后节点的可用资源,通常在每一台 Worker node 上启动一个 work 后盾过程,用于启动和监控 Executor。

Executor

Master 节点分配资源,应用集群中的 Work node 创立 Executor,Driver 应用这些 Executor 调配运行具体的 Task。每一个应用程序都有本人的 Executor 过程,应用多个线程执行具体的 Task。Executor 次要负责运行工作和保留数据。

Task

Task 是发送到 Executor 中的工作单元

SparkContext

SparkContext 是 Spark 会话的入口,用于连贯 Spark 集群。在提交应用程序之前,首先须要初始化 SparkContext,SparkContext 隐含了网络通信、存储体系、计算引擎、WebUI 等内容。值得注意的是,一个 JVM 过程中只能有一个 SparkContext,如果想创立新的 SparkContext,须要在原来的 SparkContext 上调用 stop()办法。

Spark 编程小试牛刀

Spark 实现分组取 topN 案例

形容:在 HDFS 上有订单数据 order.txt 文件,文件字段的宰割符号 ”,”,其中字段顺次示意订单 id,商品 id,交易额。样本数据如下:

Order_00001,Pdt_01,222.8
Order_00001,Pdt_05,25.8
Order_00002,Pdt_03,522.8
Order_00002,Pdt_04,122.4
Order_00002,Pdt_05,722.4
Order_00003,Pdt_01,222.8

问题:应用 sparkcore,求每个订单中成交额最大的商品 id

实现代码

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object TopOrderItemCluster {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("top n order and item")
    val sc = new SparkContext(conf)
    val hctx = new HiveContext(sc)
    val orderData = sc.textFile("data.txt")
    val splitOrderData = orderData.map(_.split(","))
    val mapOrderData = splitOrderData.map { arrValue =>
      val orderID = arrValue(0)
      val itemID = arrValue(1)
      val total = arrValue(2).toDouble
      (orderID, (itemID, total))
    }
    val groupOrderData = mapOrderData.groupByKey()

    /**
      ***groupOrderData.foreach(x => println(x))
      ***(Order_00003,CompactBuffer((Pdt_01,222.8)))
      ***(Order_00002,CompactBuffer((Pdt_03,522.8), (Pdt_04,122.4), (Pdt_05,722.4)))
      ***(Order_00001,CompactBuffer((Pdt_01,222.8), (Pdt_05,25.8)))
      */
   
    val topOrderData = groupOrderData.map(tupleData => {
      val orderid = tupleData._1
      val maxTotal = tupleData._2.toArray.sortWith(_._2 > _._2).take(1)
      (orderid, maxTotal)
    }
    )
    topOrderData.foreach(value =>
      println("最大成交额的订单 ID 为:" + value._1 + ", 对应的商品 ID 为:" + value._2(0)._1)

      /**
        *** 最大成交额的订单 ID 为:Order_00003 , 对应的商品 ID 为:Pdt_01
        *** 最大成交额的订单 ID 为:Order_00002 , 对应的商品 ID 为:Pdt_05
        *** 最大成交额的订单 ID 为:Order_00001 , 对应的商品 ID 为:Pdt_01
        */
      
    )
    // 结构出元数据为 Row 的 RDD
    val RowOrderData = topOrderData.map(value => Row(value._1, value._2(0)._1))
    // 构建元数据
    val structType = StructType(Array(StructField("orderid", StringType, false),
      StructField("itemid", StringType, false))
    )
    // 转换成 DataFrame
    val orderDataDF = hctx.createDataFrame(RowOrderData, structType)
   // 将数据写入 Hive
    orderDataDF.registerTempTable("tmptable")
    hctx.sql("CREATE TABLE IF NOT EXISTS orderid_itemid(orderid STRING,itemid STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY'\\t'")
      hctx.sql("INSERT INTO orderid_itemid SELECT * FROM tmptable")
  }

}

将上述代码打包,提交到集群运行,能够进入 hive cli 或者 spark-sql 的 shell 查看 Hive 中的数据。

总结

本文次要从整体上对 Spark 进行了介绍,次要包含 Spark 的搜寻热度剖析、Spark 的次要特点、Spark 的一些重要概念以及 Spark 的运行架构,最初给出了一个 Spark 编程案例。本文是 Spark 系列分享的第一篇,能够先感受一下 Spark 的全局风貌,下一篇将分享 Spark Core 编程指南。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版