关于spark:第一篇Spark概览

Apache Spark最后在2009年诞生于美国加州大学伯克利分校的APM实验室,并于2010年开源,现在是Apache软件基金会下的顶级开源我的项目之一。Spark的指标是设计一种编程模型,可能疾速地进行数据分析。Spark提供了内存计算,缩小了IO开销。另外Spark是基于Scala编写的,提供了交互式的编程体验。通过10年的倒退,Spark成为了煊赫一时的大数据处理平台,目前最新的版本是Spark3.0。本文次要是对Spark进行一个总体概览式的介绍,后续内容会对具体的细节进行展开讨论。本文的次要内容包含:

  • [x] Spark的关注度剖析
  • [x] Spark的特点
  • [x] Spark的一些重要概念
  • [x] Spark组件概览
  • [x] Spark运行架构概览
  • [x] Spark编程初体验

Spark的关注热度剖析

详情

下图展现了近1年内在国内对于Spark、Hadoop及Flink的搜寻趋势

近1年内寰球对于Spark、Hadoop及Flink的搜寻趋势,如下:

近1年国内对于Spark、Hadoop及Flink的搜寻热度区域散布状况(按Flink搜寻热度降序排列):

近1年寰球对于Spark、Hadoop及Flink的搜寻热度区域散布状况(按Flink搜寻热度降序排列):

剖析

从下面的4幅图能够看出,近一年无论是在国内还是寰球,对于Spark的搜寻热度始终是比Hadoop和Flink要高。近年来Flink倒退迅猛,其在国内有阿里的背书,Flink人造的流解决特点使其成为了开发流式利用的首选框架。能够看出,尽管Flink在国内很火,然而放眼寰球,热度依然不迭Spark。所以学习并把握Spark技术依然是一个不错的抉择,技术有很多的相似性,如果你曾经把握了Spark,再去学习Flink的话,置信你会有种似曾相识的感觉。

Spark的特点

  • 速度快

    Apache Spark应用DAG调度程序、查问优化器和物理执行引擎,为批处理和流解决提供了高性能。

  • 易于应用

    反对应用Java,Scala,Python,R和SQL疾速编写应用程序。Spark提供了80多个高级操作算子,可轻松构建并行应用程序。

  • 通用性

    Spark提供了十分丰盛的生态栈,包含SQL查问、流式计算、机器学习和图计算等组件,这些组件能够无缝整合在一个利用中,通过一站部署,能够应答多种简单的计算场景

  • 运行模式多样

    Spark能够应用Standalone模式运行,也能够运行在Hadoop,Apache Mesos,Kubernetes等环境中运行。并且能够拜访HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等多种数据源中的数据。

Spark的一些重要概念

  • RDD

    弹性分布式数据集(Resilient Distributed Dataset),是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型

  • DAG

    有向无环图(Directed Acyclic Graph),反映RDD之间的依赖关系

  • Application

    用户编写的Spark程序,由 driver program 和 executors 组成

  • Application jar
    用户编写的应用程序JAR包
  • Driver program
    用程序main()函数的过程,能够创立SparkContext
  • Cluster manager
    集群管理器,属于一个内部服务,用于资源申请调配(如:standalone manager, Mesos, YARN)
  • Deploy mode

    部署模式,决定Driver过程在哪里运行。如果是cluster模式,会由框架自身在集群外部某台机器上启动Driver过程。如果是client模式,会在提交程序的机器上启动Driver过程

  • Worker node

    集群中运行应用程序的节点Executor运行在Worknode节点上的一个过程,负责运行具体的工作,并为应用程序存储数据

  • Task
    运行在executor中的工作单元
  • Job
    一个job蕴含多个RDD及一些列的运行在RDD之上的算子操作,job须要通过action操作进行触发(比方save、collect等)
  • Stage
    每一个作业会被分成由一些列task组成的stage,stage之间会相互依赖

Spark组件概览

Spark生态系统次要包含Spark Core、SparkSQL、SparkStreaming、MLlib和GraphX等组件,具体如下图所示:

  • Spark Core

    Spark core是Spark的外围,蕴含了Spark的基本功能,如内存计算、任务调度、部署模式、存储管理等。SparkCore提供了基于RDD的API是其余高级API的根底,次要性能是实现批处理。

  • Spark SQL

    Spark SQL次要是为了解决结构化和半结构化数据而设计的,SparkSQL容许用户在Spark程序中应用SQL、DataFrame和DataSetAPI查问结构化数据,反对Java、Scala、Python和R语言。因为DataFrame API提供了对立的拜访各种数据源的形式(包含Hive、Avro、Parquet、ORC和JDBC),用户能够通过雷同的形式连贯任何数据源。另外,Spark SQL能够应用hive的元数据,从而实现了与Hive的完满集成,用户能够将Hive的作业间接运行在Spark上。Spark SQL能够通过spark-sql的shell命令拜访。

  • SparkStreaming

    SparkStreaming是Spark很重要的一个模块,可实现实时数据流的可伸缩,高吞吐量,容错流解决。在外部,其工作形式是将实时输出的数据流拆分为一系列的micro batch,而后由Spark引擎进行解决。SparkStreaming反对多种数据源,如kafka、Flume和TCP套接字等

  • MLlib

    MLlib是Spark提供的一个机器学习库,用户能够应用Spark API构建一个机器学习利用,Spark尤其善于迭代计算,性能是Hadoop的100倍。该lib蕴含了常见机器学习算法,比方逻辑回归、反对向量机、分类、聚类、回归、随机森林、协同过滤、主成分剖析等。

  • GraphX

    GraphX是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化,GraphX性能良好,领有丰盛的性能和运算符,能在海量数据上自若地运行简单的图算法。GraphX内置了许多图算法,比方驰名的PageRank算法。

Spark运行架构概览

从整体来看,Spark利用架构包含以下几个次要局部:

  • Driver program
  • Master node
  • Work node
  • Executor
  • Tasks
  • SparkContext

Standalone模式下,运行架构如下图所示:

Driver program

Driver program是Spark应用程序的main()函数(创立SparkContext和Spark会话)。运行Driver过程的节点称之为Driver node,Driver过程与集群管理器(Cluster Manager)进行通信,向Executor发送调度的task。

Cluster Manager

称之为集群管理器,次要用于治理集群。常见的集群管理器包含YARN、Mesos和Standalone,Standalone集群管理器包含两个长期运行的后盾过程,其中一个是在Master节点,另外一个是在Work节点。在后续集群部署模式篇,将具体探讨这一部分的内容,此处先有有一个大抵印象即可。

Worker node

相熟Hadoop的敌人应该晓得,Hadoop包含namenode和datanode节点。Spark也相似,Spark将运行具体任务的节点称之为Worker node。该节点会向Master节点汇报以后节点的可用资源,通常在每一台Worker node上启动一个work后盾过程,用于启动和监控Executor。

Executor

Master节点分配资源,应用集群中的Work node创立Executor,Driver应用这些Executor调配运行具体的Task。每一个应用程序都有本人的Executor过程,应用多个线程执行具体的Task。Executor次要负责运行工作和保留数据。

Task

Task是发送到Executor中的工作单元

SparkContext

SparkContext是Spark会话的入口,用于连贯Spark集群。在提交应用程序之前,首先须要初始化SparkContext,SparkContext隐含了网络通信、存储体系、计算引擎、WebUI等内容。值得注意的是,一个JVM过程中只能有一个SparkContext,如果想创立新的SparkContext,须要在原来的SparkContext上调用stop()办法。

Spark编程小试牛刀

Spark实现分组取topN案例

形容:在HDFS上有订单数据order.txt文件,文件字段的宰割符号”,”,其中字段顺次示意订单id,商品id,交易额。样本数据如下:

Order_00001,Pdt_01,222.8
Order_00001,Pdt_05,25.8
Order_00002,Pdt_03,522.8
Order_00002,Pdt_04,122.4
Order_00002,Pdt_05,722.4
Order_00003,Pdt_01,222.8

问题:应用sparkcore,求每个订单中成交额最大的商品id

实现代码

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object TopOrderItemCluster {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("top n order and item")
    val sc = new SparkContext(conf)
    val hctx = new HiveContext(sc)
    val orderData = sc.textFile("data.txt")
    val splitOrderData = orderData.map(_.split(","))
    val mapOrderData = splitOrderData.map { arrValue =>
      val orderID = arrValue(0)
      val itemID = arrValue(1)
      val total = arrValue(2).toDouble
      (orderID, (itemID, total))
    }
    val groupOrderData = mapOrderData.groupByKey()

    /**
      ***groupOrderData.foreach(x => println(x))
      ***(Order_00003,CompactBuffer((Pdt_01,222.8)))
      ***(Order_00002,CompactBuffer((Pdt_03,522.8), (Pdt_04,122.4), (Pdt_05,722.4)))
      ***(Order_00001,CompactBuffer((Pdt_01,222.8), (Pdt_05,25.8)))
      */
   
    val topOrderData = groupOrderData.map(tupleData => {
      val orderid = tupleData._1
      val maxTotal = tupleData._2.toArray.sortWith(_._2 > _._2).take(1)
      (orderid, maxTotal)
    }
    )
    topOrderData.foreach(value =>
      println("最大成交额的订单ID为:" + value._1 + " ,对应的商品ID为:" + value._2(0)._1)

      /**
        ***最大成交额的订单ID为:Order_00003 ,对应的商品ID为:Pdt_01
        ***最大成交额的订单ID为:Order_00002 ,对应的商品ID为:Pdt_05
        ***最大成交额的订单ID为:Order_00001 ,对应的商品ID为:Pdt_01
        */
      
    )
    //结构出元数据为Row的RDD
    val RowOrderData = topOrderData.map(value => Row(value._1, value._2(0)._1))
    //构建元数据
    val structType = StructType(Array(
      StructField("orderid", StringType, false),
      StructField("itemid", StringType, false))
    )
    //转换成DataFrame
    val orderDataDF = hctx.createDataFrame(RowOrderData, structType)
   // 将数据写入Hive
    orderDataDF.registerTempTable("tmptable")
    hctx.sql("CREATE TABLE IF NOT EXISTS orderid_itemid(orderid STRING,itemid STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'")
      hctx.sql("INSERT INTO orderid_itemid SELECT * FROM tmptable")
  }

}

将上述代码打包,提交到集群运行,能够进入hive cli或者spark-sql的shell查看Hive中的数据。

总结

本文次要从整体上对Spark进行了介绍,次要包含Spark的搜寻热度剖析、Spark的次要特点、Spark的一些重要概念以及Spark的运行架构,最初给出了一个Spark编程案例。本文是Spark系列分享的第一篇,能够先感受一下Spark的全局风貌,下一篇将分享Spark Core编程指南。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

【腾讯云】云产品限时秒杀,爆款1核2G云服务器,首年99元

阿里云限时活动-1核2G-1M带宽-40-100G ,特惠价87.12元/年(原价1234.2元/年,可以直接买3年),速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

You may also like...

发表评论

邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据