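Background
In the previous article, "Spark DPP (Dynamic Partition Pruning) causing a DataSourceScanExec NullPointerException: analysis and fix", we skipped over the step where dynamic code generation fails. This time we analyze that step. The SQL is the same one used in that article.
Analysis
After running the SQL, we can see the following physical plan: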


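We can see that FilterExec and ColumnarToRowExec are not in the same WholeStageCodegen. Why is that?
The reason is that the expression behind exists extends the CodegenFallback trait.
We can trace the physical rule CollapseCodegenStages. The corresponding code is: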
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = {
  plan match {
    // For operators that will output domain object, do not insert WholeStageCodegen for it as
    // domain object can not be written into unsafe row.
    case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
      plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
    case plan: LocalTableScanExec =>
      // Do not make LogicalTableScanExec the root of WholeStageCodegen
      // to support the fast driver-local collect/take paths.
      plan
    case plan: CodegenSupport if supportCodegen(plan) =>
      // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
      // it can't support whole-stage-codegen at the same time.
      assert(!plan.supportsColumnar)
      WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
    case other =>
      other.withNewChildren(other.children.map(insertWholeStageCodegen))
  }
}
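When supportCodegen is evaluated for FilterExec, Spark also scans its expressions for any subclass of CodegenFallback. If one is found, FilterExec is left out of whole-stage code generation. FilterExec clearly fails the check here, while its child, ColumnarToRowExec, passes it, so ColumnarToRowExec can still use whole-stage code generation.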
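To make the check concrete, here is a minimal sketch of the idea in plain Scala. It is a toy model, not Spark's code: Expr, Fallback, Column, Literal, EqualTo and Exists are hypothetical stand-ins, and we assume the only thing that matters is whether any expression in the tree carries the fallback marker.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object SupportCodegenSketch {
  sealed trait Expr { def children: Seq[Expr] }
  trait Fallback // plays the role of the CodegenFallback marker trait

  case class Column(name: String) extends Expr { val children: Seq[Expr] = Nil }
  case class Literal(value: String) extends Expr { val children: Seq[Expr] = Nil }
  case class EqualTo(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }
  // Stand-in for a higher-order function such as exists(...).
  case class Exists(array: Expr, lambdaBody: Expr) extends Expr with Fallback {
    val children: Seq[Expr] = Seq(array, lambdaBody)
  }

  // True if the expression itself or any descendant carries the Fallback marker.
  def containsFallback(e: Expr): Boolean =
    e.isInstanceOf[Fallback] || e.children.exists(containsFallback)

  // An operator is eligible only when none of its expressions would fall back.
  def supportCodegen(expressions: Seq[Expr]): Boolean =
    !expressions.exists(containsFallback)

  val scenesFilter: Expr = EqualTo(Column("scenes"), Literal("gogo"))
  val existsFilter: Expr =
    Exists(Column("date1"), EqualTo(Column("x"), Literal("2021-03-04")))
}
```

With only scenesFilter the toy operator is eligible. As soon as existsFilter is part of the condition, the whole operator is rejected, which mirrors what happens to FilterExec in our query.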
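So when the RDD is built on the driver, FilterExec still goes through its own doExecute method. It first runs the createCodeGeneratedObject part of the code, which eventually calls subexpressionElimination, and that is where the error is thrown.
In fact, we can modify the SQL so that it goes through whole-stage code generation by removing the exists filter condition.
Before: FROM test_b where scenes='gogo' and exists(array(date1),x-> x =='2021-03-04')
After: FROM test_b where scenes='gogo'
This gives us the following physical plan: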


We can see that FilterExec and ColumnarToRowExec are now in the same WholeStageCodegen. At the code level, a structure like the following is generated:
WholeStageCodegenExec(FilterExec(ColumnarToRowExec(InputAdapter(FileSourceScanExec))))
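The two plan shapes can be reproduced with a small toy version of the collapsing rule. This is only a sketch under simplifying assumptions: Plan, Scan, ColumnarToRow, Filter, InputAdapter and WholeStageCodegen are hypothetical single-child nodes, and supportsCodegen is a plain flag instead of Spark's real checks.

```scala
// Toy model only: these node types are hypothetical stand-ins for Spark's operators.
object CollapseSketch {
  sealed trait Plan {
    def supportsCodegen: Boolean
    // Rebuild this node with f applied to its child (leaves return themselves).
    def mapChild(f: Plan => Plan): Plan
  }
  case object Scan extends Plan {
    val supportsCodegen: Boolean = false // columnar scan stays outside generated code
    def mapChild(f: Plan => Plan): Plan = this
  }
  case class ColumnarToRow(child: Plan) extends Plan {
    val supportsCodegen: Boolean = true
    def mapChild(f: Plan => Plan): Plan = copy(child = f(child))
  }
  case class Filter(usesFallback: Boolean, child: Plan) extends Plan {
    val supportsCodegen: Boolean = !usesFallback
    def mapChild(f: Plan => Plan): Plan = copy(child = f(child))
  }
  case class InputAdapter(child: Plan) extends Plan {
    val supportsCodegen: Boolean = true
    def mapChild(f: Plan => Plan): Plan = copy(child = f(child))
  }
  case class WholeStageCodegen(child: Plan) extends Plan {
    val supportsCodegen: Boolean = false
    def mapChild(f: Plan => Plan): Plan = copy(child = f(child))
  }

  // Wrap the topmost eligible operator; otherwise keep looking further down.
  def insertWholeStageCodegen(plan: Plan): Plan = plan match {
    case p if p.supportsCodegen => WholeStageCodegen(insertInputAdapter(p))
    case other => other.mapChild(insertWholeStageCodegen)
  }

  // Inside a stage, an ineligible child becomes the stage input.
  def insertInputAdapter(plan: Plan): Plan = plan match {
    case p if !p.supportsCodegen => InputAdapter(insertWholeStageCodegen(p))
    case p => p.mapChild(insertInputAdapter)
  }

  def render(p: Plan): String = p match {
    case Scan => "FileSourceScanExec"
    case ColumnarToRow(c) => s"ColumnarToRowExec(${render(c)})"
    case Filter(_, c) => s"FilterExec(${render(c)})"
    case InputAdapter(c) => s"InputAdapter(${render(c)})"
    case WholeStageCodegen(c) => s"WholeStageCodegenExec(${render(c)})"
  }
}
```

Calling render(insertWholeStageCodegen(Filter(usesFallback = false, ColumnarToRow(Scan)))) yields the nested structure shown above, while usesFallback = true leaves FilterExec outside the WholeStageCodegenExec wrapper.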
So when the RDD is built on the driver, execution goes into the doExecute method of WholeStageCodegenExec, shown below:
override def doExecute(): RDD[InternalRow] = {
  val (ctx, cleanedSource) = doCodeGen()
  // try to compile and fallback if it failed
  val (_, compiledCodeStats) = try {
    CodeGenerator.compile(cleanedSource)
  } catch {
    case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
      // We should already saw the error message
      logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
      return child.execute()
  }
  ...
  val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
  assert(rdds.size <= 2, "Up to two input RDDs can be supported")
  if (rdds.length == 1) {
    rdds.head.mapPartitionsWithIndex { (index, iter) =>
      val (clazz, _) = CodeGenerator.compile(cleanedSource)
      val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
      buffer.init(index, Array(iter))
      ...
The doCodeGen method still assembles the code of the individual physical operators by following the produce -> doProduce -> consume -> doConsume call chain.
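The call chain is easier to follow with a stripped-down model. The sketch below is a toy, not Spark's CodegenSupport: Op, Source, FilterOp and Root are hypothetical classes, and the "generated code" is just a string. It only shows the control flow, where produce travels down to the leaf and consume travels back up to the root.

```scala
// Toy model only: Op, Source, FilterOp and Root are hypothetical classes.
object ProduceConsumeSketch {
  trait Op {
    private var parent: Option[Op] = None
    // produce: remember who asked, then let the operator emit its code.
    final def produce(p: Option[Op]): String = { parent = p; doProduce() }
    // consume: hand the current row to the parent's doConsume.
    final def consume(rowVar: String): String =
      parent.map(_.doConsume(rowVar)).getOrElse("")
    def doProduce(): String
    def doConsume(rowVar: String): String
  }

  // Leaf: emits the loop over the input and pushes each row upwards.
  class Source extends Op {
    def doProduce(): String = {
      val body = consume("row")
      s"while (input.hasNext()) {\n  row = input.next();\n$body}\n"
    }
    def doConsume(rowVar: String): String =
      throw new UnsupportedOperationException("a leaf has no child to consume from")
  }

  // Filter: has no loop of its own, it only adds a check inside the child's loop.
  class FilterOp(predicate: String, child: Op) extends Op {
    def doProduce(): String = child.produce(Some(this))
    def doConsume(rowVar: String): String = {
      val rest = consume(rowVar)
      s"  if (!($predicate)) continue;\n$rest"
    }
  }

  // Root: plays the role of the whole-stage wrapper and collects surviving rows.
  class Root(child: Op) extends Op {
    def doProduce(): String = child.produce(Some(this))
    def doConsume(rowVar: String): String = s"  append($rowVar);\n"
    def generate(): String = produce(None)
  }
}
```

For example, new Root(new FilterOp("isGogo(row)", new Source)).generate() returns a single loop whose body contains the filter check followed by the append call, so the filter's code ends up fused into the loop emitted by the leaf.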
Eventually it reaches FilterExec's doConsume, and the code for the DPP expression involved here is produced through code generation:
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  val numOutput = metricTerm(ctx, "numOutputRows")
  /**
   * Generates code for `c`, using `in` for input attributes and `attrs` for nullability.
   */
  def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = {
    val bound = BindReferences.bindReference(c, attrs)
    val evaluated = evaluateRequiredVariables(child.output, in, c.references)
    // Generate the code for the predicate.
    val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
    val nullCheck = if (bound.nullable) {
      s"${ev.isNull} || "
    } else {
      ...
The genCode method calls the expression's doGenCode method.
For our SQL, the expression is DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)).
DynamicPruningExpression simply delegates to its child's genCode, so in the end InSubqueryExec's doGenCode method is called:
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  prepareResult()
  inSet.doGenCode(ctx, ev)
}
...
private def prepareResult(): Unit = {
  require(resultBroadcast != null, s"$this has not finished")
  if (result == null) {
    result = resultBroadcast.value
  }
}
The prepareResult method prepares the broadcast value.
inSet simply evaluates each row in the usual way.
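The ordering constraint behind prepareResult can be shown with a small stand-alone model. This is a sketch under assumptions: FakeBroadcast and this InSubquery class are hypothetical, the broadcast is just a wrapper around a value, and updateResult stands in for whatever fills resultBroadcast on the driver.

```scala
// Toy model only: FakeBroadcast and InSubquery are hypothetical stand-ins.
object InSubquerySketch {
  // Stand-in for a broadcast handle; it simply holds the value.
  final case class FakeBroadcast[T](value: T)

  class InSubquery(child: String) {
    @volatile private var resultBroadcast: FakeBroadcast[Set[String]] = null
    private var result: Set[String] = null

    // Assumed to be called on the driver once the subquery has finished.
    def updateResult(values: Set[String]): Unit =
      resultBroadcast = FakeBroadcast(values)

    // Fails fast if the subquery result has not been published yet.
    private def prepareResult(): Unit = {
      require(resultBroadcast != null, s"InSubquery($child) has not finished")
      if (result == null) {
        result = resultBroadcast.value
      }
    }

    // Per-row evaluation: a plain membership test once the result is ready.
    def eval(value: String): Boolean = {
      prepareResult()
      result.contains(value)
    }
  }
}
```

Calling eval before updateResult throws an IllegalArgumentException from require, which is the same kind of "has not finished" failure the real expression guards against. After updateResult, evaluation is an ordinary set lookup.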
Once whole-stage code generation is finished, the code is compiled, as follows:
val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
  rdds.head.mapPartitionsWithIndex { (index, iter) =>
    val (clazz, _) = CodeGenerator.compile(cleanedSource)
    val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
    buffer.init(index, Array(iter))
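Here rdds comes from calling inputRDDs, which recursively calls the child's inputRDDs until it reaches ColumnarToRowExec's inputRDDs.
That in turn calls InputAdapter's executeColumnar, which finally calls FileSourceScanExec's doExecuteColumnar and produces the corresponding RDD.
The generated code is compiled again on the executor side before it runs.
Note: the value of resultBroadcast is filled in during WholeStageCodegenExec's execute method (inherited from SparkPlan):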
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecute()
}
The executeQuery method looks like this:
protected final def executeQuery[T](query: => T): T = {
  RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
    prepare()
    waitForSubqueries()
    query
  }
}
...
final def prepare(): Unit = {
  // doPrepare() may depend on it's children, we should call prepare() on all the children first.
  children.foreach(_.prepare())
  synchronized {
    if (!prepared) {
      prepareSubqueries()
      doPrepare()
      prepared = true
    }
  }
}
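prepare and prepareSubqueries are called recursively so that every child node finishes its preparation work, such as the driver-side broadcast in this case, which lets the executors fetch the corresponding broadcast variable later.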
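The recursion and the "prepare only once" guard can be sketched with a tiny tree of nodes. This is a simplified toy: Node and its log are hypothetical, a subquery is just a name, and doExecute merely executes the children and records a message.

```scala
// Toy model only: Node is a hypothetical stand-in for a physical plan node.
object PrepareSketch {
  import scala.collection.mutable.ArrayBuffer

  class Node(name: String, children: Seq[Node], subqueries: Seq[String],
             log: ArrayBuffer[String]) {
    private var prepared = false

    // Children first, then this node; the flag makes repeated calls harmless.
    final def prepare(): Unit = {
      children.foreach(_.prepare())
      synchronized {
        if (!prepared) {
          subqueries.foreach(s => log += s"$name: submit subquery $s")
          prepared = true
        }
      }
    }

    private def waitForSubqueries(): Unit =
      subqueries.foreach(s => log += s"$name: result of $s ready")

    // Same shape as executeQuery: prepare, wait, then run the body.
    final def execute(): Unit = {
      prepare()
      waitForSubqueries()
      children.foreach(_.execute())
      log += s"$name: doExecute"
    }
  }

  def demo(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val scan = new Node("scan", Nil, Seq("dpp"), log)
    val filter = new Node("filter", Seq(scan), Nil, log)
    val root = new Node("root", Seq(filter), Nil, log)
    root.execute()
    log.toList
  }
}
```

In demo() the subquery on the deepest node is submitted before any doExecute runs, and it is submitted only once even though prepare is reached again through every child's execute.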
That completes the analysis.
