关于人工智能:Spark15分钟教程

1次阅读

共计 17350 个字符,预计需要花费 44 分钟才能阅读完成。

作者 |Andrea Ialenti
编译 |VK
起源 |Towards Datas Science

正如在我简直所有对于这个工具的文章中都写到,Spark 和 SQL 一样非常容易应用。但不论我花多少工夫写代码,我只是无奈在我的大脑中永久性地存储 Spark API(有人会说我的记忆就像 RAM 一样,小而易失)。

无论你是想疾速入门介绍 sparksql,还是急于编写你的程序,还是像我一样须要一份备忘单,我置信你会发现这篇文章很有用。

这篇文章的目标是介绍 sparksql 的所有次要函数 / 个性,在片段中,你将始终看到原始的 SQL 查问及其在 PySpark 中的翻译。

我将在这个数据集上执行我的代码:https://drive.google.com/file…

在几个月前,我为另一篇文章创立了这个数据集,它由三个简略的表组成:

基础知识

Apache Spark 是一个用于大规模并行数据处理的引擎。这个框架的一个令人惊奇的个性是它以多种语言公开 api:我通常应用 Scala 与它交互,然而也能够应用 SQL、Python 甚至 Java 和 R。

当咱们编写 Spark 程序时,首先要晓得的是,当咱们执行代码时,咱们不肯定要对数据执行任何操作。实际上,该工具有两种类型的 API 调用:转换和操作。

Spark 转换背地的范例被称为“延后计算”,这意味着理论的数据计算在咱们要求采取行动之前不会开始。

为了了解这一概念,构想一下你须要对一个列执行 SELECT 和重命名的状况:如果不调用某个操作(例如 collect 或 count),那么你的代码只不过是定义了所谓的 Spark 执行打算。

Spark 以有向无环图 (十分驰名的 DAG) 组织执行打算。此构造形容将要执行的确切操作,并使调度器可能决定在给定工夫执行哪个工作。

正如 Miyagi 学生通知咱们的:

  1. 上蜡:定义 DAG(变换)
  2. 脱蜡:执行 DAG(动作)

与 Spark 交互

太好了,咱们从哪里开始交互?应用 Spark 有多种办法:

  • 应用 IDE:我倡议应用 IntelliJ 或 PyCharm,但我想你能够抉择任何你想要的货色。查看附录中的 PyCharm 疾速入门(在本地运行查问)。我认为能够从你的本地环境应用近程 Spark executor,但说实话,我素来没有进行过这种配置。
  • Jupyter Notebooks+Sparkmagic:Sparkmagic 是一组工具,用于通过 Spark REST 服务器 Livy 与近程 Spark 集群交互工作[1]。这是在 AWS、Azure 或 googlecloud 等云零碎上工作时应用 Spark 的次要形式。大多数云提供商都有一项服务,能够在大概 10 分钟内配置集群和 notebooks。
  • 通过应用 spark shell 的终端:有时你不心愿在你和数据之间有任何货色(例如,对一个表进行超级疾速的查看);在这种状况下,你只需关上一个终端并启动 spark shell。

文章的代码次要用于 IDE。

在编写任何查问之前,咱们须要导入一些库并启动一个 Spark 会话 (应用DatasetDataFrame 的 API 编程)。上面的 PySpark 和 Scala 代码段将加载你须要的所有内容(假如你曾经配置了零碎)。之后,为了简略起见,咱们将只看到 PySpark 代码。除了一些细微差别外,scalaapi 基本相同。

PySpark

# 导入 Spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 初始化 Spark 会话
spark = SparkSession.builder \
    .master("local") \
    .appName("SparkLikeABoss") \
    .getOrCreate()

Scala

//  导入 Spark
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

//  初始化 Spark 会话
val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

解释数据集、数据帧和 RDD 之间的差别篇幅将过长,所以我跳过这一部分,伪装它不存在。

基本操作

你能写的最简略的查问可能是你所用过的最重要的查问。让咱们看看如何应用 Sales 表进行基本操作。

简略的 Select 语句和显示数据

#  以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT *
FROM sales_table
'''
#   执行打算
sales_table_execution_plan = sales_table.select(col("*"))
#   Show (Action) - 显示 5 行,列宽不受限制
sales_table_execution_plan.show(5, True)
#  以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id AS the_order_id,
       seller_id AS the_seller_id,
       num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
#  以一行代码执行打算和显示进去
sales_table_execution_plan = sales_table.select(col("order_id").alias("the_order_id"),
    col("seller_id").alias("the_seller_id"),
    col("num_pieces_sold").alias("the_number_of_pieces_sold")
).show(5, True)

咱们在代码片段中所做的第一件事是定义执行打算;只有当咱们取得 show 操作时,才会执行该打算。

咱们能够在 Spark 打算中调用的其余操作示例包含:

  • collect()—返回整个数据集
  • count()—返回行数
  • take(n)- 从数据集中返回 n 行
  • show(n,truncate=False)- 显示 n 行。你能够决定截断后果或显示字段的所有长度

另一个值得注意的乏味的事件是列是由 col 对象标识的。在本例中,咱们让 Spark 推断这些列属于哪个数据帧。

咱们能够应用语法 execution_plan_variable[“column_name”]来指定列来自哪个执行打算。应用此代替语法,咱们能够失去:

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id AS the_order_id,
       seller_id AS the_seller_id,
       num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
#  以一行代码执行打算和显示进去
sales_table_execution_plan = sales_table.select(sales_table["order_id"].alias("the_order_id"),
    sales_table["seller_id"].alias("the_seller_id"),
    sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")
).show(5, True)

在解决连贯时,限定字段的源表尤为重要(例如,两个表可能有两个同名字段,因而仅应用 col 对象不足以打消歧义)。Scala 中的语法略有不同:

// Qualify the source execution plan in Scala
sales_table.col("order_id")

重命名和增加列

有时咱们只想重命名一个列,或者咱们想增加一个新的列并进行一些计算(例如,在以下状况下):

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id,
       product_id,
       seller_id,
       date,
       num_pieces_sold AS pieces,
       bill_raw_text
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
    withColumnRenamed("num_pieces_sold", "pieces")

sales_table_execution_plan.show()
#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT order_id,
       product_id,
       seller_id,
       date,
       num_pieces_sold,
       bill_raw_text,
       num_pieces_sold % 2 AS num_pieces_sold_is_even
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
    withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)

sales_table_execution_plan.show()

简略聚合

Spark 反对所有次要的聚合函数。以下示例仅指“简略”的示例(例如平均值、总和、计数等)。稍后将介绍数组的聚合。

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT product_id,
       SUM(num_pieces_sold) AS total_pieces_sold,
       AVG(num_pieces_sold) AS average_pieces_sold,
       MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders,
       MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders,
       COUNT(num_pieces_sold) AS num_times_product_sold
FROM sales_table
GROUP BY product_id
'''
sales_table_execution_plan = sales_table.groupBy(col("product_id")
).agg(sum("num_pieces_sold").alias("total_pieces_sold"),
    avg("num_pieces_sold").alias("average_pieces_sold"),
    max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"),
    min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"),
    count("num_pieces_sold").alias("num_times_product_sold")
)

sales_table_execution_plan.show()

显示架构

显示命令的“table”有点误导人;更准确的定义是“显示执行打算”。应用 Spark API,咱们能够一个接一个地传递多个操作;应用 printSchema API,如果在磁盘上写入执行打算的后果,咱们将输入最终表的样子。

在上面的示例中,咱们重命名一些列,进行聚合,而后增加另一列。

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
-- 创立一个长期表,进行一些重命名
CREATE TABLE temp_1 AS
SELECT seller_id AS the_seller,
       num_pieces_sold AS pieces,
       product_id
FROM sales_table;
-- 对新表进行聚合
CREATE TABLE temp_2 AS
SELECT product_id,
       SUM(pieces) AS total_pieces
FROM temp_1
GROUP BY product_id;
-- 增加列
SELECT a.*,
       1 AS fake_column
FROM temp2 a;
'''
sales_table_execution_plan = sales_table. \
    withColumnRenamed("seller_id", "the_seller"). \
    withColumnRenamed("num_pieces_sold", "pieces").\
groupBy(col("product_id")
).agg(sum("pieces").alias("total_pieces")
).withColumn("fake_column", lit(1))

#   输入 Schema
sales_table_execution_plan.printSchema()

printSchema 的输入是:

root
 |-- product_id: string (nullable = true)
 |-- total_pieces: double (nullable = true)
 |-- fake_column: integer (nullable = false)

请留神,printSchema 不会触发操作;相同,Spark 会评估执行打算,以理解 DAG 在输入列中的地位。因为这个起因,这个操作比 show 快得多,show 会触发 DAG 的执行。

解释执行打算

能够通过 explain API 取得无关触发操作时引擎将执行的操作的更具体的阐明。在这种状况下,咱们将取得 Spark 将执行的操作的具体阐明。让咱们对上一个查问调用 explain:

#   输入 Schema
sales_table_execution_plan.printSchema()
== Physical Plan ==
*(2) HashAggregate(keys=[product_id#361], functions=[sum(cast(pieces#379 as double))])
+- Exchange hashpartitioning(product_id#361, 200)
   +- *(1) HashAggregate(keys=[product_id#361], functions=[partial_sum(cast(pieces#379 as double))])
      +- *(1) Project [product_id#361, num_pieces_sold#364 AS pieces#379]
         +- *(1) FileScan parquet [product_id#361,num_pieces_sold#364] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<PATH_TO_FILE>/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,num_pieces_sold:string>

诚实说,我素来没有发现 explain API 太有用,尤其是当 DAG 开始变得宏大和简单时。在 Spark UI 中能够找到一个更好的视图,它公开了雷同信息的图形示意。

Select Distinct

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT DISTINCT seller_id,
       date
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(col("seller_id"), col("date")
).distinct()

#   输入 Schema
sales_table_execution_plan.show()

Case When

在 Spark 中很好地实现了该操作(不须要非凡的 udf);让咱们简略地用 sales_table 将每一行插入到不同的 bucket 中,具体取决于 num_pieces_selled:

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT seller_id,
       CASE WHEN num_pieces_sold < 30 THEN 'Lower than 30',
            WHEN num_pieces_sold < 60 THEN 'Between 31 and 60'
            WHEN num_pieces_sold < 90 THEN 'Between 61 and 90'
            ELSE 'More than 91' AS sales_bucket
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(col("seller_id"),
    when(col("num_pieces_sold") < 30, "Lower than 30").
    when(col("num_pieces_sold") < 60, "Between 31 and 60").
    when(col("num_pieces_sold") < 90, "Between 61 and 90").
    otherwise("More than 91").alias("sales_bucket")
)

sales_table_execution_plan.show()

Union All

有时咱们须要将流分成多个局部,而后将所有内容合并到一个表中;在 SQL 中,这是用 UNION ALL 示意的。在 spark2.1 中,在执行 union all 操作之前必须对列进行排序。

侥幸的是,spark2.3 应用列名来对齐合并的执行打算。在上面的示例中,咱们首先将表拆分为两局部,而后将这些局部合并在一起(齐全没有必要,但它将演示如何应用 API):

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
CREATE TABLE part_1 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold > 50;
CREATE TABLE part_2 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold <= 50;
SELECT *
FROM part_1
 UNION ALL
SELECT *
FROM part_2
'''
#   拆散 part1
sales_table_execution_plan_part_1 = sales_table.where(col("num_pieces_sold") > 50)

#   拆散 part2
sales_table_execution_plan_part_2 = sales_table.where(col("num_pieces_sold") <= 50)

#   合并
sales_table_execution_plan = sales_table_execution_plan_part_1.unionByName(sales_table_execution_plan_part_2)

sales_table_execution_plan.explain()

让咱们看看解释,看看幕后产生了什么:

Union
:- *(1) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
:  +- *(1) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) > 50))
:     +- *(1) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
+- *(2) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
   +- *(2) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) <= 50))
      +- *(2) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...

表正在合并。

Spark 的 Join

当代码呈现性能问题时,连贯通常是咱们首先要查看的中央。Spark 引擎在并行化非连贯操作方面相当杰出,但在连贯工作时可能须要进行调整。

我写了一整篇对于这个主题的文章,所以我不会再深刻探讨这个问题:如果你想晓得更多,或者你遇到了一些连贯性能问题,我倡议你看看:https://towardsdatascience.co…

同时,这里是连贯的语法。在示例中,咱们将连贯 Sales 和 Sellers 表。

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")

'''
SELECT a.*,
       b.*
FROM sales_table a
    LEFT JOIN sellers_table b
        ON a.seller_id = b.seller_id
'''
#   左连贯
left_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="left")

#   内连贯
inner_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="inner")

#   右连贯
right_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="right")

#  全外连贯
full_outer_join_execution_plan = sales_table.join(sellers_table, 
                   on=sales_table["seller_id"] == sellers_table["seller_id"], 
                   how="full_outer")

除了传统的连贯类型(左、右、内、穿插等),Spark 还反对半连贯和反连贯; 这两个基本上是在 Spark 中示意操作和不示意操作的一种形式:

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")


'''
SELECT *
FROM sales_table
WHERE seller_id IN (SELECT seller_id FROM sellers_table)
'''
# 左半连贯是在 SQL 中示意 IN 操作的一种形式
semi_join_execution_plan = sales_table.join(sellers_table, 
                on=sales_table["seller_id"] == sellers_table["seller_id"], 
                how="left_semi")

semi_join_execution_plan.show()
#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")

'''
SELECT *
FROM sales_table
WHERE seller_id NOT IN (SELECT seller_id FROM sellers_table)
'''
# 左反连贯是在 SQL 中示意 NOT IN 操作的一种形式
anti_join_execution_plan = sales_table.join(sellers_table,
                on=sales_table["seller_id"] == sellers_table["seller_id"],
                how="left_anti")

anti_join_execution_plan.show()

Window 函数

window 函数对定义为 frame 或 window 的特定行子集执行计算。典型的例子是子群的排序。在咱们的玩具数据集中,假如咱们想晓得,对于每个卖家来说,什么是销售最多的产品。要提取这些信息,咱们须要:

  1. 定义咱们将利用排序函数的“分区”:咱们须要对每个卖家的产品执行一次排序操作
  2. 利用咱们的首选排序函数:dense_rank, `rank, row_number`。上面是 Spark 中的窗口函数列表。

下图是咱们心愿如何分区数据的示例:

#   导入 Window
from pyspark.sql.window import Window

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT seller_id,
       product_id,
       total_pieces,
       dense_rank() OVER (PARTITION BY seller_id ORDER BY total_pieces DESC) as rank
FROM (
    SELECT seller_id,
           product_id,
           SUM(total_pieces_sold) AS total_pieces
    FROM sales_table
    GROUP BY seller_id,
           product_id
)
'''sales_table_agg = sales_table.groupBy(col("seller_id"), col("product_id")).agg(sum("num_pieces_sold").alias("total_pieces"))

#  定义窗口: 在卖方 ID 上对表进行分区,并依据销售的总块对每个组进行排序
window_specifications = Window.partitionBy(col("seller_id")).orderBy(col("total_pieces").asc())

# 利用 dense_rank 函数,依据下面的标准创立窗口
sales_table_agg.withColumn('dense_rank', dense_rank().over(window_specifications)).show()

字符串

数据科学家在解决数据时面临的另一组十分常见的操作,包含从字符串中提取信息。当然,有很多 Spark API 能够对文本数据进行简直任何 (根本) 操作。

让咱们先从简略的 LIKE 运算符开始,而后再探讨正则表达式的用法。对于 API 的残缺列表,我将参考文档;上面是可能应用最多的 API。

Like

在上面的示例中,咱们心愿应用 sales 表来抉择 bill_raw_text 相似于“ab%cd%”的所有字符串(即,以字符串 ab 结尾,两头有一个字符串 cd。

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT *
WHERE bill_raw_text LIKE 'ab%cd%'
'''
sales_table_execution_plan = sales_table.where(col('bill_raw_text').like("ab%cd%")
)

sales_table_execution_plan.show()

有时咱们想要找到的模式更简单,无奈用简略的通配符来表白。在这种状况下,咱们须要应用正则表达式。让咱们深入研究几个函数。在上面的示例中,咱们将始终利用雷同的正则表达式。

(ab[cd]{2,4})|(aa[abcde]{1,2})

Like 的正则表达式(Regex)

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT *
FROM sales_table
WHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})'
'''
sales_table_execution_plan = sales_table.where(col('bill_raw_text').rlike("(ab[cd]{2,4})|(aa[abcde]{1,2})")
)

sales_table_execution_plan.show()

用正则表达式提取模式

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''SELECT DISTINCT REGEXP_EXTRACT(bill_raw_text,'(ab[cd]{2,4})|(aa[abcde]{1,2})') AS extracted_pattern
WHERE REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') <> "FROM sales_table'''
sales_table_execution_plan = sales_table.select(
    #  最初一个整数示意要提取哪一组
    regexp_extract(col('bill_raw_text'), "(ab[cd]{2,4})|(aa[abcde]{1,2})", 0).alias("extracted_pattern")
).where(col("extracted_pattern") != "").distinct()

sales_table_execution_plan.show(100,False)

数组操作

数组是一种数据类型,。Spark 实现了很多函数来操作数组(精确地说,从 2.4 版开始就是这样)。让咱们深刻理解根本状况。

数组聚合

将列转换为数组与调用聚合函数一样简略。Spark 2.3 有两种次要的数组聚合函数 collect_set 和 collect_list:第一种只蕴含惟一的元素,而后一种只是将组转换为列表。

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
       COLLECT_LIST(num_pieces_list) AS num_pieces_sold_list,
       seller_id
FROM sales_table
GROUP BY seller_id
'''sales_table_execution_plan = sales_table.groupBy(col("seller_id")).agg(collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"),
    collect_list(col("num_pieces_sold")).alias("num_pieces_sold_list"),

)

sales_table_execution_plan.show(10, True)

合成阵列

聚合的逆操作是“数组合成”,即从程度数组生成“垂直”列。为此,咱们能够应用 explode 函数。

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

'''
CREATE TABLE sales_table_aggregated AS
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
       seller_id
FROM sales_table
GROUP BY seller_id;
SELECT EXPLODE(num_pieces_sold_set) AS exploded_num_pieces_set
FROM sales_table_aggregated;
'''sales_table_execution_aggregated = sales_table.groupBy(col("seller_id")).agg(collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set")
)

sales_table_execution_exploded = sales_table_execution_aggregated.select(explode(col("num_pieces_sold_set")).alias("exploded_num_pieces_set")
)

sales_table_execution_exploded.show(10, True)

其余应用数组的操作(从 Spark 2.4 开始)

可怜的是,Spark 2.3 不反对对数组执行太多操作。侥幸的是,Spark 2.4 能够!Spark 2.4 之后提供的一些性能包含:

  • array_except(array1,array2)- 返回 array1 中的元素数组,而不是 array2 中的元素,没有反复项。
  • array_intersect(array1,array2)- 返回 array1 和 array2 相交的元素数组,不蕴含反复项。
  • array_join(array,delimiter[,nullReplacement])- 应用分隔符和可选字符串连贯给定数组的元素。
  • array_max(array)- 返回数组中的最大值。跳过空元素。
  • array_min(array)- 返回数组中的最小值。跳过空元素。
  • array_sort(array)- 按升序对输出数组进行排序。输出数组的元素必须是可排序的。空元素将放在返回数组的开端。

等等。以上定义间接取自参考文献。我倡议你查一下,以便有更多的细节!

UDFs

最初,用户定义函数。当咱们在默认的 api 中找不到转换时,udf 就是一种办法。

UDF 是一个定制函数,程序员能够像咱们目前看到的所有 api 一样定义并利用于列。它们提供了最大的灵活性 (咱们简直能够在其中编写任何代码);毛病是 Spark 将它们视为黑匣子,因而外部的 Spark 引擎优化器(Catalyst) 无奈进行任何优化:udf 可能会减慢咱们的代码速度。

作为一个示例,让咱们实现一个 UDF,它模仿函数 array_repeat(element,count),该函数返回一个蕴含元素 count 次的数组。

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

#  创立将在 UDF 中应用的函数
def array_repeat_custom(element, count):
  list = ["{}".format(element) for x in range(0, count)]
  return list

#   将函数转换为 UDF。指出 UDF 的返回类型是一种很好的做法
#   在本例中,返回类型是字符串数组
array_repeat_custom_udf = udf(array_repeat_custom,  ArrayType(StringType()))

#   以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")

#   调用 UDF
sales_table_execution_plan = sales_table.select(array_repeat_custom_udf(col("num_pieces_sold"), lit(3)).alias("sample_array")
)

sales_table_execution_plan.show()

除了 UDF 的语法之外,我倡议你关注下面应用的 lit 函数。有些 Spark 函数只承受列作为输出:如果须要应用常量,则可能须要将该常量转换为“列”。lit 函数会创立一列文字值。

下一步

我心愿我可能证实 Spark 并不比 SQL 更难,他们基本上是一样的。

你能够设想,这篇文章的题目有点夸大:实际上精通这个工具须要 15 分钟以上的工夫;但我置信以上是一个很好的疾速入门!

我的倡议是开始应用下面的 api,因为它们将笼罩 70% 的用例。当你对基础知识有信念时,我倡议你写上面两篇文章,那是一位值得信赖的作者 (lol) 几个月前写的。第一个问题将挑战你在应用此工具进行开发时遇到的一些经典问题,而第二个问题是对 Spark Joins 的深入研究。

https://towardsdatascience.co…

https://towardsdatascience.co…

附录 - 配置 PyCharm

在本地 (非分布式) 环境中装置 Spark 是一项非常简单的工作。在本附录中,我将向你展现 PyCharm Community Edition 的根本配置,以便应用 Python 运行 Spark。有五个简略步骤:

  1. 下载 PyCharm 社区版
  2. 下载 Spark
  3. 装置 PySpark
  4. 配置 PyCharm 以执行正确的 Spark executor
  5. 测试是否一切正常

两个注意事项:

  • 我假如你的零碎中正确装置了 Java。
  • 在 Windows 上,须要装置 Winutils,这是运行 Hadoop 所需的一组二进制文件。查看此 Git repo 理解更多信息:https://github.com/stevelough…。

下载 PyCharm 社区版

侥幸的是,JetBrains 有一个 PyCharm 的开源版本。咱们能够简略地从他们的网站下载最新版本。装置很简略。

下载 Spark

咱们只须要从 Spark 官方网站下载一个压缩文件。在我写作时,有两个次要版本可用:3.0.1 和 2.4.7。对于文章的范畴,咱们能够抉择其中之一。

一旦下载实现,咱们只须要在一个适合的地位解压包。

装置 PySpark

当初是运行 PyCharm 并装置所需的所有软件包的时候了。首先,让咱们关上 PyCharm,创立一个新我的项目和一个新的虚拟环境。

最初,间接从 PyCharm 装置 PySpark:

留神,为了启用提醒,咱们还应该装置 pyspark-stubs 包。

配置 PyCharm 以执行正确的 Spark executor

心愿咱们没有呈现任何谬误,所以咱们只须要批示 PyCharm 运行正确的 Spark 执行器。它位于咱们解压缩 Spark 自身的文件夹中。让咱们为 PyCharm 我的项目创立一个运行配置。

测试是否一切正常

要测试 Spark 是否失常工作,只需运行以下代码片段

# 导入库
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 初始化 Spark 会话
spark = SparkSession.builder \
    .master("local") \
    .appName("SparkLikeABoss") \
    .getOrCreate()

print(spark.version)

原文链接:https://towardsdatascience.co…

欢送关注磐创 AI 博客站:
http://panchuang.net/

sklearn 机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/

正文完
 0