2021SC@SDUSC
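1. Executing the physical plan:
After analysis, optimization, and the lazy conversion of the logical plan into a physical plan, the physical plan is finally run by calling the execute method of SparkPlan. Taking execution.Project as an example, its execute method is shown in the code:
Project and its execute method: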
@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  // Factory for the projection; built lazily and not serialized with the plan.
  @transient lazy val buildProjection = newMutableProjection(projectList, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    // One projection instance per partition, applied to every row.
    val resuableProjection = buildProjection()
    iter.map(resuableProjection)
  }
}
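Steps performed by Project's execute method:
1. Call the child's execute method, so that the input data to be projected has already been processed.
2. Call SparkPlan's newMutableProjection to handle the projection. The implementation of newMutableProjection is shown in the code:
SparkPlan's newMutableProjection method: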
protected def newMutableProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute]): () => MutableProjection = {
  log.debug(
    s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen: $codegenEnabled")
  if (codegenEnabled) {
    GenerateMutableProjection(expressions, inputSchema)
  } else {
    () => new InterpretedMutableProjection(expressions, inputSchema)
  }
}
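Note that newMutableProjection returns a factory (() => MutableProjection) rather than a projection, and Project.execute calls that factory once inside each mapPartitions closure. The following is a minimal sketch of that pattern without Spark on the classpath. Everything in it is an assumption made for illustration: partitions are modelled as Seq[Seq[Row]], a projection just picks columns by ordinal, and the names ProjectionFactorySketch, newProjection and projectionsBuilt are hypothetical.

```scala
object ProjectionFactorySketch {
  // Simplified row: an immutable vector of column values (not Spark's Row).
  type Row = Vector[Any]

  // Counts how many projection instances were created (illustration only).
  var projectionsBuilt = 0

  // Hypothetical stand-in for newMutableProjection: returns a factory,
  // so that callers decide when and how often a projection is instantiated.
  def newProjection(ordinals: Seq[Int]): () => (Row => Row) = () => {
    projectionsBuilt += 1
    (row: Row) => ordinals.map(row).toVector
  }

  // Stand-in for child.execute().mapPartitions { iter => ... }.
  def execute(partitions: Seq[Seq[Row]], ordinals: Seq[Int]): Seq[Seq[Row]] = {
    val buildProjection = newProjection(ordinals)
    partitions.map { partition =>
      val projection = buildProjection() // one instance per partition
      partition.iterator.map(projection).toVector
    }
  }
}
```

With two partitions the factory is invoked twice, regardless of how many rows each partition holds.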
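By default, newMutableProjection uses InterpretedMutableProjection to handle the projection. BindReferences.bindReference again uses the transform method, this time to bind the attribute references in an expression to positions in the input row, for example replacing List(name#1) with List(input[1]). The final projection is carried out by the apply method of InterpretedMutableProjection. The implementations of both are shown in the code:
The InterpretedMutableProjection implementation in Projection.scala: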
case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
  // Auxiliary constructor: bind attribute references to ordinals in inputSchema first.
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema)))

  private[this] val exprArray = expressions.toArray
  private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
  def currentValue: Row = mutableRow

  override def target(row: MutableRow): MutableProjection = {
    mutableRow = row
    this
  }

  override def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input)
      i += 1
    }
    mutableRow
  }
}
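One consequence of the apply method above is that it writes into the same mutableRow on every call and returns that same object. The sketch below illustrates this buffer reuse in isolation. It is not Spark code: rows are plain Array[Any], an expression is modelled as a function from the input row to a value, and the names MutableProjectionSketch, Expr and ReusingProjection are hypothetical.

```scala
object MutableProjectionSketch {
  // Hypothetical simplified expression: evaluates an input row to a value.
  type Expr = Array[Any] => Any

  final class ReusingProjection(exprs: Seq[Expr]) extends (Array[Any] => Array[Any]) {
    private[this] val exprArray = exprs.toArray
    // Single output buffer, allocated once and overwritten on every call.
    private[this] val buffer = new Array[Any](exprArray.length)

    def apply(input: Array[Any]): Array[Any] = {
      var i = 0
      while (i < exprArray.length) {
        buffer(i) = exprArray(i)(input)
        i += 1
      }
      buffer // the same array instance is returned each time
    }
  }
}
```

Under these assumptions, a caller that wants to keep a projected row after the next call has to copy it (for example with clone()), because the next call overwrites the shared buffer. The design trades that caveat for avoiding one allocation per row.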
The implementation of BindReferences.bindReference:
object BindReferences extends Logging {
  def bindReference[A <: Expression](
      expression: A,
      input: Seq[Attribute],
      allowFailures: Boolean = false): A = {
    expression.transform { case a: AttributeReference =>
      attachTree(a, "Binding attribute") {
        // Position of the attribute in the input schema, matched by expression id.
        val ordinal = input.indexWhere(_.exprId == a.exprId)
        if (ordinal == -1) {
          if (allowFailures) {
            a
          } else {
            sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
          }
        } else {
          BoundReference(ordinal, a.dataType, a.nullable)
        }
      }
    }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
  }
}
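The binding step can be tried out on a toy expression tree. The sketch below is a simplified model, not Catalyst: the Expr hierarchy, the Attr, Bound, Literal and GreaterEq classes, and the small transform helper are all hypothetical stand-ins, and data types and nullability are left out. It only shows the idea of rewriting an attribute such as name#1 into a positional reference such as input[1] by looking up its expression id in the input schema.

```scala
object BindSketch {
  sealed trait Expr
  case class Attr(name: String, exprId: Int) extends Expr    // like name#1
  case class Bound(ordinal: Int) extends Expr                // like input[1]
  case class Literal(value: Any) extends Expr
  case class GreaterEq(left: Expr, right: Expr) extends Expr

  // Toy rewrite helper: apply the rule to a node where it matches,
  // then recurse into the children of the result.
  def transform(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr =
    rule.applyOrElse(e, (x: Expr) => x) match {
      case GreaterEq(l, r) => GreaterEq(transform(l)(rule), transform(r)(rule))
      case other           => other
    }

  // Replace every attribute by its ordinal in the input schema.
  def bind(e: Expr, input: Seq[Attr]): Expr = transform(e) {
    case a: Attr =>
      val ordinal = input.indexWhere(_.exprId == a.exprId)
      if (ordinal == -1) {
        val schema = input.mkString("[", ",", "]")
        sys.error(s"Couldn't find $a in $schema")
      } else {
        Bound(ordinal)
      }
  }
}
```

For a schema Seq(Attr("id", 0), Attr("name", 1)), binding Attr("name", 1) yields Bound(1), so evaluation can later index into the row directly instead of searching by name.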
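Taking execution.Filter as a second example, its execute method is shown in the code further down.
Steps performed by Filter's execute method:
1) Call the child's execute method, so that the input data to be filtered has already been processed.
2) Call SparkPlan's newPredicate to handle the filtering. The implementation of newPredicate is shown in the code below.
Filter and its execute method: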
@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  @transient lazy val conditionEvaluator = newPredicate(condition, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    iter.filter(conditionEvaluator)
  }
}
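By default, newPredicate uses InterpretedPredicate to handle the filtering; its implementation is shown in the code below. Here BindReferences.bindReference converts ((age#0 >= 13) && (age#0 <= 19)) into [(input[0] >= 13), (input[0] <= 19)]. The final filtering is carried out by the second apply method of InterpretedPredicate, the one that takes only the bound expression.
SparkPlan's newPredicate method: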
protected def newPredicate(
    expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = {
  if (codegenEnabled) {
    GeneratePredicate(expression, inputSchema)
  } else {
    InterpretedPredicate(expression, inputSchema)
  }
}
The implementation of InterpretedPredicate:
object InterpretedPredicate {
  // First overload: bind references against the schema, then delegate.
  def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
    apply(BindReferences.bindReference(expression, inputSchema))

  // Second overload: wrap the bound expression as a Row => Boolean function.
  def apply(expression: Expression): (Row => Boolean) = {
    (r: Row) => expression.eval(r).asInstanceOf[Boolean]
  }
}
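To make the interpreted predicate concrete, here is a small sketch that evaluates an already bound version of the age filter from the text against a few rows. It is a toy model with hypothetical names (PredicateSketch, Bound, Lit, GtEq, LtEq, And, predicate) and assumes integer columns only; Catalyst's real expressions handle types and nulls, which this sketch ignores.

```scala
object PredicateSketch {
  type Row = IndexedSeq[Any]

  sealed trait Expr
  case class Bound(ordinal: Int) extends Expr       // like input[0]
  case class Lit(value: Int) extends Expr
  case class GtEq(l: Expr, r: Expr) extends Expr
  case class LtEq(l: Expr, r: Expr) extends Expr
  case class And(l: Expr, r: Expr) extends Expr

  private def asInt(v: Any): Int = v.asInstanceOf[Int]
  private def asBool(v: Any): Boolean = v.asInstanceOf[Boolean]

  // Interpreted evaluation: walk the expression tree for each row.
  def eval(e: Expr, row: Row): Any = e match {
    case Bound(i)   => row(i)
    case Lit(v)     => v
    case GtEq(l, r) => asInt(eval(l, row)) >= asInt(eval(r, row))
    case LtEq(l, r) => asInt(eval(l, row)) <= asInt(eval(r, row))
    case And(l, r)  => asBool(eval(l, row)) && asBool(eval(r, row))
  }

  // Same shape as the second apply overload: expression => (Row => Boolean).
  def predicate(e: Expr): Row => Boolean =
    (r: Row) => asBool(eval(e, r))
}
```

The resulting function can be passed straight to Iterator.filter or Seq.filter, which is how Filter.execute uses conditionEvaluator on each partition.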
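Both execution.Project and execution.Filter have a child, but not every subclass of SparkPlan does.
For example, execution.PhysicalRDD has no child, because it generally sits at the very bottom of the plan tree as a leaf node (note that it extends LeafNode). Its implementation is as follows: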
case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  override def execute() = rdd
}
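Because every node's execute method first calls execute on its child, the SparkPlan hierarchy as a whole guarantees that the transformations of the lower-level (child) SparkPlan nodes are applied before those of the current node, which is how the SQL statement is ultimately executed.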
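The child-first ordering can be seen in a miniature plan tree. This is a sketch under stated assumptions, not Spark's classes: Iterator[Row] stands in for RDD[Row], and Plan, Leaf, FilterPlan and ProjectPlan are hypothetical names that loosely mirror SparkPlan, PhysicalRDD, Filter and Project.

```scala
object PlanSketch {
  type Row = Vector[Any]

  sealed trait Plan { def execute(): Iterator[Row] }

  // Leaf node: no child, simply exposes its data (compare PhysicalRDD).
  case class Leaf(rows: Seq[Row]) extends Plan {
    def execute(): Iterator[Row] = rows.iterator
  }

  // Unary nodes: obtain the child's output first, then add their own step.
  case class FilterPlan(cond: Row => Boolean, child: Plan) extends Plan {
    def execute(): Iterator[Row] = child.execute().filter(cond)
  }

  case class ProjectPlan(ordinals: Seq[Int], child: Plan) extends Plan {
    def execute(): Iterator[Row] =
      child.execute().map(row => ordinals.map(row).toVector)
  }
}
```

Calling execute on the root, for example ProjectPlan(Seq(0), FilterPlan(cond, Leaf(rows))), recurses down to the leaf and composes the filter and projection on the way back up. As with RDD transformations, nothing is computed until the resulting iterator is consumed.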
