Spark SQL原理決議前言:
Spark SQL原始碼剖析(一)SQL決議框架Catalyst流程概述
Spark SQL原始碼決議(二)Antlr4決議Sql并生成樹
Spark SQL原始碼決議(三)Analysis階段分析
Spark SQL原始碼決議(四)Optimization和Physical Planning階段決議
SparkPlan準備階段介紹
前面經過千辛萬苦,終于生成可實際執行的SparkPlan(即PhysicalPlan),但在真正執行前,還需要做一些準備作業,包括在必要的地方插入一些shuffle作業,在需要的地方進行資料格式轉換等等,
這部分內容都在org.apache.spark.sql.execution.QueryExecution類中,我們看看代碼
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
......其他代碼
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
//呼叫下面的preparations,然后使用foldLeft遍歷preparations中的Rule并應用到SparkPlan
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
/** A sequence of rules that will be applied in order to the physical plan before execution. */
//定義各個Rule
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))
......其他代碼
}
準備階段是去呼叫prepareForExecution方法,而prepareForExecution也簡單,還是我們早先看過的Rule那一套東西,定義一系列的Rule,然后讓Rule去匹配SparkPlan然后轉換一遍,
這里在于每條Rule都是干嘛用的,這里介紹一下吧,
PlanSubqueries(sparkSession)
生成子查詢,在比較早的版本,Spark SQL還是不支持子查詢的,不過現在加上了,這條Rule其實是對子查詢的SQL新生成一個QueryExecution(就是我們一直分析的這個流程),還記得QueryExecution里面的變數基本都是懶加載的吧,這些不會立即執行,都是到最后一并執行的,說白了就有點遞回的意思,
EnsureRequirements(sparkSession.sessionState.conf)
這條是比較重要的,代碼量也多,主要就是驗證輸出的磁區(partition)和我們要的磁區是不是一樣,不一樣那自然需要加入shuffle處理重磁區,如果有排序需求還會排序,
CollapseCodegenStages
這個是和一個優化相關的,先介紹下相關背景,Whole stage Codegen在一些MPP資料庫被用來提高性能,主要就是將一串的算子,轉換成一段代碼(Spark sql轉換成java代碼),從而提高性能,比如下圖,一串的算子操作,可以轉換成一個java方法,這一一來性能會有一定的提升,

這一步就是在支持Codegen的SparkPlan上添加一個WholeStageCodegenExec,不支持Codegen的SparkPlan則會添加一個InputAdapter,這一點在下面看preparations階段結果的時候能看到,還有這個優化是默認開啟的,
ReuseExchange和ReuseSubquery
這兩個都是大概同樣的功能就放一塊說了,首先Exchange是對shuffle如何進行的描述,可以理解為就是shuffle吧,
這里的ReuseExchange是一個優化措施,去找有重復的Exchange的地方,然后將結果替換過去,避免重復計算,
ReuseSubquery也是同樣的道理,如果一條SQL陳述句中有多個相同的子查詢,那么是不會重復計算的,會將計算的結果直接替換到重復的子查詢中去,提高性能,
這里我略過了CollapseCodegenStages,這部分比較復雜,也沒什么時間看,就先跳過了,大概知道這個東西是一個優化措施就行了,
那再來看看這一階段后,示例代碼會變成什么樣吧,先看示例代碼:
//生成DataFrame
val df = Seq((1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
//呼叫spark.sql
val queryCaseWhen = sql("select key from src ")
結果生成如下:
Project [_1#2 AS key#5]
+- LocalTableScan [_1#2, _2#3]
好吧這里看還是和之前Optimation階段一樣,不過斷點看就不大一樣了,

由于我們的SQL比較簡單,所以只多了兩個SparkPlan,就是WholeStageCodegenExec和InputAdapter,和上面說的是一致的!
OK,經過以上的準備之后,就要開始最后的執行階段了,
SparkPlan執行生成RDD階段
依舊是在QueryExecution里面,
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
......其他代碼
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
......其他代碼
}
這里實際上是呼叫了之前生成的SparkPlan的execute()方法,這個方法最侄訓再呼叫它的doExecute()方法,而這個方法是各個子類自己實作的,也就是說,不同的SparkPlan執行的doExecute()是不一樣的,
通過上面的階段,我們得到了一棵4層的樹,不過其中WholeStageCodegenExec和InputAdapter是為Codegen優化生成的,這里就不討論了,忽略這兩個其實結果是一樣的,也就是說這里只介紹ProjectExec和LocalTableScanExec兩個SparkPlan的doExecute()方法,
先是ProjectExec這個SparkPlan,我們看看它的doExecute()代碼,
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
......其他代碼
protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val project = UnsafeProjection.create(projectList, child.output,
subexpressionEliminationEnabled)
project.initialize(index)
iter.map(project)
}
}
......其他代碼
}
可以看到它是先遞回去呼叫child(也就是LocalTableScanExec)的doExecute()方法,還是得先去看看LocalTableScanExec生成什么東西呀,
case class LocalTableScanExec(
output: Seq[Attribute],
@transient rows: Seq[InternalRow]) extends LeafExecNode {
......其他代碼
private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
rdd.map { r =>
numOutputRows += 1
r
}
}
......其他代碼
可以看到最底層的rdd就是在這里實作的,LocalTableScanExec一開始就會生成一個lazy的rdd,在需要的時候回傳,而在doExecute()方法中的numOutputRows可以理解為僅是一個測量值,暫時不用理會,總之這里我們就發現LocalTableScanExec的doExecute()其實就是回傳一個parallelize生成的rdd,然后再回到ProjectExec去,
它呼叫child.execute().mapPartitionsWithIndexInternal {......},這里的mapPartitionsWithIndexInternal和rdd的mapPartitionsWithIndex是類似的,區別只在于mapPartitionsWithIndexInternal只會在內部模塊使用,如果有童鞋不明白mapPartitionsWithIndex這個API,可以百度查查看,然后重點看mapPartitionsWithIndexInternal的內部邏輯,
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val project = UnsafeProjection.create(projectList, child.output,
subexpressionEliminationEnabled)
project.initialize(index)
iter.map(project)
}
這里最后一行iter.map(project),其實還是scala的語法糖,實際大概是這樣iter.map(i => project.apply(i)),就是呼叫project的apply方法,對每行資料處理,然后通過追蹤,可以發現project的實體是InterpretedUnsafeProjection,我們看看它的apply方法,
class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
......其他代碼
override def apply(row: InternalRow): UnsafeRow = {
// Put the expression results in the intermediate row.
var i = 0
while (i < numFields) {
values(i) = expressions(i).eval(row)
i += 1
}
// Write the intermediate row to an unsafe row.
rowWriter.reset()
writer(intermediate)
rowWriter.getRow()
}
......其他代碼
這里其實重點在最后三行,就是將結果寫入到result row,再回傳回去,當執行完畢的時候,就會得到最終的RDD[InternalRow],再剩下的,就交給spark core去處理了,
小結
OK,那到這里基本就把Spark整個流程給講完了,回顧一下整個流程,

其實可以發現流程是挺簡單的,很多其他SQL決議框架(比如calcite)也是類似的流程,只是在設計上在某些方面的取舍會有偏差,而后深入到代碼的時候容易陷入一些細節中,當然這幾篇也省略了很多細節,很多時候細節才是真正精髓的地方,以后有如果涉及到的時候再寫文章討論吧(/偷笑),如果在開放程序中涉及到SQL決議這方面的開放,應該都會是在優化方面,也就是Optimization階段增加或處理Rule,這塊就需要對代數優化理論和代碼有一些了解了,
限于本人水平,介紹spark sql的這幾篇文章難免有疏漏和不足的地方,歡迎在評論區評論,先謝過了~~
以上~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/179626.html
標籤:Java
上一篇:自定義持久層框架設計實作思路
