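Contents
The Seven Join Types Supported by Spark
Inner Join
Cross Join
Left Outer Join
Right Outer Join
Full Outer Join
Left Semi Join
Left Anti Join
The Five Join Strategies Supported by Spark
Small Table Joined with Large Table: Broadcast Hash Join and Shuffle Hash Join
Broadcast Hash Join
Overall Flow of Broadcast Hash Join
Characteristics of Broadcast Hash Join
Shuffle Hash Join
Overall Flow of Shuffle Hash Join
Characteristics of Shuffle Hash Join
Large Table Joined with Large Table: Sort Merge Join
Non-Equi Joins: Broadcast Nested Loop Join and Cartesian Join
Broadcast Nested Loop Join
Characteristics of Broadcast Nested Loop Join
Cartesian Join (Cartesian Product)
Characteristics of Cartesian Join
Spark's Join Selection Strategy
Summary of the Selection Order
Postscript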
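Joins play a crucial role in both Spark Core and Spark SQL. This article reads through the join-related source code.
The Seven Join Types Supported by Spark
Following the source of the join operator through several layers of calls, we eventually arrive at this join method: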
def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sparkSession.sessionState.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
.analyzed.asInstanceOf[Join]
withPlan {
Join(
joined.left,
joined.right,
UsingJoin(JoinType(joinType), usingColumns),
None)
}
}
The overloads that omit joinType default to an inner join. The String parameter joinType is passed to the JoinType companion object, whose apply method is shown below:
def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
case "inner" => Inner
case "outer" | "full" | "fullouter" => FullOuter
case "leftouter" | "left" => LeftOuter
case "rightouter" | "right" => RightOuter
case "leftsemi" => LeftSemi
case "leftanti" => LeftAnti
case "cross" => Cross
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter", "full_outer",
"leftouter", "left", "left_outer",
"rightouter", "right", "right_outer",
"leftsemi", "left_semi",
"leftanti", "left_anti",
"cross")
throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
}
}
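To make the normalisation step concrete, here is a minimal sketch that mirrors only the string handling of the apply method above. The object name JoinTypeNameSketch and the use of plain strings instead of Spark's JoinType objects are assumptions made for illustration; this is not Spark's API.

```scala
import java.util.Locale

// Hypothetical stand-in for JoinType.apply: same lower-casing and underscore
// removal, but it returns a plain String instead of a JoinType object.
object JoinTypeNameSketch {
  def normalize(typ: String): String =
    typ.toLowerCase(Locale.ROOT).replace("_", "") match {
      case "inner"                        => "Inner"
      case "outer" | "full" | "fullouter" => "FullOuter"
      case "leftouter" | "left"           => "LeftOuter"
      case "rightouter" | "right"         => "RightOuter"
      case "leftsemi"                     => "LeftSemi"
      case "leftanti"                     => "LeftAnti"
      case "cross"                        => "Cross"
      case _ =>
        throw new IllegalArgumentException(s"Unsupported join type '$typ'")
    }
}
```

Because of the lower-casing and underscore removal, spellings such as "LEFT_OUTER", "leftouter" and "left" all resolve to the same join type.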
Inner Join
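An inner join is written as either join or inner join. Its result is the intersection of the two tables under the join condition, that is, only the rows that satisfy it. Consider the following code.
Student table: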
sparkSession.sql(
"""
|select 1 as id,'lili' as name
|union all
|select 2 as id,'hanhan' as name""".stripMargin)
.createOrReplaceTempView("Student")
Score table:
sparkSession.sql("select 1 as id,'math' as subject,99 as score")
.createOrReplaceTempView("Score")
join sql:
select t1.id,t1.name,t2.subject,t2.score
from Student t1
join Score t2
on t1.id=t2.id
The result is a single row (1, lili, math, 99), because only id = 1 appears in both tables.
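The same inner join can be imitated on in-memory collections. This is only a sketch of the semantics, not of how Spark executes the query; the object name InnerJoinSketch and the tuple layout are illustrative assumptions.

```scala
object InnerJoinSketch {
  val students = Seq((1, "lili"), (2, "hanhan")) // (id, name)
  val scores   = Seq((1, "math", 99))            // (id, subject, score)

  // Keep only the pairs whose ids match, like `Student join Score on t1.id = t2.id`.
  def innerJoin(
      l: Seq[(Int, String)],
      r: Seq[(Int, String, Int)]): Seq[(Int, String, String, Int)] =
    for {
      s  <- l
      sc <- r
      if s._1 == sc._1
    } yield (s._1, s._2, sc._2, sc._3)
}
```

The Student row with id = 2 has no partner in Score, so it drops out of the result.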


Cross Join
A cross join is the Cartesian product. Several ways of writing a query can lead to one, for example a non-equi join:
select *
from Student t1
join Score t2
on t1.id != t2.id
Listing both tables in the from clause:
select *
from Student,Score
A join with no condition:
select *
from Student t1
join Score t2
Using or in the on condition:
select *
from Student t1
join Score t2
on t1.id = t2.id or t1.name =t2.name
Or writing cross join explicitly:
select *
from Student t1
cross join Score t2
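All of these can trigger a Cartesian product.
The result, or an intermediate step during execution, is essentially a table in which every row of Student is paired with every row of Score: the so-called cross product.
Note, however, that Spark does not always perform a true Cartesian product underneath. Spark SQL has an optimizer, and for a non-equi join it usually calls Broadcast Nested Loop Join when conditions allow. So how can we tell whether a Cartesian product was really produced? Check the execution plan for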
CartesianProduct
If it appears, the engine really did use a Cartesian product.
Spark SQL's join strategies are covered in the second half of this article.
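As a rough illustration of what "every row paired with every row" means, the cross product can be written on plain collections. The object name CrossJoinSketch is an assumption for this example, and the code says nothing about which physical operator Spark would pick.

```scala
object CrossJoinSketch {
  val students = Seq((1, "lili"), (2, "hanhan")) // (id, name)
  val scores   = Seq((1, "math", 99))            // (id, subject, score)

  // Every left row is paired with every right row; no condition is evaluated.
  def crossJoin[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for (l <- left; r <- right) yield (l, r)
}
```

The output always has left.size * right.size rows, which is why a Cartesian product on two large tables is so expensive.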
Left Outer Join
A left outer join is written as left outer join or left join. It returns every row of the left table, including rows that do not satisfy the join condition.
join sql:
select *
from Student t1
left join Score t2
on t1.id = t2.id
Result: the Student row with id = 2 has no match, so its Score fields are all set to null.
Right Outer Join
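A right outer join is written as right outer join or right join. It mirrors the left outer join: it returns every row of the right table, including rows that do not satisfy the join condition.
join sql:
select *
from Score t1
right join Student t2
on t1.id = t2.id
Result: Student is placed on the right side here, so both Student rows are returned, and the Score fields are null for id = 2, as expected.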
Full Outer Join
A full outer join is written as full outer join or full join. It returns all rows of both the left and the right table, including rows that do not satisfy the join condition.
join sql:
select *
from Score t1
full join Student t2
on t1.id = t2.name
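Result: the join condition deliberately compares mismatched columns (id against name), so nothing matches, and as expected every row of both tables is output with nulls on the other side.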
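The null padding of outer joins can be sketched with Option standing in for SQL NULL. This is an illustration on in-memory collections under assumed names (OuterJoinSketch, leftOuter, fullOuter); it matches on id, unlike the deliberately mismatched full join query above.

```scala
object OuterJoinSketch {
  type Student = (Int, String)      // (id, name)
  type Score   = (Int, String, Int) // (id, subject, score)

  val students: Seq[Student] = Seq((1, "lili"), (2, "hanhan"))
  val scores: Seq[Score]     = Seq((1, "math", 99))

  // Left outer: every left row survives; a missing match becomes None (SQL NULL).
  def leftOuter(l: Seq[Student], r: Seq[Score]): Seq[(Student, Option[Score])] =
    l.flatMap { s =>
      val matches = r.filter(_._1 == s._1)
      if (matches.isEmpty) Seq((s, Option.empty[Score]))
      else matches.map(m => (s, Option(m)))
    }

  // Full outer: the left outer result plus the right rows that matched nothing.
  def fullOuter(l: Seq[Student], r: Seq[Score]): Seq[(Option[Student], Option[Score])] = {
    val left = leftOuter(l, r).map { case (s, m) => (Option(s), m) }
    val rightOnly = r
      .filter(sc => !l.exists(_._1 == sc._1))
      .map(sc => (Option.empty[Student], Option(sc)))
    left ++ rightOnly
  }
}
```

A right outer join is the same idea with the two sides swapped.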
Left Semi Join
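A left semi join is an optimization-oriented join. Spark SQL usually rewrites an in subquery into a left semi join, so it deserves a closer look.
It has several characteristics:
1. The right table is used only to test for matches (its keys are passed only as far as the map stage), so no right-table column appears in the result.
2. Like an inner join, it outputs only rows that satisfy the join condition, but it is faster and never multiplies left rows, so it can be seen as a faster join without fan-out.
3. Because a left semi join has in (keySet) semantics, once a left row has found a match, the remaining matching right rows are skipped. For example, suppose the left table has a time column containing 2020-11-05, and the right table has start_time and end_time columns forming a period, with two rows: 2020-10-12 to 2020-12-06 and 2020-9-17 to 2020-12-07. With the condition time between start_time and end_time, an ordinary join matches both rows and the data fans out, whereas a left semi join emits the left row once and skips the rest. That is why a left semi join produces no fan-out, and why it is fast: it only needs to know that a match exists.
A few examples illustrate these characteristics.
Start with two tables.
Orders table:

Campaigns table: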

join sql:
-- join
select *
from order t1
join campaign t2
on t1.order_dt between start_time and end_time
--left semi join
select *
from order t1
left semi join campaign t2
on t1.order_dt between start_time and end_time
The results:
Join result:

Left semi join result:

The join result has clearly fanned out, while the left semi join result has not, which matches the characteristics described above.
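The fan-out difference can be reproduced on in-memory data using the dates from the example in point 3. The original tables are not reproduced here, so the order id, the campaign names, and the object name SemiJoinSketch are hypothetical values chosen for illustration.

```scala
import java.time.LocalDate

object SemiJoinSketch {
  case class Order(id: Int, orderDt: LocalDate)
  case class Campaign(name: String, start: LocalDate, end: LocalDate)

  // order_dt between start_time and end_time (both bounds inclusive)
  private def inRange(o: Order, c: Campaign): Boolean =
    !o.orderDt.isBefore(c.start) && !o.orderDt.isAfter(c.end)

  // Inner join: one output row per matching (order, campaign) pair, so rows can fan out.
  def innerJoin(os: Seq[Order], cs: Seq[Campaign]): Seq[(Order, Campaign)] =
    for (o <- os; c <- cs if inRange(o, c)) yield (o, c)

  // Left semi join: an order is emitted at most once, and no campaign columns are returned.
  def leftSemiJoin(os: Seq[Order], cs: Seq[Campaign]): Seq[Order] =
    os.filter(o => cs.exists(c => inRange(o, c)))

  val orders = Seq(Order(1, LocalDate.of(2020, 11, 5)))
  val campaigns = Seq(
    Campaign("a", LocalDate.of(2020, 10, 12), LocalDate.of(2020, 12, 6)),
    Campaign("b", LocalDate.of(2020, 9, 17), LocalDate.of(2020, 12, 7)))
}
```

With this data the inner join yields two rows for the single order, while the semi join yields the order once; exists stops at the first matching campaign.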
Left Anti Join
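A left anti join returns the left rows that have no match, much like not in. It is another classic optimization-oriented join. As with left semi join, the right table's data is passed only as far as the map stage and does not appear in the final result, and the underlying optimization is similar.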
join sql:
select *
from Student t1
left anti join Score t2
on t1.id=t2.id
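Expected result: only the Student row with id = 2 (hanhan) is returned, since it is the only one with no matching Score row.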
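A minimal collection-based sketch of the anti join semantics follows; the object name AntiJoinSketch is an assumption for this example, and the code does not reflect Spark's execution.

```scala
object AntiJoinSketch {
  val students = Seq((1, "lili"), (2, "hanhan")) // (id, name)
  val scores   = Seq((1, "math", 99))            // (id, subject, score)

  // Keep the left rows that have no match on the right; right columns are never returned.
  def leftAntiJoin(
      l: Seq[(Int, String)],
      r: Seq[(Int, String, Int)]): Seq[(Int, String)] = {
    val rightIds = r.map(_._1).toSet
    l.filterNot(s => rightIds.contains(s._1))
  }
}
```

It is the exact complement of the left semi join on the same condition: each left row lands in one result or the other.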
The Five Join Strategies Supported by Spark
The seven join types were covered above. Next are Spark's five join strategies:
1. Broadcast Hash Join
2. Shuffle Hash Join
3. Sort Merge Join
4. Cartesian Join
5. Broadcast Nested Loop Join
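Small Table Joined with Large Table: Broadcast Hash Join and Shuffle Hash Join
Spark is a distributed computing engine, but at their core its joins are still the join strategies common in traditional databases. First, what is the Hash Join that Broadcast Hash Join and Shuffle Hash Join both rely on?
Take the Student and Score tables from above and the query: select * from Student t1 join Score t2 on t1.id=t2.id.
Hash Join then does the following:

1. Score serves as the Build Table and Student as the Probe Table. What distinguishes them is table size. Why pick the small table as the Build Table and build the Hash Table from it? Because it is small enough to be loaded entirely into memory and can be distributed as a broadcast fairly easily.
2. Scan all of Score, hash each row's join key, and place the row in the corresponding bucket, thereby building a Hash Table.
3. Scan all of Student, hash each row's join key, and map it to the corresponding bucket, where it meets the Score tuples with the same key. The join condition and any filter condition are checked once more before the matched rows are output.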
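The build and probe steps above can be sketched on plain collections. This is an illustration of the classic algorithm, not Spark's HashedRelation; the object name HashJoinSketch and the function names build and probe are assumptions.

```scala
object HashJoinSketch {
  // Build phase: scan the smaller (build) side once and bucket its rows by join key.
  def build[K, B](buildSide: Seq[B])(key: B => K): Map[K, Seq[B]] =
    buildSide.groupBy(key)

  // Probe phase: scan the larger (probe) side and look each key up in the hash table.
  def probe[K, P, B](probeSide: Seq[P], table: Map[K, Seq[B]])(key: P => K): Seq[(P, B)] =
    for {
      p <- probeSide
      b <- table.getOrElse(key(p), Seq.empty)
    } yield (p, b)
}
```

For the example query, Score would be passed to build and Student to probe, both keyed on id. Each probe row costs one hash lookup instead of a scan of the other table.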
Broadcast Hash Join
Let's go straight to the source:
case class BroadcastHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends BinaryExecNode with HashJoin with CodegenSupport {
//......
}
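The class mixes in HashJoin and CodegenSupport. Hash Join needs no further introduction. CodegenSupport relates to Codegen, a key technique for Spark's runtime performance: it generates Java code dynamically, compiles and loads it on the fly, and so turns interpreted execution into compiled execution. Spark Codegen works at the Expression level and at the WholeStage level, and CodegenContext is its core class. In short, Codegen is in charge of generating the code that executes the plan. The description below follows a diagram from an article by bean_stalk on the Alibaba Cloud community (the reference is given at the end of this article).
In that diagram the starting point is execute(), followed by doExecute(). doExecute() does two main things: data acquisition and code generation.
Data acquisition follows the path inputRdd -> inputRdds -> execute(). Suppose physical operator A supports code generation and physical operator B does not; B is then wrapped in an InputAdapter.
Code generation follows the path produce()/doProduce() and consume()/doConsume(); produce and consume exist specifically to generate code.
So let's look directly at the overridden doProduce() and doConsume():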
override def doProduce(ctx: CodegenContext): String = {
streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
}
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
joinType match {
case _: InnerLike => codegenInner(ctx, input)
case LeftOuter | RightOuter => codegenOuter(ctx, input)
case LeftSemi => codegenSemi(ctx, input)
case LeftAnti => codegenAnti(ctx, input)
case j: ExistenceJoin => codegenExistence(ctx, input)
case x =>
throw new IllegalArgumentException(
s"BroadcastHashJoin should not take $x as the JoinType")
}
}
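produce() generates the Java code for the plan. Looking at doConsume(), there is one case per join type: Inner, Outer, Semi, and Anti. ExistenceJoin is a join type that Spark uses internally; users do not write it directly.
Pick any case and follow it. Taking InnerLike as the example, here is the source of codegenInner: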
/**
* Generates the code for Inner join.
*/
private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
val resultVars = buildSide match {
case BuildLeft => buildVars ++ input
case BuildRight => input ++ buildVars
}
if (broadcastRelation.value.keyIsUnique) {
s"""
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashedRelation
|UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
|if ($matched != null) {
| $checkCondition {
| $numOutput.add(1);
| ${consume(ctx, resultVars)}
| }
|}
""".stripMargin
} else {
val matches = ctx.freshName("matches")
val iteratorCls = classOf[Iterator[UnsafeRow]].getName
s"""
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashRelation
|$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value});
|if ($matches != null) {
| while ($matches.hasNext()) {
| UnsafeRow $matched = (UnsafeRow) $matches.next();
| $checkCondition {
| $numOutput.add(1);
| ${consume(ctx, resultVars)}
| }
| }
|}
""".stripMargin
}
}
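The overall flow is:
1. prepareBroadcast prepares the broadcast variable so that it can be sent out
2. genStreamSideJoinKey generates the code that computes the join key
3. getJoinCondition generates the join condition code used to filter rows
4. Finally, the code for the inner join is returned
These are the basic things BroadcastHashJoinExec does; prepareBroadcast is the core.
Overall Flow of Broadcast Hash Join
Data collection phase: the collect operator first gathers the small table's data on the Driver
Broadcast phase: the small table is turned into a broadcast and distributed to each Executor, which avoids a shuffle
Hash Join phase: each node performs the Hash Join locally
In other words, distributed operation (broadcast distribution) + Hash Join = Broadcast Hash Join, which is fairly easy to understand overall.
Its characteristics are summarized below.
Characteristics of Broadcast Hash Join
- Equi-joins only; the join keys do not need to be sorted
- All join types are supported except full outer join
- A hash map must be built for the small table, so a "small" table that is actually large may cause an OOM; the small table's data is also held on the Driver. The size limit is set with spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB
- The broadcast table cannot exceed 8 GB, a limit that is enforced in the source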
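The three phases can be imitated without a cluster. In this sketch an outer Seq stands in for the partitions of the large table and an ordinary Map stands in for the broadcast variable; the names BroadcastHashJoinSketch, largePartitions, and broadcast are illustrative assumptions, not Spark APIs.

```scala
object BroadcastHashJoinSketch {
  // Partitions of the large table stay where they are. The small table is turned into
  // one hash map that every partition receives (plain values stand in for executors).
  def broadcastHashJoin[K, L, S](
      largePartitions: Seq[Seq[L]],
      small: Seq[S])(largeKey: L => K, smallKey: S => K): Seq[Seq[(L, S)]] = {
    // "Collect" the small side and build its hash map once.
    val broadcast: Map[K, Seq[S]] = small.groupBy(smallKey)
    // Each partition joins locally against the same map; the large side is never shuffled.
    largePartitions.map { partition =>
      for {
        l <- partition
        s <- broadcast.getOrElse(largeKey(l), Seq.empty)
      } yield (l, s)
    }
  }
}
```

The output keeps the partitioning of the large side, which is the point of the strategy: only the small table moves.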
Shuffle Hash Join
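Shuffle Hash Join is also a way of handling a large table joined with a small table, but the small table here is a bit special: it is cut from a larger table. In real environments the tables taking part in a join are not always tiny, which is why Shuffle Hash Join exists. It applies divide and conquer: the data is first shuffled by join key so that the large problem is split into partitions.
Straight to the source: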
case class ShuffledHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends BinaryExecNode with HashJoin {
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
val buildDataSize = longMetric("buildDataSize")
val buildTime = longMetric("buildTime")
val start = System.nanoTime()
val context = TaskContext.get()
val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
buildTime += (System.nanoTime() - start) / 1000000
buildDataSize += relation.estimatedSize
// This relation is usually used until the end of task.
context.addTaskCompletionListener[Unit](_ => relation.close())
relation
}
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
val hashed = buildHashedRelation(buildIter)
join(streamIter, hashed, numOutputRows)
}
}
}
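There is not much source here beyond doExecute(), so start from there. The method returns an RDD. Inside, it first builds the HashedRelation, then calls the join method from HashJoin, which dispatches on the join type. The core of the flow is still a hash join.
Overall Flow of Shuffle Hash Join
Shuffle phase: both tables are shuffled to partitions according to the join condition
Hash Join phase: each Executor performs a single-node Hash Join
Characteristics of Shuffle Hash Join
- Equi-joins only; the join keys do not need to be sorted
- All join types are supported except full outer join
- A hash map still has to be built for the small side, so it may OOM if that side is large. The relatively small table is partitioned, and each resulting piece should not exceed spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB and is configurable
- The shuffled small table must also satisfy two size conditions: Size(small) < spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions, and Size(small) * 3 <= Size(large)
- Because Spark's join strategies are prioritized as Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join, setting spark.sql.join.preferSortMergeJoin (default true) to false makes Shuffle Hash Join more likely to be chosen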
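The two phases can be sketched on in-memory data. Here a Map from partition index to rows stands in for the shuffled partitions, and the right side is assumed to be the build side; the names ShuffleHashJoinSketch and shuffle are illustrative, not Spark APIs.

```scala
object ShuffleHashJoinSketch {
  // "Shuffle": route each row to a partition chosen by the hash of its join key,
  // so equal keys from both tables land in the same partition index.
  def shuffle[K, R](rows: Seq[R], numPartitions: Int)(key: R => K): Map[Int, Seq[R]] =
    rows.groupBy(r => Math.floorMod(key(r).hashCode, numPartitions))

  def shuffleHashJoin[K, L, R](left: Seq[L], right: Seq[R], numPartitions: Int)(
      leftKey: L => K, rightKey: R => K): Seq[(L, R)] = {
    val leftParts  = shuffle(left, numPartitions)(leftKey)
    val rightParts = shuffle(right, numPartitions)(rightKey)
    (0 until numPartitions).flatMap { p =>
      // Per partition: build a hash table on the right side, then probe it with the left.
      val table = rightParts.getOrElse(p, Seq.empty).groupBy(rightKey)
      for {
        l <- leftParts.getOrElse(p, Seq.empty)
        r <- table.getOrElse(leftKey(l), Seq.empty)
      } yield (l, r)
    }
  }
}
```

Each per-partition hash table holds only a fraction of the build side, which is what makes the approach workable when the whole table is too large to broadcast.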
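Large Table Joined with Large Table: Sort Merge Join
Because Spark's join strategies are prioritized as Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join, Spark automatically chooses Sort Merge Join when the conditions for Broadcast Hash Join are not met.
As the name suggests, Sort Merge Join involves sorting. It is mainly used when two large tables are joined.
Straight to the source: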
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
val boundCondition: (InternalRow) => Boolean = {
condition.map { cond =>
newPredicate(cond, left.output ++ right.output).eval _
}.getOrElse {
(r: InternalRow) => true
}
}
// An ordering that can be used to compare keys from both sides.
val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)
joinType match {
//,,,
}
}
}
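Looking at the key part: the code first builds keyOrdering with newNaturalAscendingOrdering, an ascending ordering used to compare keys from both sides, and only then matches on joinType to join the two tables.
Overall Flow of Sort Merge Join
Shuffle phase: both tables are shuffled by the join key so that matching data lands in the same partition
Sort phase: the keys on both sides are sorted in ascending order
Join phase: the keys on both sides are merged, and rows that satisfy the condition are output
Characteristics of Sort Merge Join
- Equi-joins only; the join keys are sorted, so they must be orderable
- All join types are supported
- Because Spark's join strategies are prioritized as Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join, the parameter spark.sql.join.preferSortMergeJoin defaults to true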
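The sort and merge phases can be sketched for an inner join of two (key, value) sequences. This is a textbook version written for illustration, not SortMergeJoinExec; the object name SortMergeJoinSketch is an assumption, and the shuffle phase is omitted because everything lives in one partition here.

```scala
object SortMergeJoinSketch {
  // Inner sort-merge join of two (key, value) sequences.
  def sortMergeJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])(
      implicit ord: Ordering[K]): Seq[(K, A, B)] = {
    val l = left.sortBy(_._1).toVector  // sort phase
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    // Merge phase: always advance the side that currently holds the smaller key.
    while (i < l.length && j < r.length) {
      val c = ord.compare(l(i)._1, r(j)._1)
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val key = l(i)._1
        // Find the end of the run of equal keys on the right side.
        val runEnd = r.indexWhere(x => ord.compare(x._1, key) != 0, j) match {
          case -1 => r.length
          case n  => n
        }
        // Pair every left row carrying this key with the whole right run.
        while (i < l.length && ord.compare(l(i)._1, key) == 0) {
          for (k <- j until runEnd) out += ((key, l(i)._2, r(k)._2))
          i += 1
        }
        j = runEnd
      }
    }
    out.result()
  }
}
```

After sorting, each side is scanned once, and only the rows of the current key need to be kept in view, which is why this strategy suits two large tables.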
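Non-Equi Joins: Broadcast Nested Loop Join and Cartesian Join
Broadcast Nested Loop Join
When none of the three strategies above can be used, Spark considers Broadcast Nested Loop Join. It is similar in spirit to Broadcast Hash Join: both broadcast one side to avoid a shuffle. But because the condition need not be an equality, it has to loop over the rows and test the condition many times, hence the Loop in its name.
Characteristics of Broadcast Nested Loop Join
- Supports both equi-joins and non-equi joins
- Supports all join types. For a right outer join the left table is broadcast, for a left outer join the right table is broadcast, and for an inner join either table may be broadcast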
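A minimal sketch of the idea for the inner case: one side is handed in full to every partition of the other side, and an arbitrary predicate is tested for every pair. The names NestedLoopJoinSketch, streamedPartitions, and broadcast are assumptions for illustration, not Spark APIs.

```scala
object NestedLoopJoinSketch {
  // The broadcast side is a plain Seq handed to every partition. Each streamed row is
  // compared against every broadcast row with an arbitrary predicate, so no hashing
  // or sorting on a key is possible.
  def broadcastNestedLoopJoin[L, R](
      streamedPartitions: Seq[Seq[L]],
      broadcast: Seq[R])(condition: (L, R) => Boolean): Seq[(L, R)] =
    streamedPartitions.flatMap { partition =>
      for (l <- partition; r <- broadcast if condition(l, r)) yield (l, r)
    }
}
```

The cost is the product of the two row counts, which is why this strategy is only comfortable when the broadcast side is small.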
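Cartesian Join (Cartesian Product)
This is the Cartesian product. How to trigger it was described in the Cross Join section earlier, so only its characteristics are listed here.
Characteristics of Cartesian Join
- Supports both equi-joins and non-equi joins
- Supports inner join only
- Requires spark.sql.crossJoin.enabled=true; otherwise Spark will flag the query
- Because of Spark's join optimization, writing cross join does not always result in a Cartesian Join; with small data, Broadcast Nested Loop Join is usually chosen instead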
Spark's Join Selection Strategy
The five join strategies were described above. The source that chooses among them is:
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// --- BroadcastHashJoin --------------------------------------------------------------------
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// --- ShuffledHashJoin ---------------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
// --- SortMergeJoin ------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Without joining keys ------------------------------------------------------------
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
case _ => Nil
}
}
Reading from top to bottom:
1. First comes Broadcast Hash Join, which is guarded by
if canBroadcastByHints(joinType, left, right)
This checks whether a hint was used in the SQL, for example
select /*+ BROADCASTJOIN(B) */A.col1, B.col2
FROM A
JOIN B
ON A.col1 = B.col2;
If so, the check passes and Broadcast Hash Join is used directly.
2. The second case is still Broadcast Hash Join, but the guard becomes
if canBroadcastBySizes(joinType, left, right)
This decides by size whether Broadcast Hash Join should be used. The parameter spark.sql.autoBroadcastJoinThreshold mentioned above is used in the underlying check, whose source is:
private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
3. The third case is Shuffle Hash Join, where the left table is the base table and the right table is the small one. Look at the guard:
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys)
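First there is conf.preferSortMergeJoin. Recall the parameter spark.sql.join.preferSortMergeJoin mentioned above: because it defaults to true, Shuffle Hash Join is not chosen here by default. Assume we have set it to false. Next, canBuildRight(joinType) checks whether the join type allows the right side to be the build side, canBuildLocalHashMap(right) checks whether the right table is small enough to be built into a hash map, and muchSmaller() compares the sizes of the two tables (as noted earlier, the small table must be at most 1/3 the size of the large one).
On the other side of the || is !RowOrdering.isOrderable(leftKeys), which checks whether the join keys can be sorted. Sort Merge Join must sort the keys, so if they cannot be sorted it is not an option.
4. The fourth case is Shuffle Hash Join with the right table as the base table and the left table as the small one. The conditions are the same, with left and right swapped.
5. The fifth case is Sort Merge Join, and its guard is simple: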
if RowOrdering.isOrderable(leftKeys)
It only checks RowOrdering.isOrderable(), which, as noted above, tests whether the join keys can be sorted.
6. From here on are the joins without equi-join keys. First is the check for Broadcast Nested Loop Join:
if canBroadcastByHints(joinType, left, right)
This is basically the same as for Broadcast Hash Join: the hint is checked first.
7. Still Broadcast Nested Loop Join, guarded by
if canBroadcastBySizes(joinType, left, right)
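which is the size check.
8. If neither Broadcast Nested Loop Join case applies, the Cartesian product (Cartesian Join) is used. There is no guard, but the pattern only matches inner-like joins.
9. Broadcast Nested Loop Join again, as a last resort when nothing else applies. The source carries a telling comment: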
// This join could be very slow or OOM
That is, it may be very slow or run out of memory (OOM).
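The size checks mentioned in the walkthrough can be written down as small predicates. The sketch below uses the conditions stated in this article; the Conf case class and the object name JoinSizeChecksSketch are hypothetical, and the default of 200 shuffle partitions is Spark's documented default for spark.sql.shuffle.partitions rather than something taken from the source shown here.

```scala
object JoinSizeChecksSketch {
  // Hypothetical config holder. 10 MB is the default broadcast threshold from the text;
  // 200 is the default of spark.sql.shuffle.partitions.
  final case class Conf(
      autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024,
      numShufflePartitions: Int = 200)

  // Small enough to broadcast as a whole.
  def canBroadcast(sizeInBytes: Long, conf: Conf): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= conf.autoBroadcastJoinThreshold

  // Small enough that each shuffled partition can hold a local hash map.
  def canBuildLocalHashMap(sizeInBytes: Long, conf: Conf): Boolean =
    sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions

  // The build side must be at most one third the size of the other side.
  def muchSmaller(a: Long, b: Long): Boolean = a * 3 <= b
}
```

For instance, a 20 MB table is too large to broadcast under the default threshold, yet it still passes the local hash map check, because 20 MB is far below 10 MB * 200.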
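Summary of the Selection Order
1. With a broadcast hint on an equi-join, Broadcast Hash Join is used.
2. Without a hint, the order is Broadcast Hash Join -> Shuffle Hash Join (when the conf parameter has been changed) -> Sort Merge Join -> Broadcast Nested Loop Join (when its conditions are met) -> Cartesian Join -> Broadcast Nested Loop Join (last resort).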
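The order can also be condensed into a single decision function. This is a deliberately simplified sketch: the JoinInfo fields are hypothetical booleans that stand in for the guards in the source above (the real planner inspects logical plans, hints, and statistics), and the two Shuffle Hash Join cases are folded into one flag.

```scala
object JoinSelectionSketch {
  // Hypothetical, simplified inputs summarising the guards discussed above.
  final case class JoinInfo(
      hasEquiKeys: Boolean,
      broadcastHint: Boolean,
      smallEnoughToBroadcast: Boolean,
      preferSortMergeJoin: Boolean,
      canShuffleHash: Boolean, // join type allows the build side and the size checks pass
      keysOrderable: Boolean,
      isInnerLike: Boolean)

  def select(j: JoinInfo): String =
    if (j.hasEquiKeys && (j.broadcastHint || j.smallEnoughToBroadcast))
      "BroadcastHashJoin"
    else if (j.hasEquiKeys &&
        ((!j.preferSortMergeJoin && j.canShuffleHash) || !j.keysOrderable))
      "ShuffledHashJoin"
    else if (j.hasEquiKeys && j.keysOrderable) "SortMergeJoin"
    else if (j.broadcastHint || j.smallEnoughToBroadcast) "BroadcastNestedLoopJoin"
    else if (j.isInnerLike) "CartesianProduct"
    else "BroadcastNestedLoopJoin" // last resort: may be very slow or run out of memory
}
```

Reading the branches top to bottom reproduces the order in the summary: the equi-join strategies are tried first, and the two nested-loop branches bracket the Cartesian product.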
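Postscript
This article is meant for learning and exchange. It draws on a number of references, on reading the source, and on hands-on experiments. Corrections are welcome; please credit the source if you repost it.
Reference cited in this article: Spark Codegen淺析 (A Brief Analysis of Spark Codegen), 阿里云開發者社區, the Alibaba Cloud Developer Community (aliyun.com)
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/292667.html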
