JOIN操作是十分常见的数据处理操作,Spark作为一个对立的大数据处理引擎,提供了十分丰盛的JOIN场景。本文分享将介绍Spark所提供的5种JOIN策略,心愿对你有所帮忙。本文次要包含以下内容:
- 影响JOIN操作的因素
- Spark中JOIN执行的5种策略
- Spark是如何抉择JOIN策略的
影响JOIN操作的因素
数据集的大小
参加JOIN的数据集的大小会间接影响Join操作的执行效率。同样,也会影响JOIN机制的抉择和JOIN的执行效率。
JOIN的条件
JOIN的条件会波及字段之间的逻辑比拟。依据JOIN的条件,JOIN可分为两大类:等值连贯和非等值连贯。等值连贯会波及一个或多个须要同时满足的相等条件。在两个输出数据集的属性之间利用每个等值条件。当应用其余运算符(运算连接符不为=)时,称之为非等值连贯。
JOIN的类型
在输出数据集的记录之间利用连贯条件之后,JOIN类型会影响JOIN操作的后果。次要有以下几种JOIN类型:
- 内连贯(Inner Join):仅从输出数据集中输入匹配连贯条件的记录。
- 外连贯(Outer Join):又分为左外连贯、右外链接和全外连贯。
- 半连贯(Semi Join):右表只用于过滤左表的数据而不呈现在后果集中。
- 穿插连贯(Cross Join):穿插联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。穿插联接也称作笛卡尔积。
Spark中JOIN执行的5种策略
Spark提供了5种JOIN机制来执行具体的JOIN操作。该5种JOIN机制如下所示:
- Shuffle Hash Join
- Broadcast Hash Join
- Sort Merge Join
- Cartesian Join
- Broadcast Nested Loop Join
Shuffle Hash Join
简介
当要JOIN的表数据量比拟大时,能够抉择Shuffle Hash Join。这样能够将大表进行依照JOIN的key进行重分区,保障每个雷同的JOIN key都发送到同一个分区中。如下图示:
如上图所示:Shuffle Hash Join的根本步骤次要有以下两点:
- 首先,对于两张参加JOIN的表,别离依照join key进行重分区,该过程会波及Shuffle,其目标是将雷同join key的数据发送到同一个分区,不便分区内进行join。
- 其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hash table,而后依据join key与大表的分区数据记录进行匹配。
条件与特点
- 仅反对等值连贯,join key不须要排序
- 反对除了全外连贯(full outer joins)之外的所有join类型
- 须要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比拟大,可能会造成OOM
- 将参数spark.sql.join.prefersortmergeJoin (default true)置为false
Broadcast Hash Join
简介
也称之为Map端JOIN。当有一张表较小时,咱们通常抉择Broadcast Hash Join,这样能够防止Shuffle带来的开销,从而进步性能。比方事实表与维表进行JOIN时,因为维表的数据通常会很小,所以能够应用Broadcast Hash Join将维表进行Broadcast。这样能够防止数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而进步JOIN的效率。在进行 Broadcast Join 之前,Spark 须要把处于 Executor 端的数据先发送到 Driver 端,而后 Driver 端再把数据播送到 Executor 端。如果咱们须要播送的数据比拟多,会造成 Driver 端呈现 OOM。具体如下图示:
Broadcast Hash Join次要包含两个阶段:
- Broadcast阶段 :小表被缓存在executor中
- Hash Join阶段:在每个 executor中执行Hash Join
条件与特点
- 仅反对等值连贯,join key不须要排序
- 反对除了全外连贯(full outer joins)之外的所有join类型
- Broadcast Hash Join相比其余的JOIN机制而言,效率更高。然而,Broadcast Hash Join属于网络密集型的操作(数据冗余传输),除此之外,须要在Driver端缓存数据,所以当小表的数据量较大时,会呈现OOM的状况
- 被播送的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB(10485760)
- 被播送表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize if (dataSize >= (8L << 30)) { throw new SparkException( s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB") }
- 基表不能被broadcast,比方左连贯时,只能将右表进行播送。形如:fact_table.join(broadcast(dimension_table),能够不应用broadcast提醒,当满足条件时会主动转为该JOIN形式。
Sort Merge Join
简介
该JOIN机制是Spark默认的,能够通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先应用Sort Merge Join。个别在两张大表进行JOIN时,应用该形式。Sort Merge Join能够缩小集群中的数据传输,该形式不会先加载所有数据的到内存,而后进行hashjoin,然而在JOIN之前须要对join key进行排序。具体图示:
Sort Merge Join次要包含三个阶段:
- Shuffle Phase : 两张大表依据Join key进行Shuffle重分区
- Sort Phase: 每个分区内的数据进行排序
- Merge Phase: 对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连贯具备雷同Join key值的行来合并数据集
条件与特点
- 仅反对等值连贯
- 反对所有join类型
- Join Keys是排序的
- 参数spark.sql.join.prefersortmergeJoin (默认true)设定为true
Cartesian Join
简介
如果 Spark 中两张参加 Join 的表没指定join key(ON 条件)那么会产生 Cartesian product join,这个 Join 失去的后果其实就是两张行数的乘积。
条件
- 仅反对内连贯
- 反对等值和不等值连贯
- 开启参数spark.sql.crossJoin.enabled=true
Broadcast Nested Loop Join
简介
该形式是在没有适合的JOIN机制可供选择时,最终会抉择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.
在Cartesian 与Broadcast Nested Loop Join之间,如果是内连贯,或者非等值连贯,则优先选择Broadcast Nested Loop策略,过后非等值连贯并且一张表能够被播送时,会抉择Cartesian Join。
条件与特点
- 反对等值和非等值连贯
反对所有的JOIN类型,次要优化点如下:
- 当右外连贯时要播送左表
- 当左外连贯时要播送右表
- 当内连贯时,要播送左右两张表
Spark是如何抉择JOIN策略的
等值连贯的状况
有join提醒(hints)的状况,依照上面的程序
- 1.Broadcast Hint:如果join类型反对,则抉择broadcast hash join
- 2.Sort merge hint:如果join key是排序的,则抉择 sort-merge join
- 3.shuffle hash hint:如果join类型反对, 抉择 shuffle hash join
- 4.shuffle replicate NL hint: 如果是内连贯,抉择笛卡尔积形式
没有join提醒(hints)的状况,则一一对照上面的规定
- 1.如果join类型反对,并且其中一张表可能被播送(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则抉择 broadcast hash join
- 2.如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(能够构建一个hash map) ,则抉择shuffle hash join
- 3.如果join keys 是排序的,则抉择sort-merge join
- 4.如果是内连贯,抉择 cartesian join
- 5.如果可能会产生OOM或者没有能够抉择的执行策略,则最终抉择broadcast nested loop join
非等值连贯状况
有join提醒(hints),依照上面的程序
- 1.broadcast hint:抉择broadcast nested loop join.
- 2.shuffle replicate NL hint: 如果是内连贯,则抉择cartesian product join
没有join提醒(hints),则一一对照上面的规定
- 1.如果一张表足够小(能够被播送),则抉择 broadcast nested loop join
- 2.如果是内连贯,则抉择cartesian product join
- 3.如果可能会产生OOM或者没有能够抉择的执行策略,则最终抉择broadcast nested loop join
join策略抉择的源码片段
object JoinSelection extends Strategy with PredicateHelper with JoinSelectionHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) => def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = { getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { buildSide => Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, nonEquiCond, planLater(left), planLater(right))) } } def createShuffleHashJoin(onlyLookingAtHint: Boolean) = { getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { buildSide => Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, joinType, buildSide, nonEquiCond, planLater(left), planLater(right))) } } def createSortMergeJoin() = { if (RowOrdering.isOrderable(leftKeys)) { Some(Seq(joins.SortMergeJoinExec( leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right)))) } else { None } } def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition))) } else { None } } def createJoinWithoutHint() = { createBroadcastHashJoin(false) .orElse { if (!conf.preferSortMergeJoin) { createShuffleHashJoin(false) } else { None } } .orElse(createSortMergeJoin()) .orElse(createCartesianProduct()) .getOrElse { val buildSide = getSmallerSide(left, right) Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, nonEquiCond)) } } createBroadcastHashJoin(true) .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } .orElse(createShuffleHashJoin(true)) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) if (canBuildLeft(joinType)) BuildLeft else BuildRight } def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = { val maybeBuildSide = if (buildLeft && buildRight) { Some(desiredBuildSide) } else if (buildLeft) { Some(BuildLeft) } else if (buildRight) { Some(BuildRight) } else { None } maybeBuildSide.map { buildSide => Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition)) } } def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) } else { None } } def createJoinWithoutHint() = { createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf)) .orElse(createCartesianProduct()) .getOrElse { Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), desiredBuildSide, joinType, condition)) } } createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) case _ => Nil } }
总结
本文次要介绍了Spark提供的5种JOIN策略,并对三种比拟重要的JOIN策略进行了图示解析。首先对影响JOIN的因素进行了梳理,而后介绍了5种Spark的JOIN策略,并对每种JOIN策略的具体含意和触发条件进行了论述,最初给出了JOIN策略抉择对应的源码片段。心愿本文可能对你有所帮忙。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包