Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.rules.RuleId
import org.apache.spark.sql.catalyst.rules.UnknownRuleId
Expand Down Expand Up @@ -55,6 +56,32 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]

def output: Seq[Attribute]

/**
 * Renders this node as its `nodeName` followed by the attributes in `output`, each annotated
 * with its nullability, e.g. `Name <output=a#1[nullable=true], b#2[nullable=false]>`.
 *
 * At most `maxColumns` attributes are listed; when `output` has more, the remainder is
 * summarized as ` ... N more columns`. If reading the output raises an
 * [[UnresolvedException]], the result is `nodeName` followed by ` <output='Unresolved'>`.
 *
 * @param maxColumns maximum number of output attributes to spell out
 * @return the node name plus the (possibly truncated) annotated output list
 */
override def nodeWithOutputColumnsString(maxColumns: Int): String = {
  // Formats a single attribute together with its nullability flag.
  def describe(attr: Attribute): String = attr.toString + s"[nullable=${attr.nullable}]"

  try {
    val name = nodeName
    val attrs = this.output
    val hiddenCount = attrs.length - maxColumns
    val columns =
      if (hiddenCount > 0) {
        attrs.take(maxColumns).map(describe)
          .mkString(" <output=", ", ", s" ... $hiddenCount more columns>")
      } else {
        attrs.map(describe).mkString(" <output=", ", ", ">")
      }
    name + columns
  } catch {
    case _: UnresolvedException =>
      // An UnresolvedException here most likely comes from the call to `this.output`.
      // In that case we give up on listing columns and show only the node name.
      nodeName + " <output='Unresolved'>"
  }
}

/**
* Returns the set of attributes that are output by this node.
*/
Expand Down Expand Up @@ -797,9 +824,10 @@ object QueryPlan extends PredicateHelper {
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int = SQLConf.get.maxToStringFields,
printOperatorId: Boolean = false): Unit = {
printOperatorId: Boolean = false,
printOutputColumns: Boolean = false): Unit = {
try {
plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId)
plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
} catch {
case e: AnalysisException => append(e.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,19 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
if (!newPlan.fastEquals(oldPlan)) {
if (logRules.isEmpty || logRules.get.contains(ruleName)) {
def message(): MessageWithContext = {
val oldPlanStringWithOutput = oldPlan.treeString(verbose = false,
printOutputColumns = true)
val newPlanStringWithOutput = newPlan.treeString(verbose = false,
printOutputColumns = true)
// scalastyle:off line.size.limit
log"""
|=== Applying Rule ${MDC(RULE_NAME, ruleName)} ===
|${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
|
|Output Information:
|${MDC(QUERY_PLAN, sideBySide(oldPlanStringWithOutput, newPlanStringWithOutput).mkString("\n"))}
""".stripMargin
// scalastyle:on line.size.limit
}

logBasedOnLevel(logLevel)(message())
Expand All @@ -74,10 +83,19 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
def message(): MessageWithContext = {
if (!oldPlan.fastEquals(newPlan)) {
val oldPlanStringWithOutput = oldPlan.treeString(verbose = false,
printOutputColumns = true)
val newPlanStringWithOutput = newPlan.treeString(verbose = false,
printOutputColumns = true)
// scalastyle:off line.size.limit
log"""
|=== Result of Batch ${MDC(BATCH_NAME, batchName)} ===
|${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we append the output info in the verbose treeString? Is the diff readable?

Copy link
Contributor Author

@HeartSaVioR HeartSaVioR Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I was pulled into other things.

I'm not sure I follow your suggestion. Are you suggesting that we print two trees when treeString is called with verbose = true? Or that we add the output columns to each node's verboseString?

Either way, each approach has its own issue. For the former, sideBySide won't line up if the optimization removes some nodes. (In some cases we would need to compare lines diagonally.) For the latter, we would print up to 50 (25 * 2) columns, which isn't easy to read.

|
|Output Information:
|${MDC(QUERY_PLAN, sideBySide(oldPlanStringWithOutput, newPlanStringWithOutput).mkString("\n"))}
""".stripMargin
// scalastyle:on line.size.limit
} else {
log"Batch ${MDC(BATCH_NAME, batchName)} has no effect."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields,
printOperatorId: Boolean = false): String = {
printOperatorId: Boolean = false,
printOutputColumns: Boolean = false): String = {
val concat = new PlanStringConcat()
treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId)
treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
concat.toString
}

Expand All @@ -958,9 +959,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int,
printOperatorId: Boolean): Unit = {
printOperatorId: Boolean,
printOutputColumns: Boolean): Unit = {
generateTreeString(0, new java.util.ArrayList(), append, verbose, "", addSuffix, maxFields,
printOperatorId, 0)
printOperatorId, printOutputColumns, 0)
}

/**
Expand Down Expand Up @@ -1011,6 +1013,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
*/
def innerChildren: Seq[TreeNode[_]] = Seq.empty

/**
 * One-line description of this node for tree rendering when output columns are requested
 * (`generateTreeString` calls this when `printOutputColumns` is set and no earlier branch,
 * such as `printNodeId`, applies).
 *
 * This default simply delegates to `simpleString(maxColumns)`; subclasses may override it
 * to include output column details.
 *
 * @param maxColumns limit forwarded to `simpleString`
 */
def nodeWithOutputColumnsString(maxColumns: Int): String = simpleString(maxColumns)

/**
* Appends the string representation of this node and its children to the given Writer.
*
Expand All @@ -1029,6 +1033,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
(0 until indent).foreach(_ => append(" "))
if (depth > 0) {
Expand All @@ -1044,6 +1049,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
} else if (printNodeId) {
simpleStringWithNodeId()
} else if (printOutputColumns) {
nodeWithOutputColumnsString(maxFields)
} else {
simpleString(maxFields)
}
Expand All @@ -1057,15 +1064,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
lastChildren.add(false)
innerChildrenLocal.init.foreach(_.generateTreeString(
depth + 2, lastChildren, append, verbose,
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent))
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId,
printOutputColumns = printOutputColumns, indent = indent))
lastChildren.remove(lastChildren.size() - 1)
lastChildren.remove(lastChildren.size() - 1)

lastChildren.add(children.isEmpty)
lastChildren.add(true)
innerChildrenLocal.last.generateTreeString(
depth + 2, lastChildren, append, verbose,
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent)
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId,
printOutputColumns = printOutputColumns, indent = indent)
lastChildren.remove(lastChildren.size() - 1)
lastChildren.remove(lastChildren.size() - 1)
}
Expand All @@ -1074,14 +1083,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
lastChildren.add(false)
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren, append, verbose, prefix, addSuffix,
maxFields, printNodeId = printNodeId, indent = indent)
maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns,
indent = indent)
)
lastChildren.remove(lastChildren.size() - 1)

lastChildren.add(true)
children.last.generateTreeString(
depth + 1, lastChildren, append, verbose, prefix,
addSuffix, maxFields, printNodeId = printNodeId, indent = indent)
addSuffix, maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns,
indent = indent)
lastChildren.remove(lastChildren.size() - 1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(depth,
lastChildren,
Expand All @@ -70,11 +71,13 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
addSuffix,
maxFields,
printNodeId,
printOutputColumns,
indent)
Option(logical).foreach { _ =>
lastChildren.add(true)
logical.generateTreeString(
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId,
printOutputColumns, indent)
lastChildren.remove(lastChildren.size() - 1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
child.generateTreeString(
depth,
Expand All @@ -557,6 +558,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
addSuffix = false,
maxFields,
printNodeId,
printOutputColumns,
indent)
}

Expand Down Expand Up @@ -818,6 +820,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
child.generateTreeString(
depth,
Expand All @@ -828,6 +831,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
false,
maxFields,
printNodeId,
printOutputColumns,
indent)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ case class AdaptiveSparkPlanExec(
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(
depth,
Expand All @@ -440,6 +441,7 @@ case class AdaptiveSparkPlanExec(
addSuffix,
maxFields,
printNodeId,
printOutputColumns,
indent)
if (currentPhysicalPlan.fastEquals(initialPlan)) {
lastChildren.add(true)
Expand All @@ -452,6 +454,7 @@ case class AdaptiveSparkPlanExec(
addSuffix = false,
maxFields,
printNodeId,
printOutputColumns,
indent)
lastChildren.remove(lastChildren.size() - 1)
} else {
Expand All @@ -462,27 +465,29 @@ case class AdaptiveSparkPlanExec(
append,
verbose,
maxFields,
printNodeId)
printNodeId,
printOutputColumns)
generateTreeStringWithHeader(
"Initial Plan",
initialPlan,
depth,
append,
verbose,
maxFields,
printNodeId)
printNodeId,
printOutputColumns)
}
}


private def generateTreeStringWithHeader(
header: String,
plan: SparkPlan,
depth: Int,
append: String => Unit,
verbose: Boolean,
maxFields: Int,
printNodeId: Boolean): Unit = {
printNodeId: Boolean,
printOutputColumns: Boolean): Unit = {
append(" " * depth)
append(s"+- == $header ==\n")
plan.generateTreeString(
Expand All @@ -494,6 +499,7 @@ case class AdaptiveSparkPlanExec(
addSuffix = false,
maxFields,
printNodeId,
printOutputColumns,
indent = depth + 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ abstract class QueryStageExec extends LeafExecNode {
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(depth,
lastChildren,
Expand All @@ -143,10 +144,12 @@ abstract class QueryStageExec extends LeafExecNode {
addSuffix,
maxFields,
printNodeId,
printOutputColumns,
indent)
lastChildren.add(true)
plan.generateTreeString(
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId,
printOutputColumns, indent)
lastChildren.remove(lastChildren.size() - 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
/**
* In the new explain mode `EXPLAIN FORMATTED`, the subqueries are not shown in the
Expand All @@ -807,6 +808,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
false,
maxFields,
printNodeId,
printOutputColumns,
indent)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ class QueryExecutionSuite extends SharedSparkSession {
}
}
Seq("=== Applying Rule org.apache.spark.sql.execution",
"=== Result of Batch Preparations ===").foreach { expectedMsg =>
"=== Result of Batch Preparations ===",
"Output Information:").foreach { expectedMsg =>
assert(testAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains(expectedMsg)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1740,7 +1740,8 @@ class AdaptiveQueryExecSuite
Seq("=== Result of Batch AQE Preparations ===",
"=== Result of Batch AQE Post Stage Creation ===",
"=== Result of Batch AQE Replanning ===",
"=== Result of Batch AQE Query Stage Optimization ===").foreach { expectedMsg =>
"=== Result of Batch AQE Query Stage Optimization ===",
"Output Information:").foreach { expectedMsg =>
assert(testAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains(expectedMsg)))
}
Expand Down