JOIN 操作是十分常见的数据处理操作,Spark 作为一个对立的大数据处理引擎,提供了十分丰盛的 JOIN 场景。本文分享将介绍 Spark 所提供的 5 种 JOIN 策略,心愿对你有所帮忙。本文次要包含以下内容:
- 影响 JOIN 操作的因素
- Spark 中 JOIN 执行的 5 种策略
- Spark 是如何抉择 JOIN 策略的
影响 JOIN 操作的因素
数据集的大小
参加 JOIN 的数据集的大小会间接影响 Join 操作的执行效率。同样,也会影响 JOIN 机制的抉择和 JOIN 的执行效率。
JOIN 的条件
JOIN 的条件会波及字段之间的逻辑比拟。依据 JOIN 的条件,JOIN 可分为两大类:等值连贯 和非等值连贯 。等值连贯会波及一个或多个须要同时满足的相等条件。在两个输出数据集的属性之间利用每个等值条件。当应用其余运算符(运算连接符不为=) 时,称之为非等值连贯。
JOIN 的类型
在输出数据集的记录之间利用连贯条件之后,JOIN 类型会影响 JOIN 操作的后果。次要有以下几种 JOIN 类型:
- 内连贯(_Inner Join_):仅从输出数据集中输入匹配连贯条件的记录。
- 外连贯(_Outer Join_):又分为左外连贯、右外链接和全外连贯。
- 半连贯(_Semi Join_):右表只用于过滤左表的数据而不呈现在后果集中。
- 穿插连贯(_Cross Join_):穿插联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。穿插联接也称作笛卡尔积。
Spark 中 JOIN 执行的 5 种策略
Spark 提供了 5 种 JOIN 机制来执行具体的 JOIN 操作。该 5 种 JOIN 机制如下所示:
- Shuffle Hash Join
- Broadcast Hash Join
- Sort Merge Join
- Cartesian Join
- Broadcast Nested Loop Join
Shuffle Hash Join
简介
当要 JOIN 的表数据量比拟大时,能够抉择 Shuffle Hash Join。这样能够将大表进行 依照 JOIN 的 key 进行重分区,保障每个雷同的 JOIN key 都发送到同一个分区中。如下图示:
image
如上图所示:Shuffle Hash Join 的根本步骤次要有以下两点:
- 首先,对于两张参加 JOIN 的表,别离依照 join key 进行重分区,该过程会波及 Shuffle,其目标是将雷同 join key 的数据发送到同一个分区,不便分区内进行 join。
- 其次,对于每个 Shuffle 之后的分区,会将小表的分区数据构建成一个 Hash table,而后依据 join key 与大表的分区数据记录进行匹配。
条件与特点
- 仅反对等值连贯,join key 不须要排序
- 反对除了全外连贯 (full outer joins) 之外的所有 join 类型
- 须要对小表构建 Hash map,属于内存密集型的操作,如果构建 Hash 表的一侧数据比拟大,可能会造成 OOM
- 将参数_spark.sql.join.prefersortmergeJoin (default true)_置为 false
Broadcast Hash Join
简介
也称之为Map 端 JOIN。当有一张表较小时,咱们通常抉择 Broadcast Hash Join,这样能够防止 Shuffle 带来的开销,从而进步性能。比方事实表与维表进行 JOIN 时,因为维表的数据通常会很小,所以能够应用 Broadcast Hash Join 将维表进行 Broadcast。这样能够防止数据的 Shuffle(在 Spark 中 Shuffle 操作是很耗时的),从而进步 JOIN 的效率。在进行 Broadcast Join 之前,Spark 须要把处于 Executor 端的数据先发送到 Driver 端,而后 Driver 端再把数据播送到 Executor 端。如果咱们须要播送的数据比拟多,会造成 Driver 端呈现 OOM。具体如下图示:
image
Broadcast Hash Join 次要包含两个阶段:
- Broadcast阶段:小表被缓存在 executor 中
- Hash Join阶段:在每个 executor 中执行 Hash Join
条件与特点
- 仅反对等值连贯,join key 不须要排序
- 反对除了全外连贯 (full outer joins) 之外的所有 join 类型
- Broadcast Hash Join 相比其余的 JOIN 机制而言,效率更高。然而,Broadcast Hash Join 属于网络密集型的操作(数据冗余传输),除此之外,须要在 Driver 端缓存数据,所以当小表的数据量较大时,会呈现 OOM 的状况
- 被播送的小表的数据量要小于 spark.sql.autoBroadcastJoinThreshold 值,默认是 10MB(10485760)
- 被播送表的大小阈值不能超过 8GB,spark2.4 源码如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
if (dataSize >= (8L << 30)) {
throw new SparkException(s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
}
- 基表不能被 broadcast,比方左连贯时,只能将右表进行播送。形如:fact_table.join(broadcast(dimension_table),能够不应用 broadcast 提醒,当满足条件时会主动转为该 JOIN 形式。
Sort Merge Join
简介
该 JOIN 机制是 Spark 默认的,能够通过参数 spark.sql.join.preferSortMergeJoin 进行配置,默认是 true,即优先应用 Sort Merge Join。个别在两张大表进行 JOIN 时,应用该形式。Sort Merge Join 能够缩小集群中的数据传输,该形式不会先加载所有数据的到内存,而后进行 hashjoin,然而在 JOIN 之前须要对 join key 进行排序。具体图示:
image
Sort Merge Join 次要包含三个阶段:
- Shuffle Phase : 两张大表依据 Join key 进行 Shuffle 重分区
- Sort Phase: 每个分区内的数据进行排序
- Merge Phase: 对来自不同表的排序好的分区数据进行 JOIN,通过遍历元素,连贯具备雷同 Join key 值的行来合并数据集
条件与特点
- 仅反对等值连贯
- 反对所有 join 类型
- Join Keys 是排序的
- 参数 spark.sql.join.prefersortmergeJoin (默认 true) 设定为 true
Cartesian Join
简介
如果 Spark 中两张参加 Join 的表没指定 join key(ON 条件)那么会产生 Cartesian product join,这个 Join 失去的后果其实就是两张行数的乘积。
条件
- 仅反对内连贯
- 反对等值和不等值连贯
- 开启参数 spark.sql.crossJoin.enabled=true
Broadcast Nested Loop Join
简介
该形式是在没有适合的 JOIN 机制可供选择时,最终会抉择该种 join 策略。优先级为:_Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join_.
在 Cartesian 与 Broadcast Nested Loop Join 之间,如果是内连贯,或者非等值连贯,则优先选择 Broadcast Nested Loop 策略,过后非等值连贯并且一张表能够被播送时,会抉择 Cartesian Join。
条件与特点
- 反对等值和非等值连贯
-
反对所有的 JOIN 类型,次要优化点如下:
- 当右外连贯时要播送左表
- 当左外连贯时要播送右表
- 当内连贯时,要播送左右两张表
Spark 是如何抉择 JOIN 策略的
等值连贯的状况
有 join 提醒 (hints) 的状况,依照上面的程序
- 1.Broadcast Hint:如果 join 类型反对,则抉择 broadcast hash join
- 2.Sort merge hint:如果 join key 是排序的,则抉择 sort-merge join
- 3.shuffle hash hint:如果 join 类型反对,抉择 shuffle hash join
- 4.shuffle replicate NL hint:如果是内连贯,抉择笛卡尔积形式
没有 join 提醒 (hints) 的状况,则一一对照上面的规定
- 1. 如果 join 类型反对,并且其中一张表可能被播送 (spark.sql.autoBroadcastJoinThreshold 值,默认是 10MB),则抉择 broadcast hash join
- 2. 如果参数spark.sql.join.preferSortMergeJoin 设定为 false,且一张表足够小(能够构建一个 hash map),则抉择 shuffle hash join
- 3. 如果 join keys 是排序的,则抉择 sort-merge join
- 4. 如果是内连贯,抉择 cartesian join
- 5. 如果可能会产生 OOM 或者没有能够抉择的执行策略,则最终抉择 broadcast nested loop join
非等值连贯状况
有 join 提醒(hints),依照上面的程序
- 1.broadcast hint:抉择 broadcast nested loop join.
- 2.shuffle replicate NL hint: 如果是内连贯,则抉择 cartesian product join
没有 join 提醒(hints),则一一对照上面的规定
- 1. 如果一张表足够小(能够被播送),则抉择 broadcast nested loop join
- 2. 如果是内连贯,则抉择 cartesian product join
- 3. 如果可能会产生 OOM 或者没有能够抉择的执行策略,则最终抉择 broadcast nested loop join
join 策略抉择的源码片段
object JoinSelection extends Strategy
with PredicateHelper
with JoinSelectionHelper {def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
buildSide =>
Seq(joins.BroadcastHashJoinExec(
leftKeys,
rightKeys,
joinType,
buildSide,
nonEquiCond,
planLater(left),
planLater(right)))
}
}
def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
buildSide =>
Seq(joins.ShuffledHashJoinExec(
leftKeys,
rightKeys,
joinType,
buildSide,
nonEquiCond,
planLater(left),
planLater(right)))
}
}
def createSortMergeJoin() = {if (RowOrdering.isOrderable(leftKeys)) {
Some(Seq(joins.SortMergeJoinExec(leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
} else {None}
}
def createCartesianProduct() = {if (joinType.isInstanceOf[InnerLike]) {Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
} else {None}
}
def createJoinWithoutHint() = {createBroadcastHashJoin(false)
.orElse {if (!conf.preferSortMergeJoin) {createShuffleHashJoin(false)
} else {None}
}
.orElse(createSortMergeJoin())
.orElse(createCartesianProduct())
.getOrElse {val buildSide = getSmallerSide(left, right)
Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
}
}
createBroadcastHashJoin(true)
.orElse {if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None}
.orElse(createShuffleHashJoin(true))
.orElse {if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None}
.getOrElse(createJoinWithoutHint())
if (canBuildLeft(joinType)) BuildLeft else BuildRight
}
def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {val maybeBuildSide = if (buildLeft && buildRight) {Some(desiredBuildSide)
} else if (buildLeft) {Some(BuildLeft)
} else if (buildRight) {Some(BuildRight)
} else {None}
maybeBuildSide.map { buildSide =>
Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition))
}
}
def createCartesianProduct() = {if (joinType.isInstanceOf[InnerLike]) {Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
} else {None}
}
def createJoinWithoutHint() = {createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
.orElse(createCartesianProduct())
.getOrElse {
Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), desiredBuildSide, joinType, condition))
}
}
createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
.orElse {if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None}
.getOrElse(createJoinWithoutHint())
case _ => Nil
}
}
总结
本文次要介绍了 Spark 提供的 5 种 JOIN 策略,并对三种比拟重要的 JOIN 策略进行了图示解析。首先对影响 JOIN 的因素进行了梳理,而后介绍了 5 种 Spark 的 JOIN 策略,并对每种 JOIN 策略的具体含意和触发条件进行了论述,最初给出了 JOIN 策略抉择对应的源码片段。心愿本文可能对你有所帮忙。