共计 17350 个字符,预计需要花费 44 分钟才能阅读完成。
作者 |Andrea Ialenti
编译 |VK
起源 |Towards Datas Science
正如在我简直所有对于这个工具的文章中都写到,Spark 和 SQL 一样非常容易应用。但不论我花多少工夫写代码,我只是无奈在我的大脑中永久性地存储 Spark API(有人会说我的记忆就像 RAM 一样,小而易失)。
无论你是想疾速入门介绍 sparksql,还是急于编写你的程序,还是像我一样须要一份备忘单,我置信你会发现这篇文章很有用。
这篇文章的目标是介绍 sparksql 的所有次要函数 / 个性,在片段中,你将始终看到原始的 SQL 查问及其在 PySpark 中的翻译。
我将在这个数据集上执行我的代码:https://drive.google.com/file…
在几个月前,我为另一篇文章创立了这个数据集,它由三个简略的表组成:
基础知识
Apache Spark 是一个用于大规模并行数据处理的引擎。这个框架的一个令人惊奇的个性是它以多种语言公开 api:我通常应用 Scala 与它交互,然而也能够应用 SQL、Python 甚至 Java 和 R。
当咱们编写 Spark 程序时,首先要晓得的是,当咱们执行代码时,咱们不肯定要对数据执行任何操作。实际上,该工具有两种类型的 API 调用:转换和操作。
Spark 转换背地的范例被称为“延后计算”,这意味着理论的数据计算在咱们要求采取行动之前不会开始。
为了了解这一概念,构想一下你须要对一个列执行 SELECT 和重命名的状况:如果不调用某个操作(例如 collect 或 count),那么你的代码只不过是定义了所谓的 Spark 执行打算。
Spark 以有向无环图 (十分驰名的 DAG) 组织执行打算。此构造形容将要执行的确切操作,并使调度器可能决定在给定工夫执行哪个工作。
正如 Miyagi 学生通知咱们的:
- 上蜡:定义 DAG(变换)
- 脱蜡:执行 DAG(动作)
与 Spark 交互
太好了,咱们从哪里开始交互?应用 Spark 有多种办法:
- 应用 IDE:我倡议应用 IntelliJ 或 PyCharm,但我想你能够抉择任何你想要的货色。查看附录中的 PyCharm 疾速入门(在本地运行查问)。我认为能够从你的本地环境应用近程 Spark executor,但说实话,我素来没有进行过这种配置。
- Jupyter Notebooks+Sparkmagic:Sparkmagic 是一组工具,用于通过 Spark REST 服务器 Livy 与近程 Spark 集群交互工作[1]。这是在 AWS、Azure 或 googlecloud 等云零碎上工作时应用 Spark 的次要形式。大多数云提供商都有一项服务,能够在大概 10 分钟内配置集群和 notebooks。
- 通过应用 spark shell 的终端:有时你不心愿在你和数据之间有任何货色(例如,对一个表进行超级疾速的查看);在这种状况下,你只需关上一个终端并启动 spark shell。
文章的代码次要用于 IDE。
在编写任何查问之前,咱们须要导入一些库并启动一个 Spark 会话 (应用Dataset 和DataFrame 的 API 编程)。上面的 PySpark 和 Scala 代码段将加载你须要的所有内容(假如你曾经配置了零碎)。之后,为了简略起见,咱们将只看到 PySpark 代码。除了一些细微差别外,scalaapi 基本相同。
PySpark
# 导入 Spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 初始化 Spark 会话
spark = SparkSession.builder \
.master("local") \
.appName("SparkLikeABoss") \
.getOrCreate()
Scala
// 导入 Spark
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
// 初始化 Spark 会话
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
解释数据集、数据帧和 RDD 之间的差别篇幅将过长,所以我跳过这一部分,伪装它不存在。
基本操作
你能写的最简略的查问可能是你所用过的最重要的查问。让咱们看看如何应用 Sales 表进行基本操作。
简略的 Select 语句和显示数据
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT *
FROM sales_table
'''
# 执行打算
sales_table_execution_plan = sales_table.select(col("*"))
# Show (Action) - 显示 5 行,列宽不受限制
sales_table_execution_plan.show(5, True)
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id AS the_order_id,
seller_id AS the_seller_id,
num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
# 以一行代码执行打算和显示进去
sales_table_execution_plan = sales_table.select(col("order_id").alias("the_order_id"),
col("seller_id").alias("the_seller_id"),
col("num_pieces_sold").alias("the_number_of_pieces_sold")
).show(5, True)
咱们在代码片段中所做的第一件事是定义执行打算;只有当咱们取得 show 操作时,才会执行该打算。
咱们能够在 Spark 打算中调用的其余操作示例包含:
- collect()—返回整个数据集
- count()—返回行数
- take(n)- 从数据集中返回 n 行
- show(n,truncate=False)- 显示 n 行。你能够决定截断后果或显示字段的所有长度
另一个值得注意的乏味的事件是列是由 col 对象标识的。在本例中,咱们让 Spark 推断这些列属于哪个数据帧。
咱们能够应用语法 execution_plan_variable[“column_name”]来指定列来自哪个执行打算。应用此代替语法,咱们能够失去:
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id AS the_order_id,
seller_id AS the_seller_id,
num_pieces_sold AS the_number_of_pieces_sold
FROM sales_table
'''
# 以一行代码执行打算和显示进去
sales_table_execution_plan = sales_table.select(sales_table["order_id"].alias("the_order_id"),
sales_table["seller_id"].alias("the_seller_id"),
sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")
).show(5, True)
在解决连贯时,限定字段的源表尤为重要(例如,两个表可能有两个同名字段,因而仅应用 col 对象不足以打消歧义)。Scala 中的语法略有不同:
// Qualify the source execution plan in Scala
sales_table.col("order_id")
重命名和增加列
有时咱们只想重命名一个列,或者咱们想增加一个新的列并进行一些计算(例如,在以下状况下):
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id,
product_id,
seller_id,
date,
num_pieces_sold AS pieces,
bill_raw_text
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
withColumnRenamed("num_pieces_sold", "pieces")
sales_table_execution_plan.show()
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT order_id,
product_id,
seller_id,
date,
num_pieces_sold,
bill_raw_text,
num_pieces_sold % 2 AS num_pieces_sold_is_even
FROM sales_table a
'''
sales_table_execution_plan = sales_table. \
withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)
sales_table_execution_plan.show()
简略聚合
Spark 反对所有次要的聚合函数。以下示例仅指“简略”的示例(例如平均值、总和、计数等)。稍后将介绍数组的聚合。
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT product_id,
SUM(num_pieces_sold) AS total_pieces_sold,
AVG(num_pieces_sold) AS average_pieces_sold,
MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders,
MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders,
COUNT(num_pieces_sold) AS num_times_product_sold
FROM sales_table
GROUP BY product_id
'''
sales_table_execution_plan = sales_table.groupBy(col("product_id")
).agg(sum("num_pieces_sold").alias("total_pieces_sold"),
avg("num_pieces_sold").alias("average_pieces_sold"),
max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"),
min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"),
count("num_pieces_sold").alias("num_times_product_sold")
)
sales_table_execution_plan.show()
显示架构
显示命令的“table”有点误导人;更准确的定义是“显示执行打算”。应用 Spark API,咱们能够一个接一个地传递多个操作;应用 printSchema API,如果在磁盘上写入执行打算的后果,咱们将输入最终表的样子。
在上面的示例中,咱们重命名一些列,进行聚合,而后增加另一列。
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
-- 创立一个长期表,进行一些重命名
CREATE TABLE temp_1 AS
SELECT seller_id AS the_seller,
num_pieces_sold AS pieces,
product_id
FROM sales_table;
-- 对新表进行聚合
CREATE TABLE temp_2 AS
SELECT product_id,
SUM(pieces) AS total_pieces
FROM temp_1
GROUP BY product_id;
-- 增加列
SELECT a.*,
1 AS fake_column
FROM temp2 a;
'''
sales_table_execution_plan = sales_table. \
withColumnRenamed("seller_id", "the_seller"). \
withColumnRenamed("num_pieces_sold", "pieces").\
groupBy(col("product_id")
).agg(sum("pieces").alias("total_pieces")
).withColumn("fake_column", lit(1))
# 输入 Schema
sales_table_execution_plan.printSchema()
printSchema 的输入是:
root
|-- product_id: string (nullable = true)
|-- total_pieces: double (nullable = true)
|-- fake_column: integer (nullable = false)
请留神,printSchema 不会触发操作;相同,Spark 会评估执行打算,以理解 DAG 在输入列中的地位。因为这个起因,这个操作比 show 快得多,show 会触发 DAG 的执行。
解释执行打算
能够通过 explain API 取得无关触发操作时引擎将执行的操作的更具体的阐明。在这种状况下,咱们将取得 Spark 将执行的操作的具体阐明。让咱们对上一个查问调用 explain:
# 输入 Schema
sales_table_execution_plan.printSchema()
== Physical Plan ==
*(2) HashAggregate(keys=[product_id#361], functions=[sum(cast(pieces#379 as double))])
+- Exchange hashpartitioning(product_id#361, 200)
+- *(1) HashAggregate(keys=[product_id#361], functions=[partial_sum(cast(pieces#379 as double))])
+- *(1) Project [product_id#361, num_pieces_sold#364 AS pieces#379]
+- *(1) FileScan parquet [product_id#361,num_pieces_sold#364] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<PATH_TO_FILE>/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,num_pieces_sold:string>
诚实说,我素来没有发现 explain API 太有用,尤其是当 DAG 开始变得宏大和简单时。在 Spark UI 中能够找到一个更好的视图,它公开了雷同信息的图形示意。
Select Distinct
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT DISTINCT seller_id,
date
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(col("seller_id"), col("date")
).distinct()
# 输入 Schema
sales_table_execution_plan.show()
Case When
在 Spark 中很好地实现了该操作(不须要非凡的 udf);让咱们简略地用 sales_table 将每一行插入到不同的 bucket 中,具体取决于 num_pieces_selled:
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT seller_id,
CASE WHEN num_pieces_sold < 30 THEN 'Lower than 30',
WHEN num_pieces_sold < 60 THEN 'Between 31 and 60'
WHEN num_pieces_sold < 90 THEN 'Between 61 and 90'
ELSE 'More than 91' AS sales_bucket
FROM sales_table
'''
sales_table_execution_plan = sales_table.select(col("seller_id"),
when(col("num_pieces_sold") < 30, "Lower than 30").
when(col("num_pieces_sold") < 60, "Between 31 and 60").
when(col("num_pieces_sold") < 90, "Between 61 and 90").
otherwise("More than 91").alias("sales_bucket")
)
sales_table_execution_plan.show()
Union All
有时咱们须要将流分成多个局部,而后将所有内容合并到一个表中;在 SQL 中,这是用 UNION ALL 示意的。在 spark2.1 中,在执行 union all 操作之前必须对列进行排序。
侥幸的是,spark2.3 应用列名来对齐合并的执行打算。在上面的示例中,咱们首先将表拆分为两局部,而后将这些局部合并在一起(齐全没有必要,但它将演示如何应用 API):
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
CREATE TABLE part_1 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold > 50;
CREATE TABLE part_2 AS
SELECT *
FROM sales_table
WHERE num_pieces_sold <= 50;
SELECT *
FROM part_1
UNION ALL
SELECT *
FROM part_2
'''
# 拆散 part1
sales_table_execution_plan_part_1 = sales_table.where(col("num_pieces_sold") > 50)
# 拆散 part2
sales_table_execution_plan_part_2 = sales_table.where(col("num_pieces_sold") <= 50)
# 合并
sales_table_execution_plan = sales_table_execution_plan_part_1.unionByName(sales_table_execution_plan_part_2)
sales_table_execution_plan.explain()
让咱们看看解释,看看幕后产生了什么:
Union
:- *(1) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
: +- *(1) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) > 50))
: +- *(1) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
+- *(2) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]
+- *(2) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) <= 50))
+- *(2) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
表正在合并。
Spark 的 Join
当代码呈现性能问题时,连贯通常是咱们首先要查看的中央。Spark 引擎在并行化非连贯操作方面相当杰出,但在连贯工作时可能须要进行调整。
我写了一整篇对于这个主题的文章,所以我不会再深刻探讨这个问题:如果你想晓得更多,或者你遇到了一些连贯性能问题,我倡议你看看:https://towardsdatascience.co…
同时,这里是连贯的语法。在示例中,咱们将连贯 Sales 和 Sellers 表。
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
'''
SELECT a.*,
b.*
FROM sales_table a
LEFT JOIN sellers_table b
ON a.seller_id = b.seller_id
'''
# 左连贯
left_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="left")
# 内连贯
inner_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="inner")
# 右连贯
right_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="right")
# 全外连贯
full_outer_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="full_outer")
除了传统的连贯类型(左、右、内、穿插等),Spark 还反对半连贯和反连贯; 这两个基本上是在 Spark 中示意操作和不示意操作的一种形式:
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
'''
SELECT *
FROM sales_table
WHERE seller_id IN (SELECT seller_id FROM sellers_table)
'''
# 左半连贯是在 SQL 中示意 IN 操作的一种形式
semi_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="left_semi")
semi_join_execution_plan.show()
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
'''
SELECT *
FROM sales_table
WHERE seller_id NOT IN (SELECT seller_id FROM sellers_table)
'''
# 左反连贯是在 SQL 中示意 NOT IN 操作的一种形式
anti_join_execution_plan = sales_table.join(sellers_table,
on=sales_table["seller_id"] == sellers_table["seller_id"],
how="left_anti")
anti_join_execution_plan.show()
Window 函数
window 函数对定义为 frame 或 window 的特定行子集执行计算。典型的例子是子群的排序。在咱们的玩具数据集中,假如咱们想晓得,对于每个卖家来说,什么是销售最多的产品。要提取这些信息,咱们须要:
- 定义咱们将利用排序函数的“分区”:咱们须要对每个卖家的产品执行一次排序操作
- 利用咱们的首选排序函数:dense_rank
,
`rank,
row_number`。上面是 Spark 中的窗口函数列表。
下图是咱们心愿如何分区数据的示例:
# 导入 Window
from pyspark.sql.window import Window
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT seller_id,
product_id,
total_pieces,
dense_rank() OVER (PARTITION BY seller_id ORDER BY total_pieces DESC) as rank
FROM (
SELECT seller_id,
product_id,
SUM(total_pieces_sold) AS total_pieces
FROM sales_table
GROUP BY seller_id,
product_id
)
'''sales_table_agg = sales_table.groupBy(col("seller_id"), col("product_id")).agg(sum("num_pieces_sold").alias("total_pieces"))
# 定义窗口: 在卖方 ID 上对表进行分区,并依据销售的总块对每个组进行排序
window_specifications = Window.partitionBy(col("seller_id")).orderBy(col("total_pieces").asc())
# 利用 dense_rank 函数,依据下面的标准创立窗口
sales_table_agg.withColumn('dense_rank', dense_rank().over(window_specifications)).show()
字符串
数据科学家在解决数据时面临的另一组十分常见的操作,包含从字符串中提取信息。当然,有很多 Spark API 能够对文本数据进行简直任何 (根本) 操作。
让咱们先从简略的 LIKE 运算符开始,而后再探讨正则表达式的用法。对于 API 的残缺列表,我将参考文档;上面是可能应用最多的 API。
Like
在上面的示例中,咱们心愿应用 sales 表来抉择 bill_raw_text 相似于“ab%cd%”的所有字符串(即,以字符串 ab 结尾,两头有一个字符串 cd。
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT *
WHERE bill_raw_text LIKE 'ab%cd%'
'''
sales_table_execution_plan = sales_table.where(col('bill_raw_text').like("ab%cd%")
)
sales_table_execution_plan.show()
有时咱们想要找到的模式更简单,无奈用简略的通配符来表白。在这种状况下,咱们须要应用正则表达式。让咱们深入研究几个函数。在上面的示例中,咱们将始终利用雷同的正则表达式。
(ab[cd]{2,4})|(aa[abcde]{1,2})
Like 的正则表达式(Regex)
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT *
FROM sales_table
WHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})'
'''
sales_table_execution_plan = sales_table.where(col('bill_raw_text').rlike("(ab[cd]{2,4})|(aa[abcde]{1,2})")
)
sales_table_execution_plan.show()
用正则表达式提取模式
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''SELECT DISTINCT REGEXP_EXTRACT(bill_raw_text,'(ab[cd]{2,4})|(aa[abcde]{1,2})') AS extracted_pattern
WHERE REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') <> "FROM sales_table'''
sales_table_execution_plan = sales_table.select(
# 最初一个整数示意要提取哪一组
regexp_extract(col('bill_raw_text'), "(ab[cd]{2,4})|(aa[abcde]{1,2})", 0).alias("extracted_pattern")
).where(col("extracted_pattern") != "").distinct()
sales_table_execution_plan.show(100,False)
数组操作
数组是一种数据类型,。Spark 实现了很多函数来操作数组(精确地说,从 2.4 版开始就是这样)。让咱们深刻理解根本状况。
数组聚合
将列转换为数组与调用聚合函数一样简略。Spark 2.3 有两种次要的数组聚合函数 collect_set 和 collect_list:第一种只蕴含惟一的元素,而后一种只是将组转换为列表。
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
COLLECT_LIST(num_pieces_list) AS num_pieces_sold_list,
seller_id
FROM sales_table
GROUP BY seller_id
'''sales_table_execution_plan = sales_table.groupBy(col("seller_id")).agg(collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"),
collect_list(col("num_pieces_sold")).alias("num_pieces_sold_list"),
)
sales_table_execution_plan.show(10, True)
合成阵列
聚合的逆操作是“数组合成”,即从程度数组生成“垂直”列。为此,咱们能够应用 explode 函数。
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
'''
CREATE TABLE sales_table_aggregated AS
SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,
seller_id
FROM sales_table
GROUP BY seller_id;
SELECT EXPLODE(num_pieces_sold_set) AS exploded_num_pieces_set
FROM sales_table_aggregated;
'''sales_table_execution_aggregated = sales_table.groupBy(col("seller_id")).agg(collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set")
)
sales_table_execution_exploded = sales_table_execution_aggregated.select(explode(col("num_pieces_sold_set")).alias("exploded_num_pieces_set")
)
sales_table_execution_exploded.show(10, True)
其余应用数组的操作(从 Spark 2.4 开始)
可怜的是,Spark 2.3 不反对对数组执行太多操作。侥幸的是,Spark 2.4 能够!Spark 2.4 之后提供的一些性能包含:
- array_except(array1,array2)- 返回 array1 中的元素数组,而不是 array2 中的元素,没有反复项。
- array_intersect(array1,array2)- 返回 array1 和 array2 相交的元素数组,不蕴含反复项。
- array_join(array,delimiter[,nullReplacement])- 应用分隔符和可选字符串连贯给定数组的元素。
- array_max(array)- 返回数组中的最大值。跳过空元素。
- array_min(array)- 返回数组中的最小值。跳过空元素。
- array_sort(array)- 按升序对输出数组进行排序。输出数组的元素必须是可排序的。空元素将放在返回数组的开端。
等等。以上定义间接取自参考文献。我倡议你查一下,以便有更多的细节!
UDFs
最初,用户定义函数。当咱们在默认的 api 中找不到转换时,udf 就是一种办法。
UDF 是一个定制函数,程序员能够像咱们目前看到的所有 api 一样定义并利用于列。它们提供了最大的灵活性 (咱们简直能够在其中编写任何代码);毛病是 Spark 将它们视为黑匣子,因而外部的 Spark 引擎优化器(Catalyst) 无奈进行任何优化:udf 可能会减慢咱们的代码速度。
作为一个示例,让咱们实现一个 UDF,它模仿函数 array_repeat(element,count),该函数返回一个蕴含元素 count 次的数组。
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
# 创立将在 UDF 中应用的函数
def array_repeat_custom(element, count):
list = ["{}".format(element) for x in range(0, count)]
return list
# 将函数转换为 UDF。指出 UDF 的返回类型是一种很好的做法
# 在本例中,返回类型是字符串数组
array_repeat_custom_udf = udf(array_repeat_custom, ArrayType(StringType()))
# 以 Parquet 格局读取源表
sales_table = spark.read.parquet("./data/sales_parquet")
# 调用 UDF
sales_table_execution_plan = sales_table.select(array_repeat_custom_udf(col("num_pieces_sold"), lit(3)).alias("sample_array")
)
sales_table_execution_plan.show()
除了 UDF 的语法之外,我倡议你关注下面应用的 lit 函数。有些 Spark 函数只承受列作为输出:如果须要应用常量,则可能须要将该常量转换为“列”。lit 函数会创立一列文字值。
下一步
我心愿我可能证实 Spark 并不比 SQL 更难,他们基本上是一样的。
你能够设想,这篇文章的题目有点夸大:实际上精通这个工具须要 15 分钟以上的工夫;但我置信以上是一个很好的疾速入门!
我的倡议是开始应用下面的 api,因为它们将笼罩 70% 的用例。当你对基础知识有信念时,我倡议你写上面两篇文章,那是一位值得信赖的作者 (lol) 几个月前写的。第一个问题将挑战你在应用此工具进行开发时遇到的一些经典问题,而第二个问题是对 Spark Joins 的深入研究。
https://towardsdatascience.co…
https://towardsdatascience.co…
附录 - 配置 PyCharm
在本地 (非分布式) 环境中装置 Spark 是一项非常简单的工作。在本附录中,我将向你展现 PyCharm Community Edition 的根本配置,以便应用 Python 运行 Spark。有五个简略步骤:
- 下载 PyCharm 社区版
- 下载 Spark
- 装置 PySpark
- 配置 PyCharm 以执行正确的 Spark executor
- 测试是否一切正常
两个注意事项:
- 我假如你的零碎中正确装置了 Java。
- 在 Windows 上,须要装置 Winutils,这是运行 Hadoop 所需的一组二进制文件。查看此 Git repo 理解更多信息:https://github.com/stevelough…。
下载 PyCharm 社区版
侥幸的是,JetBrains 有一个 PyCharm 的开源版本。咱们能够简略地从他们的网站下载最新版本。装置很简略。
下载 Spark
咱们只须要从 Spark 官方网站下载一个压缩文件。在我写作时,有两个次要版本可用:3.0.1 和 2.4.7。对于文章的范畴,咱们能够抉择其中之一。
一旦下载实现,咱们只须要在一个适合的地位解压包。
装置 PySpark
当初是运行 PyCharm 并装置所需的所有软件包的时候了。首先,让咱们关上 PyCharm,创立一个新我的项目和一个新的虚拟环境。
最初,间接从 PyCharm 装置 PySpark:
留神,为了启用提醒,咱们还应该装置 pyspark-stubs 包。
配置 PyCharm 以执行正确的 Spark executor
心愿咱们没有呈现任何谬误,所以咱们只须要批示 PyCharm 运行正确的 Spark 执行器。它位于咱们解压缩 Spark 自身的文件夹中。让咱们为 PyCharm 我的项目创立一个运行配置。
测试是否一切正常
要测试 Spark 是否失常工作,只需运行以下代码片段
# 导入库
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 初始化 Spark 会话
spark = SparkSession.builder \
.master("local") \
.appName("SparkLikeABoss") \
.getOrCreate()
print(spark.version)
原文链接:https://towardsdatascience.co…
欢送关注磐创 AI 博客站:
http://panchuang.net/
sklearn 机器学习中文官网文档:
http://sklearn123.com/
欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/