乐趣区

PySpark-SQL-相关知识介绍

本文作者:foochane 
本文链接:https://foochane.cn/article/2019060601.html

1 大数据简介

大数据是这个时代最热门的话题之一。但是什么是大数据呢? 它描述了一个庞大的数据集,并且正在以惊人的速度增长。大数据除了体积 (Volume) 和速度 (velocity) 外,数据的多样性 (variety) 和准确性 (veracity) 也是大数据的一大特点。让我们详细讨论体积、速度、多样性和准确性。这些也被称为大数据的 4V 特征。

1.1 Volume

数据体积 (Volume) 指定要处理的数据量。对于大量数据,我们需要大型机器或分布式系统。计算时间随数据量的增加而增加。所以如果我们能并行化计算,最好使用分布式系统。数据可以是结构化数据、非结构化数据或介于两者之间的数据。如果我们有非结构化数据,那么情况就会变得更加复杂和计算密集型。你可能会想,大数据到底有多大? 这是一个有争议的问题。但一般来说,我们可以说,我们无法使用传统系统处理的数据量被定义为大数据。现在让我们讨论一下数据的速度。

1.2 Velocity

越来越多的组织机构开始重视数据。每时每刻都在收集大量的数据。这意味着数据的速度在增加。一个系统如何处理这个速度? 当必须实时分析大量流入的数据时,问题就变得复杂了。许多系统正在开发,以处理这种巨大的数据流入。将传统数据与大数据区别开来的另一个因素是数据的多样性。

1.3 Variety

数据的多样性使得它非常复杂,传统的数据分析系统无法正确地分析它。我们说的是哪一种? 数据不就是数据吗? 图像数据不同于表格数据,因为它的组织和保存方式不同。可以使用无限数量的文件系统。每个文件系统都需要一种不同的方法来处理它。读取和写入 JSON 文件与处理 CSV 文件的方式不同。现在,数据科学家必须处理数据类型的组合。您将要处理的数据可能是图片、视频、文本等的组合。大数据的多样性使得分析变得更加复杂。

1.4 Veracity

你能想象一个逻辑错误的计算机程序产生正确的输出吗? 同样,不准确的数据将提供误导的结果。准确性,或数据正确性,是一个重要的问题。对于大数据,我们必须考虑数据的异常。

2 Hadoop 介绍

Hadoop 是一个解决大数据问题的分布式、可伸缩的框架。Hadoop 是由 Doug Cutting 和 Mark Cafarella 开发的。Hadoop 是用 Java 编写的。它可以安装在一组商用硬件上,并且可以在分布式系统上水平扩展。

在商品硬件上工作使它非常高效。如果我们的工作是在商品硬件,故障是一个不可避免的问题。但是 Hadoop 为数据存储和计算提供了一个容错系统。这种容错能力使得 Hadoop 非常流行。

Hadoop 有两个组件:第一个组件是 HDFS(Hadoop Distributed File System),它是一个分布式文件系统。第二个组件是 MapReduce。HDFS 用于分布式数据存储,MapReduce 用于对存储在 HDFS 中的数据执行计算。

2.1 HDFS 介绍

HDFS 用于以分布式和容错的方式存储大量数据。HDFS 是用 Java 编写的,在普通硬件上运行。它的灵感来自于谷歌文件系统 (GFS) 的谷歌研究论文。它是一个写一次读多次的系统,对大量的数据是有效的。HDFS 有两个组件 NameNode 和 DataNode。

这两个组件是 Java 守护进程。NameNode 负责维护分布在集群上的文件的元数据,它是许多 datanode 的主节点。HDFS 将大文件分成小块,并将这些块保存在不同的 datanode 上。实际的文件数据块驻留在 datanode 上。HDFS 提供了一组类 unix-shell 的命令。但是,我们可以使用 HDFS 提供的 Java filesystem API 在更细的级别上处理大型文件。容错是通过复制数据块来实现的。

我们可以使用并行的单线程进程访问 HDFS 文件。HDFS 提供了一个非常有用的实用程序,称为 distcp,它通常用于以并行方式将数据从一个 HDFS 系统传输到另一个 HDFS 系统。它使用并行映射任务复制数据。

2.2 MapReduce 介绍

计算的 MapReduce 模型最早出现在谷歌的一篇研究论文中。Hadoop 的 MapReduce 是 Hadoop 框架的计算引擎,它在 HDFS 中对分布式数据进行计算。MapReduce 已被发现可以在商品硬件的分布式系统上进行水平伸缩。它也适用于大问题。在 MapReduce 中,问题的解决分为 Map 阶段和 Reduce 阶段。在 Map 阶段,处理数据块,在 Reduce 阶段,对 Map 阶段的结果运行聚合或缩减操作。Hadoop 的 MapReduce 框架也是用 Java 编写的。

MapReduce 是一个主从模型。在 Hadoop 1 中,这个 MapReduce 计算由两个守护进程 Jobtracker 和 Tasktracker 管理。Jobtracker 是处理许多任务跟踪器的主进程。Tasktracker 是 Jobtracker 的从节点。但在 Hadoop 2 中,Jobtracker 和 Tasktracker 被 YARN 取代。

我们可以使用框架提供的 API 和 Java 编写 MapReduce 代码。Hadoop streaming 体模块使具有 Python 和 Ruby 知识的程序员能够编写 MapReduce 程序。

MapReduce 算法有很多用途。如许多机器学习算法都被 Apache Mahout 实现,它可以在 Hadoop 上通过 Pig 和 Hive 运行。

但是 MapReduce 并不适合迭代算法。在每个 Hadoop 作业结束时,MapReduce 将数据保存到 HDFS 并为下一个作业再次读取数据。我们知道,将数据读入和写入文件是代价高昂的活动。Apache Spark 通过提供内存中的数据持久性和计算,减轻了 MapReduce 的缺点。

更多关于 Mapreduce 和 Mahout 可以查看如下网页:

  • https://www.usenix.org/legacy/publications/library/proceedings/osdi04/tech/full_papers/dean/dean_html/index.html
  • https://mahout.apache.org/users/basics/quickstart.html

3 Apache Hive 介绍

计算机科学是一个抽象的世界。每个人都知道数据是以位的形式出现的信息。像 C 这样的编程语言提供了对机器和汇编语言的抽象。其他高级语言提供了更多的抽象。结构化查询语言 (Structured Query Language, SQL) 就是这些抽象之一。世界各地的许多数据建模专家都在使用 SQL。Hadoop 非常适合大数据分析。那么,了解 SQL 的广大用户如何利用 Hadoop 在大数据上的计算能力呢? 为了编写 Hadoop 的 MapReduce 程序,用户必须知道可以用来编写 Hadoop 的 MapReduce 程序的编程语言。

现实世界中的日常问题遵循一定的模式。一些问题在日常生活中很常见,比如数据操作、处理缺失值、数据转换和数据汇总。为这些日常问题编写 MapReduce 代码对于非程序员来说是一项令人头晕目眩的工作。编写代码来解决问题不是一件很聪明的事情。但是编写具有性能可伸缩性和可扩展性的高效代码是有价值的。考虑到这个问题,Apache Hive 就在 Facebook 开发出来,它可以解决日常问题,而不需要为一般问题编写 MapReduce 代码。

根据 Hive wiki 的语言,Hive 是一个基于 Apache Hadoop 的数据仓库基础设施。Hive 有自己的 SQL 方言,称为 Hive 查询语言。它被称为 HiveQL,有时也称为 HQL。使用 HiveQL, Hive 查询 HDFS 中的数据。Hive 不仅运行在 HDFS 上,还运行在 Spark 和其他大数据框架上,比如 Apache Tez。

Hive 为 HDFS 中的结构化数据向用户提供了类似关系数据库管理系统的抽象。您可以创建表并在其上运行类似 sql 的查询。Hive 将表模式保存在一些 RDBMS 中。Apache Derby 是 Apache Hive 发行版附带的默认 RDBMS。Apache Derby 完全是用 Java 编写的,是 Apache License Version 2.0 附带的开源 RDBMS。

HiveQL 命令被转换成 Hadoop 的 MapReduce 代码,然后在 Hadoop 集群上运行。

了解 SQL 的人可以轻松学习 Apache Hive 和 HiveQL,并且可以在日常的大数据数据分析工作中使用 Hadoop 的存储和计算能力。PySpark SQL 也支持 HiveQL。您可以在 PySpark SQL 中运行 HiveQL 命令。除了执行 HiveQL 查询,您还可以直接从 Hive 读取数据到 PySpark SQL 并将结果写入 Hive

相关链接:

  • https://cwiki.apache.org/confluence/display/Hive/Tutorial
  • https://db.apache.org/derby/

4 Apache Pig 介绍

Apache Pig 是一个数据流框架,用于对大量数据执行数据分析。它是由雅虎开发的,并向 Apache 软件基金会开放源代码。它现在可以在 Apache 许可 2.0 版本下使用。Pig 编程语言是一种 Pig 拉丁脚本语言。Pig 松散地连接到 Hadoop,这意味着我们可以将它连接到 Hadoop 并执行许多分析。但是 Pig 可以与 Apache Tez 和 Apache Spark 等其他工具一起使用。

Apache Hive 用作报告工具,其中 Apache Pig 用于提取、转换和加载 (ETL)。我们可以使用用户定义函数(UDF) 扩展 Pig 的功能。用户定义函数可以用多种语言编写,包括 Java、Python、Ruby、JavaScript、Groovy 和 Jython。

Apache Pig 使用 HDFS 读取和存储数据,Hadoop 的 MapReduce 执行算法。Apache Pig 在使用 Hadoop 集群方面类似于 Apache Hive。在 Hadoop 上,Pig 命令首先转换为 Hadoop 的 MapReduce 代码。然后将它们转换为 MapReduce 代码,该代码运行在 Hadoop 集群上。

Pig 最好的部分是对代码进行优化和测试,以处理日常问题。所以用户可以直接安装 Pig 并开始使用它。Pig 提供了 Grunt shell 来运行交互式的 Pig 命令。因此,任何了解 Pig Latin 的人都可以享受 HDFS 和 MapReduce 的好处,而不需要了解 Java 或 Python 等高级编程语言。

相关链接

  • http://pig.apache.org/docs/
  • https://en.wikipedia.org/wiki/Pig_(programming_tool))
  • https://cwiki.apache.org/confluence/display/PIG/Index

5 Apache Kafka 介绍

Apache Kafka 是一个发布 - 订阅的分布式消息传递平台。它由 LinkedIn 开发,并进一步开源给 Apache 基金会。它是容错的、可伸缩的和快速的。Kafka 术语中的消息 (数据的最小单位) 通过 Kafka 服务器从生产者流向消费者,并且可以在稍后的时间被持久化和使用。

Kafka 提供了一个内置的 API,开发人员可以使用它来构建他们的应用程序。接下来我们讨论 Apache Kafka 的三个主要组件。

5.1 Producer

Kafka Producer 将消息生成到 Kafka 主题,它可以将数据发布到多个主题。

5.2 Broker

这是运行在专用机器上的 Kafka 服务器,消息由 Producer 推送到 Broker。Broker 将主题保存在不同的分区中,这些分区被复制到不同的 Broker 以处理错误。它本质上是无状态的,因此使用者必须跟踪它所消费的消息。

5.3 Consumer

Consumer 从 Kafka 代理获取消息。记住,它获取消息。Kafka Broker 不会将消息推送给 Consumer; 相反,Consumer 从 Kafka Broker 中提取数据。Consumer 订阅 Kafka Broker 上的一个或多个主题,并读取消息。Broker 还跟踪它所使用的所有消息。数据将在 Broker 中保存指定的时间。如果使用者失败,它可以在重新启动后获取数据。

相关链接:

  • https://kafka.apache.org/docu…
  • https://kafka.apache.org/quic…

6 Apache Spark 介绍

Apache Spark 是一个通用的分布式编程框架。它被认为非常适合迭代和批处理数据。它是在 AMP 实验室开发的,它提供了一个内存计算框架。它是开源软件。一方面,它最适合批量处理,另一方面,它对实时或接近实时的数据非常有效。机器学习和图形算法本质上是迭代的,这就是 Spark 的神奇之处。根据它的研究论文,它比它的同行 Hadoop 快得多。数据可以缓存在内存中。在迭代算法中缓存中间数据提供了惊人的快速处理。Spark 可以使用 Java、Scala、Python 和 R 进行编程。

如果您认为 Spark 是经过改进的 Hadoop,在某种程度上,确实是可以这么认为的。因为我们可以在 Spark 中实现 MapReduce 算法,所以 Spark 使用了 HDFS 的优点。这意味着它可以从 HDFS 读取数据并将数据存储到 HDFS,而且它可以有效地处理迭代计算,因为数据可以保存在内存中。除了内存计算外,它还适用于交互式数据分析。

还有许多其他库也位于 PySpark 之上,以便更容易地使用 PySpark。下面我们将讨论一些:

  • MLlib: MLlib 是 PySpark 核心的一个包装器,它处理机器学习算法。MLlib 库提供的机器学习 api 非常容易使用。MLlib 支持多种机器学习算法,包括分类、聚类、文本分析等等。
  • ML: ML 也是一个位于 PySpark 核心的机器学习库。ML 的机器学习 api 可以用于数据流。
  • GraphFrames: GraphFrames 库提供了一组 api,可以使用 PySpark core 和 PySpark SQL 高效地进行图形分析。

7 PySpark SQL 介绍

数据科学家处理的大多数数据在本质上要么是结构化的,要么是半结构化的。为了处理结构化和半结构化数据集,PySpark SQL 模块是该 PySpark 核心之上的更高级别抽象。我们将在整本书中学习 PySpark SQL。它内置在 PySpark 中,这意味着它不需要任何额外的安装。

使用 PySpark SQL,您可以从许多源读取数据。PySpark SQL 支持从许多文件格式系统读取,包括文本文件、CSV、ORC、Parquet、JSON 等。您可以从关系数据库管理系统 (RDBMS) 读取数据,如 MySQL 和 PostgreSQL。您还可以将分析报告保存到许多系统和文件格式。

7.1 DataFrames

DataFrames 是一种抽象,类似于关系数据库系统中的表。它们由指定的列组成。DataFrames 是行对象的集合,这些对象在 PySpark SQL 中定义。DataFrames 也由指定的列对象组成。用户知道表格形式的模式,因此很容易对数据流进行操作。

DataFrame 列中的元素将具有相同的数据类型。DataFrame 中的行可能由不同数据类型的元素组成。基本数据结构称为弹性分布式数据集(RDD)。数据流是 RDD 上的包装器。它们是 RDD 或 row 对象。

相关链接:

  • https://spark.apache.org/docs/latest/sql-programming-guide.html

7.2 SparkSession

SparkSession 对象是替换 SQLContext 和 HiveContext 的入口点。为了使 PySpark SQL 代码与以前的版本兼容,SQLContext 和 HiveContext 将继续在 PySpark 中运行。在 PySpark 控制台中,我们获得了 SparkSession 对象。我们可以使用以下代码创建 SparkSession 对象。

为了创建 SparkSession 对象,我们必须导入 SparkSession,如下所示。

from pyspark.sql import SparkSession

导入 SparkSession 后,我们可以使用 SparkSession.builder 进行操作:

spark = SparkSession.builder.appName("PythonSQLAPP") .getOrCreate()

appName 函数将设置应用程序的名称。函数的作用是: 返回一个现有的 SparkSession 对象。如果不存在 SparkSession 对象,getOrCreate()函数将创建一个新对象并返回它。

7.3 Structured Streaming

我们可以使用结构化流框架 (PySpark SQL 的包装器) 进行流数据分析。我们可以使用结构化流以类似的方式对流数据执行分析,就像我们使用 PySpark SQL 对静态数据执行批处理分析一样。正如 Spark 流模块对小批执行流操作一样,结构化流引擎也对小批执行流操作。结构化流最好的部分是它使用了类似于 PySpark SQL 的 API。因此,学习曲线很高。对数据流的操作进行优化,并以类似的方式在性能上下文中优化结构化流 API。

7.4 Catalyst Optimizer

SQL 是一种声明性语言。使用 SQL,我们告诉 SQL 引擎要做什么。我们不告诉它如何执行任务。类似地,PySpark SQL 命令不会告诉它如何执行任务。这些命令只告诉它要执行什么。因此,PySpark SQL 查询在执行任务时需要优化。catalyst 优化器在 PySpark SQL 中执行查询优化。PySpark SQL 查询被转换为低级的弹性分布式数据集 (RDD) 操作。catalyst 优化器首先将 PySpark SQL 查询转换为逻辑计划,然后将此逻辑计划转换为优化的逻辑计划。从这个优化的逻辑计划创建一个物理计划。创建多个物理计划。使用成本分析仪,选择最优的物理方案。最后,创建低层 RDD 操作代码。

8 集群管理器(Cluster Managers)

在分布式系统中,作业或应用程序被分成不同的任务,这些任务可以在集群中的不同机器上并行运行。如果机器发生故障,您必须在另一台机器上重新安排任务。

由于资源管理不善,分布式系统通常面临可伸缩性问题。考虑一个已经在集群上运行的作业。另一个人想做另一份工作。第二项工作必须等到第一项工作完成。但是这样我们并没有最优地利用资源。资源管理很容易解释,但是很难在分布式系统上实现。开发集群管理器是为了优化集群资源的管理。有三个集群管理器可用于 Spark 单机、Apache Mesos 和 YARN。这些集群管理器最好的部分是,它们在用户和集群之间提供了一个抽象层。由于集群管理器提供的抽象,用户体验就像在一台机器上工作,尽管他们在集群上工作。集群管理器将集群资源调度到正在运行的应用程序。

8.1 单机集群管理器(Standalone Cluster Manager)

Apache Spark 附带一个单机集群管理器。它提供了一个主从架构来激发集群。它是一个只使用 spark 的集群管理器。您只能使用这个独立的集群管理器运行 Spark 应用程序。它的组件是主组件和工作组件。工人是主过程的奴隶,它是最简单的集群管理器。可以使用 Spark 的 sbin 目录中的脚本配置 Spark 独立集群管理器。

8.2 Apache Mesos 集群管理器(Apache Mesos Cluster Manager)

Apache Mesos 是一个通用的集群管理器。它是在加州大学伯克利分校的 AMP 实验室开发的。Apache Mesos 帮助分布式解决方案有效地扩展。您可以使用 Mesos 在同一个集群上使用不同的框架运行不同的应用程序。来自不同框架的不同应用程序的含义是什么? 这意味着您可以在 Mesos 上同时运行 Hadoop 应用程序和 Spark 应用程序。当多个应用程序在 Mesos 上运行时,它们共享集群的资源。Apache Mesos 有两个重要组件: 主组件和从组件。这种主从架构类似于 Spark 独立集群管理器。运行在 Mesos 上的应用程序称为框架。奴隶告诉主人作为资源提供的可用资源。从机定期提供资源。主服务器的分配模块决定哪个框架获取资源。

8.3 YARN 集群管理器(YARN Cluster Manager)

YARN 代表着另一个资源谈判者 (Resource Negotiator)。在 Hadoop 2 中引入了 YARN 来扩展 Hadoop。资源管理与作业管理分离。分离这两个组件使 Hadoop 的伸缩性更好。YARN 的主要成分是资源管理器(Resource Manager)、应用程序管理器(Application Master) 和节点管理器(Node Manager)。有一个全局资源管理器,每个集群将运行许多节点管理器。节点管理器是资源管理器的奴隶。调度程序是 ResourceManager 的组件,它为集群上的不同应用程序分配资源。最棒的部分是,您可以在 YARN 管理的集群上同时运行 Spark 应用程序和任何其他应用程序,如 Hadoop 或 MPI。每个应用程序有一个 application master,它处理在分布式系统上并行运行的任务。另外,Hadoop 和 Spark 有它们自己的 ApplicationMaster。

相关链接:

  • https://spark.apache.org/docs/2.0.0/spark-standalone.html
  • https://spark.apache.org/docs/2.0.0/running-on-mesos.html
  • https://spark.apache.org/docs/2.0.0/running-on-yarn.html

9 PostgreSQL 介绍

关系数据库管理系统在许多组织中仍然非常常见。这里的关系是什么意思? 关系表。PostgreSQL 是一个关系数据库管理系统。它可以运行在所有主要的操作系统上,比如 Microsoft Windows、基于 unix 的操作系统、MacOS X 等等。它是一个开源程序,代码在 PostgreSQL 许可下可用。因此,您可以自由地使用它,并根据您的需求进行修改。

PostgreSQL 数据库可以通过其他编程语言 (如 Java、Perl、Python、C 和 c ++) 和许多其他语言 (通过不同的编程接口) 连接。还可以使用与 PL/SQL 类似的过程编程语言 PL/pgSQL(过程语言 /PostgreSQL)对其进行编程。您可以向该数据库添加自定义函数。您可以用 C / c++ 和其他编程语言编写自定义函数。您还可以使用 JDBC 连接器从 PySpark SQL 中读取 PostgreSQL 中的数据。

PostgreSQL 遵循 ACID(Atomicity, Consistency, Isolation and
Durability/ 原子性、一致性、隔离性和持久性)原则。它具有许多特性,其中一些是 PostgreSQL 独有的。它支持可更新视图、事务完整性、复杂查询、触发器等。PostgreSQL 使用多版本并发控制模型进行并发管理。

PostgreSQL 得到了广泛的社区支持。PostgreSQL 被设计和开发为可扩展的。

相关链接:

  • https://wiki.postgresql.org/wiki/Main_Page
  • https://en.wikipedia.org/wiki/PostgreSQL
  • https://en.wikipedia.org/wiki/Multiversion_concurrency_control
  • http://postgresguide.com/

10 MongoDB 介绍

MongoDB 是一个基于文档的 NoSQL 数据库。它是一个开放源码的分布式数据库,由 MongoDB 公司开发。MongoDB 是用 c ++ 编写的,它是水平伸缩的。许多组织将其用于后端数据库和许多其他用途。

MongoDB 附带一个 mongo shell,这是一个到 MongoDB 服务器的 JavaScript 接口。mongo shell 可以用来运行查询以及执行管理任务。在 mongo shell 上,我们也可以运行 JavaScript 代码。

使用 PySpark SQL,我们可以从 MongoDB 读取数据并执行分析。我们也可以写出结果。

相关链接:

  • https://docs.mongodb.com/

11 Cassandra 介绍

Cassandra 是开放源码的分布式数据库,附带 Apache 许可证。这是一个由 Facebook 开发的 NoSQL 数据库。它是水平可伸缩的,最适合处理结构化数据。它提供了高水平的一致性,并且具有可调的一致性。它没有一个单一的故障点。它使用对等的分布式体系结构在不同的节点上复制数据。节点使用闲话协议交换信息。

相关链接:

  • https://www.datastax.com/resources/tutorials
  • http://cassandra.apache.org/doc/latest/
退出移动版