作者|Andrea Ialenti
编译|VK
起源|Towards Datas Science

正如在我简直所有对于这个工具的文章中都写到,Spark和SQL一样非常容易应用。但不论我花多少工夫写代码,我只是无奈在我的大脑中永久性地存储Spark API(有人会说我的记忆就像RAM一样,小而易失)。

无论你是想疾速入门介绍sparksql,还是急于编写你的程序,还是像我一样须要一份备忘单,我置信你会发现这篇文章很有用。

这篇文章的目标是介绍sparksql的所有次要函数/个性,在片段中,你将始终看到原始的SQL查问及其在PySpark中的翻译。

我将在这个数据集上执行我的代码:https://drive.google.com/file...

在几个月前,我为另一篇文章创立了这个数据集,它由三个简略的表组成:

基础知识

Apache Spark是一个用于大规模并行数据处理的引擎。这个框架的一个令人惊奇的个性是它以多种语言公开api:我通常应用Scala与它交互,然而也能够应用SQL、Python甚至Java和R。

当咱们编写Spark程序时,首先要晓得的是,当咱们执行代码时,咱们不肯定要对数据执行任何操作。实际上,该工具有两种类型的API调用:转换和操作。

Spark转换背地的范例被称为“延后计算”,这意味着理论的数据计算在咱们要求采取行动之前不会开始。

为了了解这一概念,构想一下你须要对一个列执行SELECT和重命名的状况:如果不调用某个操作(例如collect或count),那么你的代码只不过是定义了所谓的Spark执行打算。

Spark以有向无环图(十分驰名的DAG)组织执行打算。此构造形容将要执行的确切操作,并使调度器可能决定在给定工夫执行哪个工作。

正如Miyagi学生通知咱们的:

  1. 上蜡:定义DAG(变换)
  2. 脱蜡:执行DAG(动作)

与Spark交互

太好了,咱们从哪里开始交互?应用Spark有多种办法:

  • 应用IDE:我倡议应用IntelliJ或PyCharm,但我想你能够抉择任何你想要的货色。查看附录中的PyCharm疾速入门(在本地运行查问)。我认为能够从你的本地环境应用近程Spark executor,但说实话,我素来没有进行过这种配置。
  • Jupyter Notebooks+Sparkmagic:Sparkmagic是一组工具,用于通过Spark REST服务器Livy与近程Spark集群交互工作[1]。这是在AWS、Azure或googlecloud等云零碎上工作时应用Spark的次要形式。大多数云提供商都有一项服务,能够在大概10分钟内配置集群和notebooks 。
  • 通过应用spark shell的终端:有时你不心愿在你和数据之间有任何货色(例如,对一个表进行超级疾速的查看);在这种状况下,你只需关上一个终端并启动spark shell。

文章的代码次要用于IDE。

在编写任何查问之前,咱们须要导入一些库并启动一个Spark会话(应用DatasetDataFrame 的API编程)。上面的PySpark和Scala代码段将加载你须要的所有内容(假如你曾经配置了零碎)。之后,为了简略起见,咱们将只看到PySpark代码。除了一些细微差别外,scalaapi基本相同。

PySpark

# 导入Sparkimport pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *# 初始化Spark会话spark = SparkSession.builder \    .master("local") \    .appName("SparkLikeABoss") \    .getOrCreate()

Scala

//  导入Sparkimport org.apache.spark.sql._import org.apache.spark.sql.functions._//  初始化Spark会话val spark = SparkSession.builder.      master("local")      .appName("spark session example")      .getOrCreate()

解释数据集、数据帧和RDD之间的差别篇幅将过长,所以我跳过这一部分,伪装它不存在。

基本操作

你能写的最简略的查问可能是你所用过的最重要的查问。让咱们看看如何应用Sales表进行基本操作。

简略的Select语句和显示数据

#  以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT *FROM sales_table'''#   执行打算sales_table_execution_plan = sales_table.select(col("*"))#   Show (Action) - 显示5行,列宽不受限制sales_table_execution_plan.show(5, True)
#  以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id AS the_order_id,       seller_id AS the_seller_id,       num_pieces_sold AS the_number_of_pieces_soldFROM sales_table'''#  以一行代码执行打算和显示进去sales_table_execution_plan = sales_table.select(    col("order_id").alias("the_order_id"),    col("seller_id").alias("the_seller_id"),    col("num_pieces_sold").alias("the_number_of_pieces_sold")).show(5, True)

咱们在代码片段中所做的第一件事是定义执行打算;只有当咱们取得show操作时,才会执行该打算。

咱们能够在Spark打算中调用的其余操作示例包含:

  • collect()—返回整个数据集
  • count()—返回行数
  • take(n)-从数据集中返回n行
  • show(n,truncate=False)-显示n行。你能够决定截断后果或显示字段的所有长度

另一个值得注意的乏味的事件是列是由col对象标识的。在本例中,咱们让Spark推断这些列属于哪个数据帧。

咱们能够应用语法execution_plan_variable[“column_name”]来指定列来自哪个执行打算。应用此代替语法,咱们能够失去:

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id AS the_order_id,       seller_id AS the_seller_id,       num_pieces_sold AS the_number_of_pieces_soldFROM sales_table'''#  以一行代码执行打算和显示进去sales_table_execution_plan = sales_table.select(    sales_table["order_id"].alias("the_order_id"),    sales_table["seller_id"].alias("the_seller_id"),    sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")).show(5, True)

在解决连贯时,限定字段的源表尤为重要(例如,两个表可能有两个同名字段,因而仅应用col对象不足以打消歧义)。Scala中的语法略有不同:

// Qualify the source execution plan in Scalasales_table.col("order_id")

重命名和增加列

有时咱们只想重命名一个列,或者咱们想增加一个新的列并进行一些计算(例如,在以下状况下):

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id,       product_id,       seller_id,       date,       num_pieces_sold AS pieces,       bill_raw_textFROM sales_table a'''sales_table_execution_plan = sales_table. \    withColumnRenamed("num_pieces_sold", "pieces")sales_table_execution_plan.show()
#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id,       product_id,       seller_id,       date,       num_pieces_sold,       bill_raw_text,       num_pieces_sold % 2 AS num_pieces_sold_is_evenFROM sales_table a'''sales_table_execution_plan = sales_table. \    withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)sales_table_execution_plan.show()

简略聚合

Spark反对所有次要的聚合函数。以下示例仅指“简略”的示例(例如平均值、总和、计数等)。稍后将介绍数组的聚合。

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT product_id,       SUM(num_pieces_sold) AS total_pieces_sold,       AVG(num_pieces_sold) AS average_pieces_sold,       MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders,       MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders,       COUNT(num_pieces_sold) AS num_times_product_soldFROM sales_tableGROUP BY product_id'''sales_table_execution_plan = sales_table.groupBy(    col("product_id")).agg(    sum("num_pieces_sold").alias("total_pieces_sold"),    avg("num_pieces_sold").alias("average_pieces_sold"),    max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"),    min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"),    count("num_pieces_sold").alias("num_times_product_sold"))sales_table_execution_plan.show()

显示架构

显示命令的“table”有点误导人;更准确的定义是“显示执行打算”。应用Spark API,咱们能够一个接一个地传递多个操作;应用printSchema API,如果在磁盘上写入执行打算的后果,咱们将输入最终表的样子。

在上面的示例中,咱们重命名一些列,进行聚合,而后增加另一列。

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''-- 创立一个长期表,进行一些重命名CREATE TABLE temp_1 ASSELECT seller_id AS the_seller,       num_pieces_sold AS pieces,       product_idFROM sales_table;--对新表进行聚合CREATE TABLE temp_2 ASSELECT product_id,       SUM(pieces) AS total_piecesFROM temp_1GROUP BY product_id;-- 增加列SELECT a.*,       1 AS fake_columnFROM temp2 a;'''sales_table_execution_plan = sales_table. \    withColumnRenamed("seller_id", "the_seller"). \    withColumnRenamed("num_pieces_sold", "pieces").\groupBy(    col("product_id")).agg(    sum("pieces").alias("total_pieces")).withColumn("fake_column", lit(1))#   输入 Schemasales_table_execution_plan.printSchema()

printSchema的输入是:

root |-- product_id: string (nullable = true) |-- total_pieces: double (nullable = true) |-- fake_column: integer (nullable = false)

请留神,printSchema不会触发操作;相同,Spark会评估执行打算,以理解DAG在输入列中的地位。因为这个起因,这个操作比show快得多,show会触发DAG的执行。

解释执行打算

能够通过explain API取得无关触发操作时引擎将执行的操作的更具体的阐明。在这种状况下,咱们将取得Spark将执行的操作的具体阐明。让咱们对上一个查问调用explain:

#   输入 Schemasales_table_execution_plan.printSchema()
== Physical Plan ==*(2) HashAggregate(keys=[product_id#361], functions=[sum(cast(pieces#379 as double))])+- Exchange hashpartitioning(product_id#361, 200)   +- *(1) HashAggregate(keys=[product_id#361], functions=[partial_sum(cast(pieces#379 as double))])      +- *(1) Project [product_id#361, num_pieces_sold#364 AS pieces#379]         +- *(1) FileScan parquet [product_id#361,num_pieces_sold#364] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<PATH_TO_FILE>/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,num_pieces_sold:string>

诚实说,我素来没有发现explain API太有用,尤其是当DAG开始变得宏大和简单时。在Spark UI中能够找到一个更好的视图,它公开了雷同信息的图形示意。

Select Distinct

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT DISTINCT seller_id,       dateFROM sales_table'''sales_table_execution_plan = sales_table.select(    col("seller_id"), col("date")).distinct()#   输入 Schemasales_table_execution_plan.show()

Case When

在Spark中很好地实现了该操作(不须要非凡的udf);让咱们简略地用sales_table将每一行插入到不同的bucket中,具体取决于num_pieces_selled:

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT seller_id,       CASE WHEN num_pieces_sold < 30 THEN 'Lower than 30',            WHEN num_pieces_sold < 60 THEN 'Between 31 and 60'            WHEN num_pieces_sold < 90 THEN 'Between 61 and 90'            ELSE 'More than 91' AS sales_bucketFROM sales_table'''sales_table_execution_plan = sales_table.select(    col("seller_id"),    when(col("num_pieces_sold") < 30, "Lower than 30").    when(col("num_pieces_sold") < 60, "Between 31 and 60").    when(col("num_pieces_sold") < 90, "Between 61 and 90").    otherwise("More than 91").alias("sales_bucket"))sales_table_execution_plan.show()

Union All

有时咱们须要将流分成多个局部,而后将所有内容合并到一个表中;在SQL中,这是用UNION ALL示意的。在spark2.1中,在执行union all操作之前必须对列进行排序。

侥幸的是,spark2.3应用列名来对齐合并的执行打算。在上面的示例中,咱们首先将表拆分为两局部,而后将这些局部合并在一起(齐全没有必要,但它将演示如何应用API):

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''CREATE TABLE part_1 ASSELECT *FROM sales_tableWHERE num_pieces_sold > 50;CREATE TABLE part_2 ASSELECT *FROM sales_tableWHERE num_pieces_sold <= 50;SELECT *FROM part_1 UNION ALLSELECT *FROM part_2'''#   拆散part1sales_table_execution_plan_part_1 = sales_table.where(col("num_pieces_sold") > 50)#   拆散part2sales_table_execution_plan_part_2 = sales_table.where(col("num_pieces_sold") <= 50)#   合并sales_table_execution_plan = sales_table_execution_plan_part_1.unionByName(sales_table_execution_plan_part_2)sales_table_execution_plan.explain()

让咱们看看解释,看看幕后产生了什么:

Union:- *(1) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]:  +- *(1) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) > 50)):     +- *(1) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...+- *(2) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]   +- *(2) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) <= 50))      +- *(2) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...

表正在合并。

Spark的Join

当代码呈现性能问题时,连贯通常是咱们首先要查看的中央。Spark引擎在并行化非连贯操作方面相当杰出,但在连贯工作时可能须要进行调整。

我写了一整篇对于这个主题的文章,所以我不会再深刻探讨这个问题:如果你想晓得更多,或者你遇到了一些连贯性能问题,我倡议你看看:https://towardsdatascience.co...

同时,这里是连贯的语法。在示例中,咱们将连贯Sales和Sellers表。

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")sellers_table = spark.read.parquet("./data/sellers_parquet")'''SELECT a.*,       b.*FROM sales_table a    LEFT JOIN sellers_table b        ON a.seller_id = b.seller_id'''#   左连贯left_join_execution_plan = sales_table.join(sellers_table,                    on=sales_table["seller_id"] == sellers_table["seller_id"],                    how="left")#   内连贯inner_join_execution_plan = sales_table.join(sellers_table,                    on=sales_table["seller_id"] == sellers_table["seller_id"],                    how="inner")#   右连贯right_join_execution_plan = sales_table.join(sellers_table,                    on=sales_table["seller_id"] == sellers_table["seller_id"],                    how="right")#  全外连贯full_outer_join_execution_plan = sales_table.join(sellers_table,                    on=sales_table["seller_id"] == sellers_table["seller_id"],                    how="full_outer")

除了传统的连贯类型(左、右、内、穿插等),Spark还反对半连贯和反连贯;这两个基本上是在Spark中示意操作和不示意操作的一种形式:

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")sellers_table = spark.read.parquet("./data/sellers_parquet")'''SELECT *FROM sales_tableWHERE seller_id IN (SELECT seller_id FROM sellers_table)'''# 左半连贯是在SQL中示意IN操作的一种形式semi_join_execution_plan = sales_table.join(sellers_table,                 on=sales_table["seller_id"] == sellers_table["seller_id"],                 how="left_semi")semi_join_execution_plan.show()
#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")sellers_table = spark.read.parquet("./data/sellers_parquet")'''SELECT *FROM sales_tableWHERE seller_id NOT IN (SELECT seller_id FROM sellers_table)'''# 左反连贯是在SQL中示意NOT IN操作的一种形式anti_join_execution_plan = sales_table.join(sellers_table,                on=sales_table["seller_id"] == sellers_table["seller_id"],                how="left_anti")anti_join_execution_plan.show()

Window函数

window函数对定义为frame或window的特定行子集执行计算。典型的例子是子群的排序。在咱们的玩具数据集中,假如咱们想晓得,对于每个卖家来说,什么是销售最多的产品。要提取这些信息,咱们须要:

  1. 定义咱们将利用排序函数的“分区”:咱们须要对每个卖家的产品执行一次排序操作
  2. 利用咱们的首选排序函数:dense_rank, `rank, row_number`。上面是Spark中的窗口函数列表。

下图是咱们心愿如何分区数据的示例:

#   导入 Windowfrom pyspark.sql.window import Window#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT seller_id,       product_id,       total_pieces,       dense_rank() OVER (PARTITION BY seller_id ORDER BY total_pieces DESC) as rankFROM (    SELECT seller_id,           product_id,           SUM(total_pieces_sold) AS total_pieces    FROM sales_table    GROUP BY seller_id,           product_id)'''sales_table_agg = sales_table.groupBy(col("seller_id"), col("product_id")).agg(sum("num_pieces_sold").alias("total_pieces"))#  定义窗口:在卖方ID上对表进行分区,并依据销售的总块对每个组进行排序window_specifications = Window.partitionBy(col("seller_id")).orderBy(col("total_pieces").asc())# 利用dense_rank函数,依据下面的标准创立窗口sales_table_agg.withColumn('dense_rank', dense_rank().over(window_specifications)).show()

字符串

数据科学家在解决数据时面临的另一组十分常见的操作,包含从字符串中提取信息。当然,有很多Spark API能够对文本数据进行简直任何(根本)操作。

让咱们先从简略的LIKE运算符开始,而后再探讨正则表达式的用法。对于API的残缺列表,我将参考文档;上面是可能应用最多的API。

Like

在上面的示例中,咱们心愿应用sales表来抉择bill_raw_text相似于“ab%cd%”的所有字符串(即,以字符串ab结尾,两头有一个字符串cd。

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT *WHERE bill_raw_text LIKE 'ab%cd%''''sales_table_execution_plan = sales_table.where(    col('bill_raw_text').like("ab%cd%"))sales_table_execution_plan.show()

有时咱们想要找到的模式更简单,无奈用简略的通配符来表白。在这种状况下,咱们须要应用正则表达式。让咱们深入研究几个函数。在上面的示例中,咱们将始终利用雷同的正则表达式。

(ab[cd]{2,4})|(aa[abcde]{1,2})

Like的正则表达式(Regex)

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT *FROM sales_tableWHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})''''sales_table_execution_plan = sales_table.where(    col('bill_raw_text').rlike("(ab[cd]{2,4})|(aa[abcde]{1,2})"))sales_table_execution_plan.show()

用正则表达式提取模式

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT DISTINCT REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') AS extracted_patternWHERE REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') <> "FROM sales_table'''sales_table_execution_plan = sales_table.select(    #  最初一个整数示意要提取哪一组    regexp_extract(col('bill_raw_text'), "(ab[cd]{2,4})|(aa[abcde]{1,2})", 0).alias("extracted_pattern")).where(col("extracted_pattern") != "").distinct()sales_table_execution_plan.show(100,False)

数组操作

数组是一种数据类型,。Spark实现了很多函数来操作数组(精确地说,从2.4版开始就是这样)。让咱们深刻理解根本状况。

数组聚合

将列转换为数组与调用聚合函数一样简略。Spark 2.3有两种次要的数组聚合函数collect_set和collect_list:第一种只蕴含惟一的元素,而后一种只是将组转换为列表。

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,       COLLECT_LIST(num_pieces_list) AS num_pieces_sold_list,       seller_idFROM sales_tableGROUP BY seller_id'''sales_table_execution_plan = sales_table.groupBy(col("seller_id")).agg(    collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"),    collect_list(col("num_pieces_sold")).alias("num_pieces_sold_list"),)sales_table_execution_plan.show(10, True)

合成阵列

聚合的逆操作是“数组合成”,即从程度数组生成“垂直”列。为此,咱们能够应用explode函数。

#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''CREATE TABLE sales_table_aggregated ASSELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set,       seller_idFROM sales_tableGROUP BY seller_id;SELECT EXPLODE(num_pieces_sold_set) AS exploded_num_pieces_setFROM sales_table_aggregated;'''sales_table_execution_aggregated = sales_table.groupBy(col("seller_id")).agg(    collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"))sales_table_execution_exploded = sales_table_execution_aggregated.select(    explode(col("num_pieces_sold_set")).alias("exploded_num_pieces_set"))sales_table_execution_exploded.show(10, True)

其余应用数组的操作(从Spark 2.4开始)

可怜的是,Spark 2.3不反对对数组执行太多操作。侥幸的是,Spark 2.4能够!Spark 2.4之后提供的一些性能包含:

  • array_except(array1,array2)-返回array1中的元素数组,而不是array2中的元素,没有反复项。
  • array_intersect(array1,array2)-返回array1和array2相交的元素数组,不蕴含反复项。
  • array_join(array,delimiter[,nullReplacement])-应用分隔符和可选字符串连贯给定数组的元素。
  • array_max(array)-返回数组中的最大值。跳过空元素。
  • array_min(array)-返回数组中的最小值。跳过空元素。
  • array_sort(array)-按升序对输出数组进行排序。输出数组的元素必须是可排序的。空元素将放在返回数组的开端。

等等。以上定义间接取自参考文献。我倡议你查一下,以便有更多的细节!

UDFs

最初,用户定义函数。当咱们在默认的api中找不到转换时,udf就是一种办法。

UDF是一个定制函数,程序员能够像咱们目前看到的所有api一样定义并利用于列。它们提供了最大的灵活性(咱们简直能够在其中编写任何代码);毛病是Spark将它们视为黑匣子,因而外部的Spark引擎优化器(Catalyst)无奈进行任何优化:udf可能会减慢咱们的代码速度。

作为一个示例,让咱们实现一个UDF,它模仿函数array_repeat(element,count),该函数返回一个蕴含元素count次的数组。

from pyspark.sql.functions import udffrom pyspark.sql.types import ArrayType, StringType#  创立将在UDF中应用的函数def array_repeat_custom(element, count):  list = ["{}".format(element) for x in range(0, count)]  return list#   将函数转换为UDF。指出UDF的返回类型是一种很好的做法#   在本例中,返回类型是字符串数组array_repeat_custom_udf = udf(array_repeat_custom,  ArrayType(StringType()))#   以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")#   调用UDFsales_table_execution_plan = sales_table.select(    array_repeat_custom_udf(col("num_pieces_sold"), lit(3)).alias("sample_array"))sales_table_execution_plan.show()

除了UDF的语法之外,我倡议你关注下面应用的lit函数。有些Spark函数只承受列作为输出:如果须要应用常量,则可能须要将该常量转换为“列”。lit函数会创立一列文字值。

下一步

我心愿我可能证实Spark并不比SQL更难,他们基本上是一样的。

你能够设想,这篇文章的题目有点夸大:实际上精通这个工具须要15分钟以上的工夫;但我置信以上是一个很好的疾速入门!

我的倡议是开始应用下面的api,因为它们将笼罩70%的用例。当你对基础知识有信念时,我倡议你写上面两篇文章,那是一位值得信赖的作者(lol)几个月前写的。第一个问题将挑战你在应用此工具进行开发时遇到的一些经典问题,而第二个问题是对Spark Joins的深入研究。

https://towardsdatascience.co...

https://towardsdatascience.co...

附录-配置PyCharm

在本地(非分布式)环境中装置Spark是一项非常简单的工作。在本附录中,我将向你展现PyCharm Community Edition的根本配置,以便应用Python运行Spark。有五个简略步骤:

  1. 下载PyCharm社区版
  2. 下载Spark
  3. 装置PySpark
  4. 配置PyCharm以执行正确的Spark executor
  5. 测试是否一切正常

两个注意事项:

  • 我假如你的零碎中正确装置了Java。
  • 在Windows上,须要装置Winutils,这是运行Hadoop所需的一组二进制文件。查看此Git repo理解更多信息:https://github.com/stevelough...。

下载PyCharm社区版

侥幸的是,JetBrains有一个PyCharm的开源版本。咱们能够简略地从他们的网站下载最新版本。装置很简略。

下载Spark

咱们只须要从Spark官方网站下载一个压缩文件。在我写作时,有两个次要版本可用:3.0.1和2.4.7。对于文章的范畴,咱们能够抉择其中之一。

一旦下载实现,咱们只须要在一个适合的地位解压包。

装置PySpark

当初是运行PyCharm并装置所需的所有软件包的时候了。首先,让咱们关上PyCharm,创立一个新我的项目和一个新的虚拟环境。

最初,间接从PyCharm装置PySpark:

留神,为了启用提醒,咱们还应该装置pyspark-stubs包。

配置PyCharm以执行正确的Spark executor

心愿咱们没有呈现任何谬误,所以咱们只须要批示PyCharm运行正确的Spark执行器。它位于咱们解压缩Spark自身的文件夹中。让咱们为PyCharm我的项目创立一个运行配置。

测试是否一切正常

要测试Spark是否失常工作,只需运行以下代码片段

# 导入库import pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *# 初始化Spark会话spark = SparkSession.builder \    .master("local") \    .appName("SparkLikeABoss") \    .getOrCreate()print(spark.version)

原文链接:https://towardsdatascience.co...

欢送关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/