作者|Andrea Ialenti
编译|VK
起源|Towards Datas Science
正如在我简直所有对于这个工具的文章中都写到,Spark和SQL一样非常容易应用。但不论我花多少工夫写代码,我只是无奈在我的大脑中永久性地存储Spark API(有人会说我的记忆就像RAM一样,小而易失)。
无论你是想疾速入门介绍sparksql,还是急于编写你的程序,还是像我一样须要一份备忘单,我置信你会发现这篇文章很有用。
这篇文章的目标是介绍sparksql的所有次要函数/个性,在片段中,你将始终看到原始的SQL查问及其在PySpark中的翻译。
我将在这个数据集上执行我的代码:https://drive.google.com/file...
在几个月前,我为另一篇文章创立了这个数据集,它由三个简略的表组成:
基础知识
Apache Spark是一个用于大规模并行数据处理的引擎。这个框架的一个令人惊奇的个性是它以多种语言公开api:我通常应用Scala与它交互,然而也能够应用SQL、Python甚至Java和R。
当咱们编写Spark程序时,首先要晓得的是,当咱们执行代码时,咱们不肯定要对数据执行任何操作。实际上,该工具有两种类型的API调用:转换和操作。
Spark转换背地的范例被称为“延后计算”,这意味着理论的数据计算在咱们要求采取行动之前不会开始。
为了了解这一概念,构想一下你须要对一个列执行SELECT和重命名的状况:如果不调用某个操作(例如collect或count),那么你的代码只不过是定义了所谓的Spark执行打算。
Spark以有向无环图(十分驰名的DAG)组织执行打算。此构造形容将要执行的确切操作,并使调度器可能决定在给定工夫执行哪个工作。
正如Miyagi学生通知咱们的:
- 上蜡:定义DAG(变换)
- 脱蜡:执行DAG(动作)
与Spark交互
太好了,咱们从哪里开始交互?应用Spark有多种办法:
- 应用IDE:我倡议应用IntelliJ或PyCharm,但我想你能够抉择任何你想要的货色。查看附录中的PyCharm疾速入门(在本地运行查问)。我认为能够从你的本地环境应用近程Spark executor,但说实话,我素来没有进行过这种配置。
- Jupyter Notebooks+Sparkmagic:Sparkmagic是一组工具,用于通过Spark REST服务器Livy与近程Spark集群交互工作[1]。这是在AWS、Azure或googlecloud等云零碎上工作时应用Spark的次要形式。大多数云提供商都有一项服务,能够在大概10分钟内配置集群和notebooks 。
- 通过应用spark shell的终端:有时你不心愿在你和数据之间有任何货色(例如,对一个表进行超级疾速的查看);在这种状况下,你只需关上一个终端并启动spark shell。
文章的代码次要用于IDE。
在编写任何查问之前,咱们须要导入一些库并启动一个Spark会话(应用Dataset和DataFrame 的API编程)。上面的PySpark和Scala代码段将加载你须要的所有内容(假如你曾经配置了零碎)。之后,为了简略起见,咱们将只看到PySpark代码。除了一些细微差别外,scalaapi基本相同。
PySpark
# 导入Sparkimport pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *# 初始化Spark会话spark = SparkSession.builder \ .master("local") \ .appName("SparkLikeABoss") \ .getOrCreate()
Scala
// 导入Sparkimport org.apache.spark.sql._import org.apache.spark.sql.functions._// 初始化Spark会话val spark = SparkSession.builder. master("local") .appName("spark session example") .getOrCreate()
解释数据集、数据帧和RDD之间的差别篇幅将过长,所以我跳过这一部分,伪装它不存在。
基本操作
你能写的最简略的查问可能是你所用过的最重要的查问。让咱们看看如何应用Sales表进行基本操作。
简略的Select语句和显示数据
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT *FROM sales_table'''# 执行打算sales_table_execution_plan = sales_table.select(col("*"))# Show (Action) - 显示5行,列宽不受限制sales_table_execution_plan.show(5, True)
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id AS the_order_id, seller_id AS the_seller_id, num_pieces_sold AS the_number_of_pieces_soldFROM sales_table'''# 以一行代码执行打算和显示进去sales_table_execution_plan = sales_table.select( col("order_id").alias("the_order_id"), col("seller_id").alias("the_seller_id"), col("num_pieces_sold").alias("the_number_of_pieces_sold")).show(5, True)
咱们在代码片段中所做的第一件事是定义执行打算;只有当咱们取得show操作时,才会执行该打算。
咱们能够在Spark打算中调用的其余操作示例包含:
- collect()—返回整个数据集
- count()—返回行数
- take(n)-从数据集中返回n行
- show(n,truncate=False)-显示n行。你能够决定截断后果或显示字段的所有长度
另一个值得注意的乏味的事件是列是由col对象标识的。在本例中,咱们让Spark推断这些列属于哪个数据帧。
咱们能够应用语法execution_plan_variable[“column_name”]来指定列来自哪个执行打算。应用此代替语法,咱们能够失去:
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id AS the_order_id, seller_id AS the_seller_id, num_pieces_sold AS the_number_of_pieces_soldFROM sales_table'''# 以一行代码执行打算和显示进去sales_table_execution_plan = sales_table.select( sales_table["order_id"].alias("the_order_id"), sales_table["seller_id"].alias("the_seller_id"), sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")).show(5, True)
在解决连贯时,限定字段的源表尤为重要(例如,两个表可能有两个同名字段,因而仅应用col对象不足以打消歧义)。Scala中的语法略有不同:
// Qualify the source execution plan in Scalasales_table.col("order_id")
重命名和增加列
有时咱们只想重命名一个列,或者咱们想增加一个新的列并进行一些计算(例如,在以下状况下):
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id, product_id, seller_id, date, num_pieces_sold AS pieces, bill_raw_textFROM sales_table a'''sales_table_execution_plan = sales_table. \ withColumnRenamed("num_pieces_sold", "pieces")sales_table_execution_plan.show()
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT order_id, product_id, seller_id, date, num_pieces_sold, bill_raw_text, num_pieces_sold % 2 AS num_pieces_sold_is_evenFROM sales_table a'''sales_table_execution_plan = sales_table. \ withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)sales_table_execution_plan.show()
简略聚合
Spark反对所有次要的聚合函数。以下示例仅指“简略”的示例(例如平均值、总和、计数等)。稍后将介绍数组的聚合。
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT product_id, SUM(num_pieces_sold) AS total_pieces_sold, AVG(num_pieces_sold) AS average_pieces_sold, MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders, MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders, COUNT(num_pieces_sold) AS num_times_product_soldFROM sales_tableGROUP BY product_id'''sales_table_execution_plan = sales_table.groupBy( col("product_id")).agg( sum("num_pieces_sold").alias("total_pieces_sold"), avg("num_pieces_sold").alias("average_pieces_sold"), max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"), min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"), count("num_pieces_sold").alias("num_times_product_sold"))sales_table_execution_plan.show()
显示架构
显示命令的“table”有点误导人;更准确的定义是“显示执行打算”。应用Spark API,咱们能够一个接一个地传递多个操作;应用printSchema API,如果在磁盘上写入执行打算的后果,咱们将输入最终表的样子。
在上面的示例中,咱们重命名一些列,进行聚合,而后增加另一列。
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''-- 创立一个长期表,进行一些重命名CREATE TABLE temp_1 ASSELECT seller_id AS the_seller, num_pieces_sold AS pieces, product_idFROM sales_table;--对新表进行聚合CREATE TABLE temp_2 ASSELECT product_id, SUM(pieces) AS total_piecesFROM temp_1GROUP BY product_id;-- 增加列SELECT a.*, 1 AS fake_columnFROM temp2 a;'''sales_table_execution_plan = sales_table. \ withColumnRenamed("seller_id", "the_seller"). \ withColumnRenamed("num_pieces_sold", "pieces").\groupBy( col("product_id")).agg( sum("pieces").alias("total_pieces")).withColumn("fake_column", lit(1))# 输入 Schemasales_table_execution_plan.printSchema()
printSchema的输入是:
root |-- product_id: string (nullable = true) |-- total_pieces: double (nullable = true) |-- fake_column: integer (nullable = false)
请留神,printSchema不会触发操作;相同,Spark会评估执行打算,以理解DAG在输入列中的地位。因为这个起因,这个操作比show快得多,show会触发DAG的执行。
解释执行打算
能够通过explain API取得无关触发操作时引擎将执行的操作的更具体的阐明。在这种状况下,咱们将取得Spark将执行的操作的具体阐明。让咱们对上一个查问调用explain:
# 输入 Schemasales_table_execution_plan.printSchema()
== Physical Plan ==*(2) HashAggregate(keys=[product_id#361], functions=[sum(cast(pieces#379 as double))])+- Exchange hashpartitioning(product_id#361, 200) +- *(1) HashAggregate(keys=[product_id#361], functions=[partial_sum(cast(pieces#379 as double))]) +- *(1) Project [product_id#361, num_pieces_sold#364 AS pieces#379] +- *(1) FileScan parquet [product_id#361,num_pieces_sold#364] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<PATH_TO_FILE>/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,num_pieces_sold:string>
诚实说,我素来没有发现explain API太有用,尤其是当DAG开始变得宏大和简单时。在Spark UI中能够找到一个更好的视图,它公开了雷同信息的图形示意。
Select Distinct
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT DISTINCT seller_id, dateFROM sales_table'''sales_table_execution_plan = sales_table.select( col("seller_id"), col("date")).distinct()# 输入 Schemasales_table_execution_plan.show()
Case When
在Spark中很好地实现了该操作(不须要非凡的udf);让咱们简略地用sales_table将每一行插入到不同的bucket中,具体取决于num_pieces_selled:
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT seller_id, CASE WHEN num_pieces_sold < 30 THEN 'Lower than 30', WHEN num_pieces_sold < 60 THEN 'Between 31 and 60' WHEN num_pieces_sold < 90 THEN 'Between 61 and 90' ELSE 'More than 91' AS sales_bucketFROM sales_table'''sales_table_execution_plan = sales_table.select( col("seller_id"), when(col("num_pieces_sold") < 30, "Lower than 30"). when(col("num_pieces_sold") < 60, "Between 31 and 60"). when(col("num_pieces_sold") < 90, "Between 61 and 90"). otherwise("More than 91").alias("sales_bucket"))sales_table_execution_plan.show()
Union All
有时咱们须要将流分成多个局部,而后将所有内容合并到一个表中;在SQL中,这是用UNION ALL示意的。在spark2.1中,在执行union all操作之前必须对列进行排序。
侥幸的是,spark2.3应用列名来对齐合并的执行打算。在上面的示例中,咱们首先将表拆分为两局部,而后将这些局部合并在一起(齐全没有必要,但它将演示如何应用API):
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''CREATE TABLE part_1 ASSELECT *FROM sales_tableWHERE num_pieces_sold > 50;CREATE TABLE part_2 ASSELECT *FROM sales_tableWHERE num_pieces_sold <= 50;SELECT *FROM part_1 UNION ALLSELECT *FROM part_2'''# 拆散part1sales_table_execution_plan_part_1 = sales_table.where(col("num_pieces_sold") > 50)# 拆散part2sales_table_execution_plan_part_2 = sales_table.where(col("num_pieces_sold") <= 50)# 合并sales_table_execution_plan = sales_table_execution_plan_part_1.unionByName(sales_table_execution_plan_part_2)sales_table_execution_plan.explain()
让咱们看看解释,看看幕后产生了什么:
Union:- *(1) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488]: +- *(1) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) > 50)): +- *(1) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...+- *(2) Project [order_id#483, product_id#484, seller_id#485, date#486, num_pieces_sold#487, bill_raw_text#488] +- *(2) Filter (isnotnull(num_pieces_sold#487) && (cast(num_pieces_sold#487 as int) <= 50)) +- *(2) FileScan parquet [order_id#483,product_id#484,seller_id#485,date#486,num_pieces_sold#487,bill_raw_text#488] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:<FILE_PATH>/sales_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(num_pieces_sold)], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...
表正在合并。
Spark的Join
当代码呈现性能问题时,连贯通常是咱们首先要查看的中央。Spark引擎在并行化非连贯操作方面相当杰出,但在连贯工作时可能须要进行调整。
我写了一整篇对于这个主题的文章,所以我不会再深刻探讨这个问题:如果你想晓得更多,或者你遇到了一些连贯性能问题,我倡议你看看:https://towardsdatascience.co...
同时,这里是连贯的语法。在示例中,咱们将连贯Sales和Sellers表。
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")sellers_table = spark.read.parquet("./data/sellers_parquet")'''SELECT a.*, b.*FROM sales_table a LEFT JOIN sellers_table b ON a.seller_id = b.seller_id'''# 左连贯left_join_execution_plan = sales_table.join(sellers_table, on=sales_table["seller_id"] == sellers_table["seller_id"], how="left")# 内连贯inner_join_execution_plan = sales_table.join(sellers_table, on=sales_table["seller_id"] == sellers_table["seller_id"], how="inner")# 右连贯right_join_execution_plan = sales_table.join(sellers_table, on=sales_table["seller_id"] == sellers_table["seller_id"], how="right")# 全外连贯full_outer_join_execution_plan = sales_table.join(sellers_table, on=sales_table["seller_id"] == sellers_table["seller_id"], how="full_outer")
除了传统的连贯类型(左、右、内、穿插等),Spark还反对半连贯和反连贯;这两个基本上是在Spark中示意操作和不示意操作的一种形式:
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")sellers_table = spark.read.parquet("./data/sellers_parquet")'''SELECT *FROM sales_tableWHERE seller_id IN (SELECT seller_id FROM sellers_table)'''# 左半连贯是在SQL中示意IN操作的一种形式semi_join_execution_plan = sales_table.join(sellers_table, on=sales_table["seller_id"] == sellers_table["seller_id"], how="left_semi")semi_join_execution_plan.show()
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")sellers_table = spark.read.parquet("./data/sellers_parquet")'''SELECT *FROM sales_tableWHERE seller_id NOT IN (SELECT seller_id FROM sellers_table)'''# 左反连贯是在SQL中示意NOT IN操作的一种形式anti_join_execution_plan = sales_table.join(sellers_table, on=sales_table["seller_id"] == sellers_table["seller_id"], how="left_anti")anti_join_execution_plan.show()
Window函数
window函数对定义为frame或window的特定行子集执行计算。典型的例子是子群的排序。在咱们的玩具数据集中,假如咱们想晓得,对于每个卖家来说,什么是销售最多的产品。要提取这些信息,咱们须要:
- 定义咱们将利用排序函数的“分区”:咱们须要对每个卖家的产品执行一次排序操作
- 利用咱们的首选排序函数:dense_rank
,
`rank,
row_number`。上面是Spark中的窗口函数列表。
下图是咱们心愿如何分区数据的示例:
# 导入 Windowfrom pyspark.sql.window import Window# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT seller_id, product_id, total_pieces, dense_rank() OVER (PARTITION BY seller_id ORDER BY total_pieces DESC) as rankFROM ( SELECT seller_id, product_id, SUM(total_pieces_sold) AS total_pieces FROM sales_table GROUP BY seller_id, product_id)'''sales_table_agg = sales_table.groupBy(col("seller_id"), col("product_id")).agg(sum("num_pieces_sold").alias("total_pieces"))# 定义窗口:在卖方ID上对表进行分区,并依据销售的总块对每个组进行排序window_specifications = Window.partitionBy(col("seller_id")).orderBy(col("total_pieces").asc())# 利用dense_rank函数,依据下面的标准创立窗口sales_table_agg.withColumn('dense_rank', dense_rank().over(window_specifications)).show()
字符串
数据科学家在解决数据时面临的另一组十分常见的操作,包含从字符串中提取信息。当然,有很多Spark API能够对文本数据进行简直任何(根本)操作。
让咱们先从简略的LIKE运算符开始,而后再探讨正则表达式的用法。对于API的残缺列表,我将参考文档;上面是可能应用最多的API。
Like
在上面的示例中,咱们心愿应用sales表来抉择bill_raw_text相似于“ab%cd%”的所有字符串(即,以字符串ab结尾,两头有一个字符串cd。
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT *WHERE bill_raw_text LIKE 'ab%cd%''''sales_table_execution_plan = sales_table.where( col('bill_raw_text').like("ab%cd%"))sales_table_execution_plan.show()
有时咱们想要找到的模式更简单,无奈用简略的通配符来表白。在这种状况下,咱们须要应用正则表达式。让咱们深入研究几个函数。在上面的示例中,咱们将始终利用雷同的正则表达式。
(ab[cd]{2,4})|(aa[abcde]{1,2})
Like的正则表达式(Regex)
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT *FROM sales_tableWHERE bill_raw_text RLIKE '(ab[cd]{2,4})|(aa[abcde]{1,2})''''sales_table_execution_plan = sales_table.where( col('bill_raw_text').rlike("(ab[cd]{2,4})|(aa[abcde]{1,2})"))sales_table_execution_plan.show()
用正则表达式提取模式
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT DISTINCT REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') AS extracted_patternWHERE REGEXP_EXTRACT(bill_raw_text, '(ab[cd]{2,4})|(aa[abcde]{1,2})') <> "FROM sales_table'''sales_table_execution_plan = sales_table.select( # 最初一个整数示意要提取哪一组 regexp_extract(col('bill_raw_text'), "(ab[cd]{2,4})|(aa[abcde]{1,2})", 0).alias("extracted_pattern")).where(col("extracted_pattern") != "").distinct()sales_table_execution_plan.show(100,False)
数组操作
数组是一种数据类型,。Spark实现了很多函数来操作数组(精确地说,从2.4版开始就是这样)。让咱们深刻理解根本状况。
数组聚合
将列转换为数组与调用聚合函数一样简略。Spark 2.3有两种次要的数组聚合函数collect_set和collect_list:第一种只蕴含惟一的元素,而后一种只是将组转换为列表。
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''SELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set, COLLECT_LIST(num_pieces_list) AS num_pieces_sold_list, seller_idFROM sales_tableGROUP BY seller_id'''sales_table_execution_plan = sales_table.groupBy(col("seller_id")).agg( collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"), collect_list(col("num_pieces_sold")).alias("num_pieces_sold_list"),)sales_table_execution_plan.show(10, True)
合成阵列
聚合的逆操作是“数组合成”,即从程度数组生成“垂直”列。为此,咱们能够应用explode函数。
# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")'''CREATE TABLE sales_table_aggregated ASSELECT COLLECT_SET(num_pieces_sold) AS num_pieces_sold_set, seller_idFROM sales_tableGROUP BY seller_id;SELECT EXPLODE(num_pieces_sold_set) AS exploded_num_pieces_setFROM sales_table_aggregated;'''sales_table_execution_aggregated = sales_table.groupBy(col("seller_id")).agg( collect_set(col("num_pieces_sold")).alias("num_pieces_sold_set"))sales_table_execution_exploded = sales_table_execution_aggregated.select( explode(col("num_pieces_sold_set")).alias("exploded_num_pieces_set"))sales_table_execution_exploded.show(10, True)
其余应用数组的操作(从Spark 2.4开始)
可怜的是,Spark 2.3不反对对数组执行太多操作。侥幸的是,Spark 2.4能够!Spark 2.4之后提供的一些性能包含:
- array_except(array1,array2)-返回array1中的元素数组,而不是array2中的元素,没有反复项。
- array_intersect(array1,array2)-返回array1和array2相交的元素数组,不蕴含反复项。
- array_join(array,delimiter[,nullReplacement])-应用分隔符和可选字符串连贯给定数组的元素。
- array_max(array)-返回数组中的最大值。跳过空元素。
- array_min(array)-返回数组中的最小值。跳过空元素。
- array_sort(array)-按升序对输出数组进行排序。输出数组的元素必须是可排序的。空元素将放在返回数组的开端。
等等。以上定义间接取自参考文献。我倡议你查一下,以便有更多的细节!
UDFs
最初,用户定义函数。当咱们在默认的api中找不到转换时,udf就是一种办法。
UDF是一个定制函数,程序员能够像咱们目前看到的所有api一样定义并利用于列。它们提供了最大的灵活性(咱们简直能够在其中编写任何代码);毛病是Spark将它们视为黑匣子,因而外部的Spark引擎优化器(Catalyst)无奈进行任何优化:udf可能会减慢咱们的代码速度。
作为一个示例,让咱们实现一个UDF,它模仿函数array_repeat(element,count),该函数返回一个蕴含元素count次的数组。
from pyspark.sql.functions import udffrom pyspark.sql.types import ArrayType, StringType# 创立将在UDF中应用的函数def array_repeat_custom(element, count): list = ["{}".format(element) for x in range(0, count)] return list# 将函数转换为UDF。指出UDF的返回类型是一种很好的做法# 在本例中,返回类型是字符串数组array_repeat_custom_udf = udf(array_repeat_custom, ArrayType(StringType()))# 以Parquet格局读取源表sales_table = spark.read.parquet("./data/sales_parquet")# 调用UDFsales_table_execution_plan = sales_table.select( array_repeat_custom_udf(col("num_pieces_sold"), lit(3)).alias("sample_array"))sales_table_execution_plan.show()
除了UDF的语法之外,我倡议你关注下面应用的lit函数。有些Spark函数只承受列作为输出:如果须要应用常量,则可能须要将该常量转换为“列”。lit函数会创立一列文字值。
下一步
我心愿我可能证实Spark并不比SQL更难,他们基本上是一样的。
你能够设想,这篇文章的题目有点夸大:实际上精通这个工具须要15分钟以上的工夫;但我置信以上是一个很好的疾速入门!
我的倡议是开始应用下面的api,因为它们将笼罩70%的用例。当你对基础知识有信念时,我倡议你写上面两篇文章,那是一位值得信赖的作者(lol)几个月前写的。第一个问题将挑战你在应用此工具进行开发时遇到的一些经典问题,而第二个问题是对Spark Joins的深入研究。
https://towardsdatascience.co...
https://towardsdatascience.co...
附录-配置PyCharm
在本地(非分布式)环境中装置Spark是一项非常简单的工作。在本附录中,我将向你展现PyCharm Community Edition的根本配置,以便应用Python运行Spark。有五个简略步骤:
- 下载PyCharm社区版
- 下载Spark
- 装置PySpark
- 配置PyCharm以执行正确的Spark executor
- 测试是否一切正常
两个注意事项:
- 我假如你的零碎中正确装置了Java。
- 在Windows上,须要装置Winutils,这是运行Hadoop所需的一组二进制文件。查看此Git repo理解更多信息:https://github.com/stevelough...。
下载PyCharm社区版
侥幸的是,JetBrains有一个PyCharm的开源版本。咱们能够简略地从他们的网站下载最新版本。装置很简略。
下载Spark
咱们只须要从Spark官方网站下载一个压缩文件。在我写作时,有两个次要版本可用:3.0.1和2.4.7。对于文章的范畴,咱们能够抉择其中之一。
一旦下载实现,咱们只须要在一个适合的地位解压包。
装置PySpark
当初是运行PyCharm并装置所需的所有软件包的时候了。首先,让咱们关上PyCharm,创立一个新我的项目和一个新的虚拟环境。
最初,间接从PyCharm装置PySpark:
留神,为了启用提醒,咱们还应该装置pyspark-stubs包。
配置PyCharm以执行正确的Spark executor
心愿咱们没有呈现任何谬误,所以咱们只须要批示PyCharm运行正确的Spark执行器。它位于咱们解压缩Spark自身的文件夹中。让咱们为PyCharm我的项目创立一个运行配置。
测试是否一切正常
要测试Spark是否失常工作,只需运行以下代码片段
# 导入库import pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *# 初始化Spark会话spark = SparkSession.builder \ .master("local") \ .appName("SparkLikeABoss") \ .getOrCreate()print(spark.version)
原文链接:https://towardsdatascience.co...
欢送关注磐创AI博客站:
http://panchuang.net/
sklearn机器学习中文官网文档:
http://sklearn123.com/
欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/