主頁 >  其他 > Spark原始碼閱讀(五) --- Spark的支持的join方式以及join策略

Spark原始碼閱讀(五) --- Spark的支持的join方式以及join策略

2021-08-10 07:09:22 其他

目錄

Spark支持的七種Join方式

Inner Join

Cross Join

Left Outer Join

Right Outer Join

Full Outer Join

Left Semi Join

Left Anti Join

Spakr支持的五種Join策略

小表Join大表--Broadcast Hash Join與Shuffle Hash Join

Broadcast Hash Join

Broadcast Hash Join的總體流程

Broadcast Hash Join特點

Shuffle Hash Join

Shuffle Hash Join的總體流程

Shuffle Hash Join的特點

大表join大表--Sort Merge Join

非等值連接--Broadcast Nested Loop Join與Cartesian Join

Broadcast Nested Loop Join

Broadcast Nested Loop Join的特點

Cartesian Join(笛卡爾積)

Cartesian Join特點

Spark的Join選擇策略

調度順序總結

后記


Join無論SparkCore中還是在SparkSql都占據著至關重要的地位,今天來閱讀一下Join部分的原始碼

Spark支持的七種Join方式

直接看Join算子的原始碼,發現經過反復呼叫,最侄訓來到這個Join方法內部

def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
    // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
    // by creating a new instance for one of the branch.
    val joined = sparkSession.sessionState.executePlan(
      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
      .analyzed.asInstanceOf[Join]

    withPlan {
      Join(
        joined.left,
        joined.right,
        UsingJoin(JoinType(joinType), usingColumns),
        None)
    }
  }

該方法默認為Inner Join,可以看到傳入了String型別的名為joinType的param,被封裝到了JoinType的Object物件內,其apply方法原始碼如下

 def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" => LeftSemi
    case "leftanti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi",
        "leftanti", "left_anti",
        "cross")

      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

Inner Join

Inner Join又稱為內連接,平日里書寫時,用join或者Inner Join,它join的結果就是符合關聯條件下的兩張表資料的交集,例如有如下代碼

Student表:

 sparkSession.sql(
      """
        |select 1 as id,'lili' as name
        |union all
        |select 2 as id,'hanhan' as name""".stripMargin)
      .createOrReplaceTempView("Student")

Score表:

sparkSession.sql("select 1 as id,'math' as subject,99 as score")
      .createOrReplaceTempView("Score")

join sql:

select t1.id,t1.name,t2.subject,t2.score
from Student t1
join Score t2
on t1.id=t2.id

結果如下:

Cross Join

Cross Join,直譯就是交叉Join,也就是所謂的笛卡爾積操作,笛卡爾積有很多實作的方法,比如非等值連接:

select *
from Student t1
join Score t2
on t1.id != t2.id

在from中同時from兩張表

select *
from Student,Score

join不寫條件

select *
from Student t1
join Score t2

on條件中用上or

select *
from Student t1
join Score t2
on t1.id = t2.id or t1.name =t2.name

或者直接寫cross Join

select *
from Student t1
cross join Score t2

都可以觸發笛卡爾積操作

它的結果或者執行程序中基本都會生成這樣一張表

就是所謂的交叉乘積,Student表中的每一條資料和Score表中的每一條資料都做了排列組合式聚合并最終輸出,

但是有個要注意的地方是,有時候并不會真的在底層做了笛卡爾積操作,因為sparkSql內部存有優化器,并且在非等值連接且條件允許的情況下,一般是對Broadcast Nested Loop Join進行呼叫,所以如何判斷是否真的產生了笛卡爾積?我們查詢執行計劃的時候如果看到

CartesianProduct

那就是底層真正采用了笛卡爾積操作,

關于sparkSql的Join策略將會在本文后半部分講述,

Left Outer Join

Left Outer Join,又稱左外連接,平常使用的時候寫left outer join或者left join都行,其作用是顯示全左表的資料,即便這部分資料不滿足Join條件

join sql:

select *
from Student t1
left join Score t2
on t1.id = t2.id

結果如下:

可以看到左表沒匹配到的欄位全部置為null

Right Outer Join

Right Outer Join,又稱右外連接,平常使用的時候寫right outer join或者right join都行,與left outer join類似,其作用是顯示全右表的資料,即便這部分資料不滿足Join條件

join sql:

select *
from Student t1
right join Score t2
on t1.id = t2.id

結果如下:

這里我將Student換到右表去,結果與預期一致

Full Outer Join

Full Outer Join,又稱全外連接,平常使用的時候寫full outer join或者full join都行,其作用是顯示全右左表的全部資料,即便這部分資料不滿足Join條件

join sql:

select *
from Score t1
full join Student t2
on t1.id = t2.name

結果如下:

我特意讓兩張表的join條件的欄位不一致,結果如預期一般全部顯示

Left Semi Join

Left Semi Join,又名左半連接,算是一種優化策略式的join方式,sparkSql的底層通常會將in操作優化為Left Semi Join,我們重點講解一下這個Join

首先注意它有幾個特點

1、右表的key僅僅只傳遞到map階段,所以它將不會在結果中出現

2、他的輸出結果與join類似,都是只輸出符合join條件的資料,但是它更快而且left semi join天生就是去重,可以看成一個更快的,去重的join

3、因為 left semi join 是 in(keySet) 的關系,遇到右表重復記錄,左表會跳過,比如左表中有一個time欄位中有2020-11-05這樣一條資料,右表有兩個欄位start_time和end_time組成一個時間段,這之中有兩條資料,分別是2020-10-12至2020-12-06和2020-9-17至2020-12-07,join的條件是time between start_time and end_time,那么join他會將這兩個條件都匹配上,致使資料發散,而left semi join只會匹配到一條滿足條件的資料,其他就會被跳過,這也是為什么left semi join本身自帶去重,因為沒有資料發散,又十分快,因為僅滿足條件就行,

下面舉幾個例子來說說它的特點

首先我們來兩張表

訂單表:

活動表:

join sql:

-- join
select *
from order t1
join campaign t2
on t1.order_dt between start_time and end_time
--left semi join
select *
from order t1
left semi join campaign t2
on t1.order_dt between start_time and end_time

結果如下:

join結果

left semi join結果

可以很明顯的看到join的結果資料發散了,而left semi join并沒有,這個結果符合了我們剛剛提到幾個特點

Left Anti Join

Left Anti Join,名為反左連接,其作用是顯示沒匹配的資料,其實就類似于not in,這也是一個經典的優化策略式的join方式,與left semi join相同,右表的資料也僅僅傳到map階段,并不會輸出到最終的結果,其底層的優化與left semi join類似

join sql:

select *
from Student t1
left anti join Score t2
on t1.id=t2.id

結果如下:

這里給個預期的結果,我就不多做贅述了

Spakr支持的五種Join策略

上述提到了spark的七種join方式,那么下面來說說spark的五種Join策略,它們分別是

1、Broadcast Hash Join

2、Shuffle Hash Join

3、Sort Merge Join

4、Cartesian Join

5、Broadcast Nested Loop Join

小表Join大表--Broadcast Hash Join與Shuffle Hash Join

我們知道spark是一個分布式計算引擎,其Join的核心其實仍是傳統資料庫中常見的各類Join策略,這邊先說說Broadcast Hash Join與Shuffle Hash Join用到的Hash Join是什么

舉例上文提到的Student表和Score表,假如有這么一個sql:select * from Student t1 join Score t2 on t1.id=t2.id.

那么Hash Join會做這么幾件事情,如下圖

1、首先Score表會作為一張Build Table,Student表會作為一張Probe Table,區分這兩者的因素是表的大小,為何選取小表做Build Table進而去生成Hash Table呢?因為小表體量小,可以完全加載到記憶體中,并且能夠比較輕松地被作為BroadCast分發出去,

2、掃描Score表全表,Score表會根據join條件的key做Hash操作,將該key放到對應的Bucket中,進而build一張Hash Table,

3、掃描Student表全表,根據Join條件對Join Key做Hash操作,映射到對應的Bucket上,此時會跟Hash Table中Score映射出來tuple的Key匹配上,此時還會再做一次Join條件和filter條件的判斷,才會最終輸出匹配的資料,

Broadcast Hash Join

我們直接上原始碼


case class BroadcastHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin with CodegenSupport {
//......
}

我們可以看到該class動態混入了 HashJoin和CodegenSupport,Hash Join我就不多說了,CodegenSupport則和Codegen有關,Codegen是Spark Runtime優化性能的關鍵技術,核心在于動態生成java代碼、即時compile和加載,把解釋執行轉化為編譯執行,Spark Codegen分為Expression級別和WholeStage級別,簡而言之Codegen就是管理執行計劃優化和生成的老大了,CodegenContext是它的核心類代碼,這里參考阿里云社區bean_stalk大佬文章內的一張圖(文章地址我在本文文末貼出)

Expression

我們可以看到起點是execute(),隨后是doExecute()方法,doExecute()主要做了兩件事,分為資料獲取代碼生成兩部分,

資料獲取走的是inputRdd->inputRdds->execute()這條路,假設物理算子節點 A 支持代碼生成,物理算子節點 B 不支持代碼生成,因此 B 會采用 InputAdapter 封裝

代碼生成則是produce()/doProduce()和consume()/doConsume()這條路,produce和consume是專門用以生成代碼的

因此我們直接看doProduce()和doConsume()兩被被override的原始碼

?
override def doProduce(ctx: CodegenContext): String = {
    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
  }

  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    joinType match {
      case _: InnerLike => codegenInner(ctx, input)
      case LeftOuter | RightOuter => codegenOuter(ctx, input)
      case LeftSemi => codegenSemi(ctx, input)
      case LeftAnti => codegenAnti(ctx, input)
      case j: ExistenceJoin => codegenExistence(ctx, input)
      case x =>
        throw new IllegalArgumentException(
          s"BroadcastHashJoin should not take $x as the JoinType")
    }
  }

?

Produce()方法就是生成java code的執行計劃,我們來看下doConsume()方法,可以看到里面有很多case,有Inner、Outer、semi、Anti各種Join型別,其中ExistenceJoin是底層自己呼叫的Join型別,我們并不會使用到,

任意點進一個case的回傳類,以InnerLike為例,以下是CodegenInner的原始碼

/**
   * Generates the code for Inner join.
   */
  private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
    val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
    val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
    val numOutput = metricTerm(ctx, "numOutputRows")

    val resultVars = buildSide match {
      case BuildLeft => buildVars ++ input
      case BuildRight => input ++ buildVars
    }
    if (broadcastRelation.value.keyIsUnique) {
      s"""
         |// generate join key for stream side
         |${keyEv.code}
         |// find matches from HashedRelation
         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
         |if ($matched != null) {
         |  $checkCondition {
         |    $numOutput.add(1);
         |    ${consume(ctx, resultVars)}
         |  }
         |}
       """.stripMargin

    } else {
      val matches = ctx.freshName("matches")
      val iteratorCls = classOf[Iterator[UnsafeRow]].getName
      s"""
         |// generate join key for stream side
         |${keyEv.code}
         |// find matches from HashRelation
         |$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value});
         |if ($matches != null) {
         |  while ($matches.hasNext()) {
         |    UnsafeRow $matched = (UnsafeRow) $matches.next();
         |    $checkCondition {
         |      $numOutput.add(1);
         |      ${consume(ctx, resultVars)}
         |    }
         |  }
         |}
       """.stripMargin
    }
  }

總體流程如下:

1、prepareBroadcast,生成廣播變數以便發送出去

2、genStreamSideJoinKey,生成能生成Join Key的代碼

3、getJoinCondition,生成用于過濾row的Join Condition的代碼

4、最后回傳Inner Join的代碼

以上就是BroadcastHashJoinExec做的一些基本的事,prepareBroadcast是其核心,

Broadcast Hash Join的總體流程

資料收集階段:利用 collect 算子將小表的資料先收集到Driver端上

Broadcast階段:選取小表生成broadcast分發到個Executor上,進而避免Shuffle

Hash Join階段:各點上自己做Hash Join

其實就是分布式操作(Broadcast分發)+Hash Join = Broadcast Hash Join,總體還是比較好理解的

這里再總結一下特點

Broadcast Hash Join特點

  • 僅支持等值連接,join key不需要排序
  • 除full outer join外其他join型別均支持
  • 需要對小表構建Hash map,如果小表比較大可能會OOM,同時將在Driver端存盤小表資料,可以通過spark.sql.autoBroadcastJoinThreshold設定,默認為10M
  • 被廣播表的大小閾值不能超過8GB,這點在原始碼中有所體現

Shuffle Hash Join

Shuffle Hash Join同樣也是大表join小表的一種處理方式,只不過這里的小表略有特殊,這里的小表是由一張較大的表切分而出的,因為在我們真實的環境中,不可能每次參與join的物件都是很小的表,所Shuffle Hash Join誕生了,采用了分治的思想,將大表分而治之,也就是根據join Key先將資料做一次Shuffle,

直接上原始碼

case class ShuffledHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin {

  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))

  override def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil

  private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
    val buildDataSize = longMetric("buildDataSize")
    val buildTime = longMetric("buildTime")
    val start = System.nanoTime()
    val context = TaskContext.get()
    val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
    buildTime += (System.nanoTime() - start) / 1000000
    buildDataSize += relation.estimatedSize
    // This relation is usually used until the end of task.
    context.addTaskCompletionListener[Unit](_ => relation.close())
    relation
  }

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
      val hashed = buildHashedRelation(buildIter)
      join(streamIter, hashed, numOutputRows)
    }
  }
}

我們發現其原始碼并不多,只有doExecute()方法,那我們從doExecute中開始看起,該方法回傳了一個RDD,內部首先做了Hashed的獲得,其次呼叫了Hash Join的join方法進行了對不同join的呼叫,從這里可以看出它的核心流程還是Hash Join,

Shuffle Hash Join的總體流程

Shuffle階段:將兩張表根據Join條件Shuffle到不同的partition上

HashJoin階段:在每個Executor上做單點的Hash Join

Shuffle Hash Join的特點

  • 僅支持等值連接,join key不需要排序
  • 除full outer join外其他join型別均支持
  • 仍需要對小表構建Hash map,如果小表比較大可能會OOM,雖然對較小的大表進行了劃分,但是劃分出的小表不會超過spark.sql.autoBroadcastJoinThreshold的設定,一樣可以通過spark.sql.autoBroadcastJoinThreshold設定,默認為10M
  • 被shuffle的小表大小還要額外滿足兩個條件,一個是Size(小表) < spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions,第二個是Size(小表)*3 <=Size(大表)
  • 由于在spark底層的Join策略的優先級順序是Broadcast Hash Join > Sort Merge Sort > Shuffle Hash Join,所以將引數spark.sql.join.prefersortmergeJoin(默認為true)置為false,有利于提高Shuffle Hash Join的使用率

大表join大表--Sort Merge Join

由于在spark底層的Join策略的優先級順序是Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join,所以當Broadcast Hash Join不滿足觸發條件時,spark底層會自動選擇Sort Merge Join執行,

Sort Merge Join,意如其名,其中有排序的操作,主要用于兩張大表join時的處理

直接上原始碼

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val spillThreshold = getSpillThreshold
    val inMemoryThreshold = getInMemoryThreshold
    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
      val boundCondition: (InternalRow) => Boolean = {
        condition.map { cond =>
          newPredicate(cond, left.output ++ right.output).eval _
        }.getOrElse {
          (r: InternalRow) => true
        }
      }

      // An ordering that can be used to compare keys from both sides.
      val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
      val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)

      joinType match {
        //,,,
      }

    }
  }

我們直接看關鍵部分,我們可以很明顯的看到先做了newNaturalAscendingOrdering這個明顯的Key排序操作,接下來才做了JoinType的匹配,進行兩張表的Join,

Sort Merge Join的總體流程

Shuffle階段:將兩張表根據Join條件的key,Shuffle相應的資料到相同的partition上

Sort階段:兩邊Key做升序排序

Join階段:兩邊Key進行Join,符合條件就匹配輸出結果

Sort Merge Join的特點

  • 僅支持等值連接,join key需要排序
  • 所有join型別均支持
  • 由于在spark底層的Join策略的優先級順序是Broadcast Hash Join > Sort Merge Sort > Shuffle Hash Join,所以引數spark.sql.join.prefersortmergeJoin默認為true

非等值連接--Broadcast Nested Loop Join與Cartesian Join

Broadcast Nested Loop Join

當上述3種Join策略都無法觸發時,spark底層將會自動選擇Braodcast Nested Loop Join作為Join策略,它和Broadcast Hash join的分布式處理思想其實有點類似,都是發送Broadcast以避免Shuffle,但是由于非等值連接的原因,他回圈判斷很多次,所以其名字中有Loop的存在

Broadcast Nested Loop Join的特點

  • 支持等值和非等值連接
  • 支持所有的JOIN型別,當右外連接時要廣播左表,當左外連接時要廣播右表,當內連接時,要廣播左右兩張表

Cartesian Join(笛卡爾積)

就是笛卡爾積操作,具體實作方法在本文前半部分描述笛卡爾積Cross Join已經描述了,這里就不多贅述了,就描述一下其

Cartesian Join特點

  • 同時支持等值和不等值連接
  • 僅支持Inner Join
  • 需要開啟引數spark.sql.crossJoin.enabled=true,否則會被系統提示
  • 由于spark底層的Join優化機制,有時候就算使用了Cross Join也不會觸發Cartesian Join,在資料量小的時候基本會觸發Broadcast Nested Loop Join

Spark的Join選擇策略

上面提到了Spark五種Join選擇策略,那么在底層選擇這五種策略的原始碼如下

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      // --- BroadcastHashJoin --------------------------------------------------------------------

      // broadcast hints were specified
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

      // broadcast hints were not specified, so need to infer it from size and configuration.
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

      // --- ShuffledHashJoin ---------------------------------------------------------------------

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>
        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>
        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

      // --- SortMergeJoin ------------------------------------------------------------

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

      // --- Without joining keys ------------------------------------------------------------

      // Pick BroadcastNestedLoopJoin if one side could be broadcast
      case j @ logical.Join(left, right, joinType, condition)
          if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      case j @ logical.Join(left, right, joinType, condition)
          if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      // Pick CartesianProduct for InnerJoin
      case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

      case logical.Join(left, right, joinType, condition) =>
        val buildSide = broadcastSide(
          left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
        // This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      // --- Cases where this strategy does not apply ---------------------------------------------

      case _ => Nil
    }
  }

我們從順序往下看,

1、首先是Broadcast Hash Join,可以看到使用了

if canBroadcastByHints(joinType, left, right)

來進行判斷,就是我們是否在sql中使用了hint優化指令,比如

select /*+ BRAODCASTJOIN(B) */A.col1, B.col2
FROM A 
JOIN B 
ON A.col1 = B.col2; 

如果有則該方法判斷通過,直接使用Broadcast Hash Join

2、可看到第二個case仍然是Broadcast Hash Join,但是判斷條件換成了

if canBroadcastBySizes(joinType, left, right)

其意思就是通過大小來判斷是否應該執行Broadcast Hash Join,上文有提到一個引數spark.sql.autoBroadcastJoinThreshold,在其其底層判斷就用到了,原始碼如下

private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
    }

3、第三個是Shuffle Hash Join的判斷(基表為左表,右表為小表的情況),我們直接看if條件

if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys)

首先我們可以看到conf.preferSortMergeJoin這個條件,還記得上文提到的的spark.sql.preferSortMergeJoin這個引數嗎,由于其默認設為true,所以Shuffl Hash Join將不會在這里被呼叫,我們假設該引數已經被我們設為了true,接著往下,首先是對Join Key的一個判斷,然后是對右表是否能被做成HashMap的判斷,最后的muchSmaller()則是比較兩邊表的大小是否符合條件(本文上部分提到過應該滿足小表的Size<=大表Size的1/3

在或條件旁還存在一個判斷條件RowOrdering.isOrderable(),主要是判斷Join key是否能夠排序,因為Sort Merge Join是需要對Key進行排序的,如果Key不能排序,自然不會去執行Sort Merge Join,

4、第四個是Shuffle Hash Join的判斷(基表為右表,左表為小表的情況),條件都是一樣,只是Left和Right交換了位置,這里我就不再多做贅述了,

5、第五個是Sort Merge Join的判斷,內容很簡單

if RowOrdering.isOrderable(leftKeys)

就一個RowOrdering.isOrderable()的判斷,前文已經提過了,主要是判斷Join key是否能夠排序,因為Sort Merge Join是需要對Key進行排序的,如果Key不能排序,自然不會去執行Sort Merge Join,

6、從下面開始就是非等值連接了,首先是對Broadcast Nested Loop Join的判斷

if canBroadcastByHints(joinType, left, right)

可以看到和Broadcast Hash Join的判斷基本是一樣的,首先是對Hints的判斷

7、仍然是對Broadcast Nested Loop Join的判斷

if canBroadcastBySizes(joinType, left, right)

是對Size的判斷

8、如果上述對Broadcast Nested Loop Join呼叫不成功,則執行笛卡爾積操作Cartesian Join,我們可以到看到沒有判斷條件,它僅僅只能對inner Join生效

9、仍然是Broadcast Nested Loop Join,屬于實在是無可奈何的情況了,只能被迫選擇這種呼叫方式,我們可以看到一句有趣的注釋

// This join could be very slow or OOM

可能會很慢或者OOM(Out Of Memory)

調度順序總結

1、有Hint,使用Broadcast Hash Join

2、無Hint,按照Broadcast Hash Join -> Shuffle Hash Join(conf引數被更改) -> Sort Merge Join -> Broadcast Nested Loop Join(滿足條件) ->

Cartesian Join -> Broadcast Nested Loop Join(滿足條件)

后記

本文旨在學習和交流,筆者也查閱了許多資料和閱讀原始碼與動手實踐的出來的經驗,如有問題歡迎指出,若需轉載請進行標注,

這是本文中參考的文章地址Spark Codegen淺析-阿里云開發者社區 (aliyun.com)

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/292667.html

標籤:其他

上一篇:windows 10系統下安裝Hadoop

下一篇:Flume【基礎知識 01】【簡介 + 基本架構 + 核心概念 + 架構模式 + Agent內部原理 + 配置格式】(一篇就可入門flume)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more