JOIN操作是十分常见的数据处理操作,Spark作为一个对立的大数据处理引擎,提供了十分丰盛的JOIN场景。本文分享将介绍Spark所提供的5种JOIN策略,心愿对你有所帮忙。本文次要包含以下内容:

  • 影响JOIN操作的因素
  • Spark中JOIN执行的5种策略
  • Spark是如何抉择JOIN策略的

影响JOIN操作的因素

数据集的大小

参加JOIN的数据集的大小会间接影响Join操作的执行效率。同样,也会影响JOIN机制的抉择和JOIN的执行效率。

JOIN的条件

JOIN的条件会波及字段之间的逻辑比拟。依据JOIN的条件,JOIN可分为两大类:等值连贯非等值连贯。等值连贯会波及一个或多个须要同时满足的相等条件。在两个输出数据集的属性之间利用每个等值条件。当应用其余运算符(运算连接符不为=)时,称之为非等值连贯。

JOIN的类型

在输出数据集的记录之间利用连贯条件之后,JOIN类型会影响JOIN操作的后果。次要有以下几种JOIN类型:

  • 内连贯(_Inner Join_):仅从输出数据集中输入匹配连贯条件的记录。
  • 外连贯(_Outer Join_):又分为左外连贯、右外链接和全外连贯。
  • 半连贯(_Semi Join_):右表只用于过滤左表的数据而不呈现在后果集中。
  • 穿插连贯(_Cross Join_):穿插联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。穿插联接也称作笛卡尔积。

Spark中JOIN执行的5种策略

Spark提供了5种JOIN机制来执行具体的JOIN操作。该5种JOIN机制如下所示:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Shuffle Hash Join

简介

当要JOIN的表数据量比拟大时,能够抉择Shuffle Hash Join。这样能够将大表进行依照JOIN的key进行重分区,保障每个雷同的JOIN key都发送到同一个分区中。如下图示:

image

如上图所示:Shuffle Hash Join的根本步骤次要有以下两点:

  • 首先,对于两张参加JOIN的表,别离依照join key进行重分区,该过程会波及Shuffle,其目标是将雷同join key的数据发送到同一个分区,不便分区内进行join。
  • 其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hash table,而后依据join key与大表的分区数据记录进行匹配。

条件与特点

  • 仅反对等值连贯,join key不须要排序
  • 反对除了全外连贯(full outer joins)之外的所有join类型
  • 须要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比拟大,可能会造成OOM
  • 将参数_spark.sql.join.prefersortmergeJoin (default true)_置为false

Broadcast Hash Join

简介

也称之为Map端JOIN。当有一张表较小时,咱们通常抉择Broadcast Hash Join,这样能够防止Shuffle带来的开销,从而进步性能。比方事实表与维表进行JOIN时,因为维表的数据通常会很小,所以能够应用Broadcast Hash Join将维表进行Broadcast。这样能够防止数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而进步JOIN的效率。在进行 Broadcast Join 之前,Spark 须要把处于 Executor 端的数据先发送到 Driver 端,而后 Driver 端再把数据播送到 Executor 端。如果咱们须要播送的数据比拟多,会造成 Driver 端呈现 OOM。具体如下图示:

image

Broadcast Hash Join次要包含两个阶段:

  • Broadcast阶段 :小表被缓存在executor中
  • Hash Join阶段:在每个 executor中执行Hash Join

条件与特点

  • 仅反对等值连贯,join key不须要排序
  • 反对除了全外连贯(full outer joins)之外的所有join类型
  • Broadcast Hash Join相比其余的JOIN机制而言,效率更高。然而,Broadcast Hash Join属于网络密集型的操作(数据冗余传输),除此之外,须要在Driver端缓存数据,所以当小表的数据量较大时,会呈现OOM的状况
  • 被播送的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB(10485760)
  • 被播送表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize          if (dataSize >= (8L << 30)) {            throw new SparkException(              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")          } 
  • 基表不能被broadcast,比方左连贯时,只能将右表进行播送。形如:fact_table.join(broadcast(dimension_table),能够不应用broadcast提醒,当满足条件时会主动转为该JOIN形式。

Sort Merge Join

简介

该JOIN机制是Spark默认的,能够通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先应用Sort Merge Join。个别在两张大表进行JOIN时,应用该形式。Sort Merge Join能够缩小集群中的数据传输,该形式不会先加载所有数据的到内存,而后进行hashjoin,然而在JOIN之前须要对join key进行排序。具体图示:

image

Sort Merge Join次要包含三个阶段:

  • Shuffle Phase : 两张大表依据Join key进行Shuffle重分区
  • Sort Phase: 每个分区内的数据进行排序
  • Merge Phase: 对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连贯具备雷同Join key值的行来合并数据集

条件与特点

  • 仅反对等值连贯
  • 反对所有join类型
  • Join Keys是排序的
  • 参数spark.sql.join.prefersortmergeJoin (默认true)设定为true

Cartesian Join

简介

如果 Spark 中两张参加 Join 的表没指定join key(ON 条件)那么会产生 Cartesian product join,这个 Join 失去的后果其实就是两张行数的乘积。

条件

  • 仅反对内连贯
  • 反对等值和不等值连贯
  • 开启参数spark.sql.crossJoin.enabled=true

Broadcast Nested Loop Join

简介

该形式是在没有适合的JOIN机制可供选择时,最终会抉择该种join策略。优先级为:_Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join_.

在Cartesian 与Broadcast Nested Loop Join之间,如果是内连贯,或者非等值连贯,则优先选择Broadcast Nested Loop策略,过后非等值连贯并且一张表能够被播送时,会抉择Cartesian Join。

条件与特点

  • 反对等值和非等值连贯
  • 反对所有的JOIN类型,次要优化点如下:

    • 当右外连贯时要播送左表
    • 当左外连贯时要播送右表
    • 当内连贯时,要播送左右两张表

Spark是如何抉择JOIN策略的

等值连贯的状况

有join提醒(hints)的状况,依照上面的程序

  • 1.Broadcast Hint:如果join类型反对,则抉择broadcast hash join
  • 2.Sort merge hint:如果join key是排序的,则抉择 sort-merge join
  • 3.shuffle hash hint:如果join类型反对, 抉择 shuffle hash join
  • 4.shuffle replicate NL hint: 如果是内连贯,抉择笛卡尔积形式

没有join提醒(hints)的状况,则一一对照上面的规定

  • 1.如果join类型反对,并且其中一张表可能被播送(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则抉择 broadcast hash join
  • 2.如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(能够构建一个hash map) ,则抉择shuffle hash join
  • 3.如果join keys 是排序的,则抉择sort-merge join
  • 4.如果是内连贯,抉择 cartesian join
  • 5.如果可能会产生OOM或者没有能够抉择的执行策略,则最终抉择broadcast nested loop join

非等值连贯状况

有join提醒(hints),依照上面的程序

  • 1.broadcast hint:抉择broadcast nested loop join.
  • 2.shuffle replicate NL hint: 如果是内连贯,则抉择cartesian product join

没有join提醒(hints),则一一对照上面的规定

  • 1.如果一张表足够小(能够被播送),则抉择 broadcast nested loop join
  • 2.如果是内连贯,则抉择cartesian product join
  • 3.如果可能会产生OOM或者没有能够抉择的执行策略,则最终抉择broadcast nested loop join

join策略抉择的源码片段

 object JoinSelection extends Strategy    with PredicateHelper    with JoinSelectionHelper {    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {          getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {            buildSide =>              Seq(joins.BroadcastHashJoinExec(                leftKeys,                rightKeys,                joinType,                buildSide,                nonEquiCond,                planLater(left),                planLater(right)))          }        }        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {          getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {            buildSide =>              Seq(joins.ShuffledHashJoinExec(                leftKeys,                rightKeys,                joinType,                buildSide,                nonEquiCond,                planLater(left),                planLater(right)))          }        }        def createSortMergeJoin() = {          if (RowOrdering.isOrderable(leftKeys)) {            Some(Seq(joins.SortMergeJoinExec(              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))          } else {            None          }        }        def createCartesianProduct() = {          if (joinType.isInstanceOf[InnerLike]) {            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))          } else {            None          }        }        def createJoinWithoutHint() = {          createBroadcastHashJoin(false)            .orElse {              if (!conf.preferSortMergeJoin) {                createShuffleHashJoin(false)              } else {                None              }            }            .orElse(createSortMergeJoin())            .orElse(createCartesianProduct())            .getOrElse {              val buildSide = getSmallerSide(left, right)              Seq(joins.BroadcastNestedLoopJoinExec(                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))            }        }        createBroadcastHashJoin(true)          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }          .orElse(createShuffleHashJoin(true))          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }          .getOrElse(createJoinWithoutHint())              if (canBuildLeft(joinType)) BuildLeft else BuildRight        }        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {          val maybeBuildSide = if (buildLeft && buildRight) {            Some(desiredBuildSide)          } else if (buildLeft) {            Some(BuildLeft)          } else if (buildRight) {            Some(BuildRight)          } else {            None          }          maybeBuildSide.map { buildSide =>            Seq(joins.BroadcastNestedLoopJoinExec(              planLater(left), planLater(right), buildSide, joinType, condition))          }        }        def createCartesianProduct() = {          if (joinType.isInstanceOf[InnerLike]) {            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))          } else {            None          }        }        def createJoinWithoutHint() = {          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))            .orElse(createCartesianProduct())            .getOrElse {              Seq(joins.BroadcastNestedLoopJoinExec(                planLater(left), planLater(right), desiredBuildSide, joinType, condition))            }        }        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }          .getOrElse(createJoinWithoutHint())      case _ => Nil    }  } 

总结

本文次要介绍了Spark提供的5种JOIN策略,并对三种比拟重要的JOIN策略进行了图示解析。首先对影响JOIN的因素进行了梳理,而后介绍了5种Spark的JOIN策略,并对每种JOIN策略的具体含意和触发条件进行了论述,最初给出了JOIN策略抉择对应的源码片段。心愿本文可能对你有所帮忙。