关于spark:Spark的五种JOIN策略

8次阅读

共计 6796 个字符,预计需要花费 17 分钟才能阅读完成。

JOIN 操作是十分常见的数据处理操作,Spark 作为一个对立的大数据处理引擎,提供了十分丰盛的 JOIN 场景。本文分享将介绍 Spark 所提供的 5 种 JOIN 策略,心愿对你有所帮忙。本文次要包含以下内容:

  • 影响 JOIN 操作的因素
  • Spark 中 JOIN 执行的 5 种策略
  • Spark 是如何抉择 JOIN 策略的

影响 JOIN 操作的因素

数据集的大小

参加 JOIN 的数据集的大小会间接影响 Join 操作的执行效率。同样,也会影响 JOIN 机制的抉择和 JOIN 的执行效率。

JOIN 的条件

JOIN 的条件会波及字段之间的逻辑比拟。依据 JOIN 的条件,JOIN 可分为两大类:等值连贯 非等值连贯 。等值连贯会波及一个或多个须要同时满足的相等条件。在两个输出数据集的属性之间利用每个等值条件。当应用其余运算符(运算连接符不为=) 时,称之为非等值连贯。

JOIN 的类型

在输出数据集的记录之间利用连贯条件之后,JOIN 类型会影响 JOIN 操作的后果。次要有以下几种 JOIN 类型:

  • 内连贯(_Inner Join_):仅从输出数据集中输入匹配连贯条件的记录。
  • 外连贯(_Outer Join_):又分为左外连贯、右外链接和全外连贯。
  • 半连贯(_Semi Join_):右表只用于过滤左表的数据而不呈现在后果集中。
  • 穿插连贯(_Cross Join_):穿插联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。穿插联接也称作笛卡尔积。

Spark 中 JOIN 执行的 5 种策略

Spark 提供了 5 种 JOIN 机制来执行具体的 JOIN 操作。该 5 种 JOIN 机制如下所示:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Shuffle Hash Join

简介

当要 JOIN 的表数据量比拟大时,能够抉择 Shuffle Hash Join。这样能够将大表进行 依照 JOIN 的 key 进行重分区,保障每个雷同的 JOIN key 都发送到同一个分区中。如下图示:

image

如上图所示:Shuffle Hash Join 的根本步骤次要有以下两点:

  • 首先,对于两张参加 JOIN 的表,别离依照 join key 进行重分区,该过程会波及 Shuffle,其目标是将雷同 join key 的数据发送到同一个分区,不便分区内进行 join。
  • 其次,对于每个 Shuffle 之后的分区,会将小表的分区数据构建成一个 Hash table,而后依据 join key 与大表的分区数据记录进行匹配。

条件与特点

  • 仅反对等值连贯,join key 不须要排序
  • 反对除了全外连贯 (full outer joins) 之外的所有 join 类型
  • 须要对小表构建 Hash map,属于内存密集型的操作,如果构建 Hash 表的一侧数据比拟大,可能会造成 OOM
  • 将参数_spark.sql.join.prefersortmergeJoin (default true)_置为 false

Broadcast Hash Join

简介

也称之为Map 端 JOIN。当有一张表较小时,咱们通常抉择 Broadcast Hash Join,这样能够防止 Shuffle 带来的开销,从而进步性能。比方事实表与维表进行 JOIN 时,因为维表的数据通常会很小,所以能够应用 Broadcast Hash Join 将维表进行 Broadcast。这样能够防止数据的 Shuffle(在 Spark 中 Shuffle 操作是很耗时的),从而进步 JOIN 的效率。在进行 Broadcast Join 之前,Spark 须要把处于 Executor 端的数据先发送到 Driver 端,而后 Driver 端再把数据播送到 Executor 端。如果咱们须要播送的数据比拟多,会造成 Driver 端呈现 OOM。具体如下图示:

image

Broadcast Hash Join 次要包含两个阶段:

  • Broadcast阶段:小表被缓存在 executor 中
  • Hash Join阶段:在每个 executor 中执行 Hash Join

条件与特点

  • 仅反对等值连贯,join key 不须要排序
  • 反对除了全外连贯 (full outer joins) 之外的所有 join 类型
  • Broadcast Hash Join 相比其余的 JOIN 机制而言,效率更高。然而,Broadcast Hash Join 属于网络密集型的操作(数据冗余传输),除此之外,须要在 Driver 端缓存数据,所以当小表的数据量较大时,会呈现 OOM 的状况
  • 被播送的小表的数据量要小于 spark.sql.autoBroadcastJoinThreshold 值,默认是 10MB(10485760)
  • 被播送表的大小阈值不能超过 8GB,spark2.4 源码如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          } 
  • 基表不能被 broadcast,比方左连贯时,只能将右表进行播送。形如:fact_table.join(broadcast(dimension_table),能够不应用 broadcast 提醒,当满足条件时会主动转为该 JOIN 形式。

Sort Merge Join

简介

该 JOIN 机制是 Spark 默认的,能够通过参数 spark.sql.join.preferSortMergeJoin 进行配置,默认是 true,即优先应用 Sort Merge Join。个别在两张大表进行 JOIN 时,应用该形式。Sort Merge Join 能够缩小集群中的数据传输,该形式不会先加载所有数据的到内存,而后进行 hashjoin,然而在 JOIN 之前须要对 join key 进行排序。具体图示:

image

Sort Merge Join 次要包含三个阶段:

  • Shuffle Phase : 两张大表依据 Join key 进行 Shuffle 重分区
  • Sort Phase: 每个分区内的数据进行排序
  • Merge Phase: 对来自不同表的排序好的分区数据进行 JOIN,通过遍历元素,连贯具备雷同 Join key 值的行来合并数据集

条件与特点

  • 仅反对等值连贯
  • 反对所有 join 类型
  • Join Keys 是排序的
  • 参数 spark.sql.join.prefersortmergeJoin (默认 true) 设定为 true

Cartesian Join

简介

如果 Spark 中两张参加 Join 的表没指定 join key(ON 条件)那么会产生 Cartesian product join,这个 Join 失去的后果其实就是两张行数的乘积。

条件

  • 仅反对内连贯
  • 反对等值和不等值连贯
  • 开启参数 spark.sql.crossJoin.enabled=true

Broadcast Nested Loop Join

简介

该形式是在没有适合的 JOIN 机制可供选择时,最终会抉择该种 join 策略。优先级为:_Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join_.

在 Cartesian 与 Broadcast Nested Loop Join 之间,如果是内连贯,或者非等值连贯,则优先选择 Broadcast Nested Loop 策略,过后非等值连贯并且一张表能够被播送时,会抉择 Cartesian Join。

条件与特点

  • 反对等值和非等值连贯
  • 反对所有的 JOIN 类型,次要优化点如下:

    • 当右外连贯时要播送左表
    • 当左外连贯时要播送右表
    • 当内连贯时,要播送左右两张表

Spark 是如何抉择 JOIN 策略的

等值连贯的状况

有 join 提醒 (hints) 的状况,依照上面的程序

  • 1.Broadcast Hint:如果 join 类型反对,则抉择 broadcast hash join
  • 2.Sort merge hint:如果 join key 是排序的,则抉择 sort-merge join
  • 3.shuffle hash hint:如果 join 类型反对,抉择 shuffle hash join
  • 4.shuffle replicate NL hint:如果是内连贯,抉择笛卡尔积形式

没有 join 提醒 (hints) 的状况,则一一对照上面的规定

  • 1. 如果 join 类型反对,并且其中一张表可能被播送 (spark.sql.autoBroadcastJoinThreshold 值,默认是 10MB),则抉择 broadcast hash join
  • 2. 如果参数spark.sql.join.preferSortMergeJoin 设定为 false,且一张表足够小(能够构建一个 hash map),则抉择 shuffle hash join
  • 3. 如果 join keys 是排序的,则抉择 sort-merge join
  • 4. 如果是内连贯,抉择 cartesian join
  • 5. 如果可能会产生 OOM 或者没有能够抉择的执行策略,则最终抉择 broadcast nested loop join

非等值连贯状况

有 join 提醒(hints),依照上面的程序

  • 1.broadcast hint:抉择 broadcast nested loop join.
  • 2.shuffle replicate NL hint: 如果是内连贯,则抉择 cartesian product join

没有 join 提醒(hints),则一一对照上面的规定

  • 1. 如果一张表足够小(能够被播送),则抉择 broadcast nested loop join
  • 2. 如果是内连贯,则抉择 cartesian product join
  • 3. 如果可能会产生 OOM 或者没有能够抉择的执行策略,则最终抉择 broadcast nested loop join

join 策略抉择的源码片段

 object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.BroadcastHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.ShuffledHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createSortMergeJoin() = {if (RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {None}
        }

        def createCartesianProduct() = {if (joinType.isInstanceOf[InnerLike]) {Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
          } else {None}
        }

        def createJoinWithoutHint() = {createBroadcastHashJoin(false)
            .orElse {if (!conf.preferSortMergeJoin) {createShuffleHashJoin(false)
              } else {None}
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

        createBroadcastHashJoin(true)
          .orElse {if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None}
          .orElse(createShuffleHashJoin(true))
          .orElse {if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None}
          .getOrElse(createJoinWithoutHint())

    
          if (canBuildLeft(joinType)) BuildLeft else BuildRight
        }

        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {val maybeBuildSide = if (buildLeft && buildRight) {Some(desiredBuildSide)
          } else if (buildLeft) {Some(BuildLeft)
          } else if (buildRight) {Some(BuildRight)
          } else {None}

          maybeBuildSide.map { buildSide =>
            Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition))
          }
        }

        def createCartesianProduct() = {if (joinType.isInstanceOf[InnerLike]) {Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {None}
        }

        def createJoinWithoutHint() = {createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              Seq(joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }

        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
          .orElse {if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None}
          .getOrElse(createJoinWithoutHint())
      case _ => Nil
    }
  } 

总结

本文次要介绍了 Spark 提供的 5 种 JOIN 策略,并对三种比拟重要的 JOIN 策略进行了图示解析。首先对影响 JOIN 的因素进行了梳理,而后介绍了 5 种 Spark 的 JOIN 策略,并对每种 JOIN 策略的具体含意和触发条件进行了论述,最初给出了 JOIN 策略抉择对应的源码片段。心愿本文可能对你有所帮忙。

正文完
 0