From 365598c23ddef430d7ae9f3e32bb9717703d30a6 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 9 May 2025 13:06:23 +0900 Subject: [PATCH 1/6] [WIP] Change the plan change log to also produce output columns with nullability --- .../spark/sql/catalyst/plans/QueryPlan.scala | 19 +++++++++++++++++ .../sql/catalyst/rules/RuleExecutor.scala | 6 ++++++ .../spark/sql/catalyst/trees/TreeNode.scala | 21 +++++++++++++++---- .../sql/execution/EmptyRelationExec.scala | 5 ++++- .../sql/execution/WholeStageCodegenExec.scala | 4 ++++ .../adaptive/AdaptiveSparkPlanExec.scala | 14 +++++++++---- .../execution/adaptive/QueryStageExec.scala | 5 ++++- .../execution/basicPhysicalOperators.scala | 2 ++ 8 files changed, 66 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 3eb4cdef9ea9b..0e8c49888626b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -55,6 +55,25 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] def output: Seq[Attribute] + override def nodeWithOutputColumnsString(maxColumns: Int): String = { + nodeName + { + if (this.output.length > maxColumns) { + val outputWithNullability = this.output.take(maxColumns).map { attr => + attr.toString + s"[nullable=${attr.nullable}]" + } + + outputWithNullability.mkString(" ") + } else { + val outputWithNullability = this.output.map { attr => + attr.toString + s"[nullable=${attr.nullable}]" + } + + outputWithNullability.mkString(" ") + } + } + } + /** * Returns the set of attributes that are output by this node. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index c1fbdb710efe0..273199bed32e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -62,6 +62,9 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { log""" |=== Applying Rule ${MDC(RULE_NAME, ruleName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))} + | + |Output Information: + |${MDC(QUERY_PLAN, newPlan.treeStringWithOutputColumns)} """.stripMargin } @@ -77,6 +80,9 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { log""" |=== Result of Batch ${MDC(BATCH_NAME, batchName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))} + | + |Output Information: + |${MDC(QUERY_PLAN, newPlan.treeStringWithOutputColumns)} """.stripMargin } else { log"Batch ${MDC(BATCH_NAME, batchName)} has no effect." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index a2ace2596d6a4..93fac9cf2fa19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -943,13 +943,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] /** Returns a string representation of the nodes in this tree */ final def treeString: String = treeString(verbose = true) + final def treeStringWithOutputColumns: String = { + treeString(verbose = false, printOutputColumns = true) + } + final def treeString( verbose: Boolean, addSuffix: Boolean = false, maxFields: Int = SQLConf.get.maxToStringFields, - printOperatorId: Boolean = false): String = { + printOperatorId: Boolean = false, + printOutputColumns: Boolean = false): String = { val concat = new PlanStringConcat() - treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId) + treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns) concat.toString } @@ -958,9 +963,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] verbose: Boolean, addSuffix: Boolean, maxFields: Int, - printOperatorId: Boolean): Unit = { + printOperatorId: Boolean, + printOutputColumns: Boolean): Unit = { generateTreeString(0, new java.util.ArrayList(), append, verbose, "", addSuffix, maxFields, - printOperatorId, 0) + printOperatorId, printOutputColumns, 0) } /** @@ -1011,6 +1017,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] */ def innerChildren: Seq[TreeNode[_]] = Seq.empty + def nodeWithOutputColumnsString(maxColumns: Int): String = { + throw new UnsupportedOperationException("TreeNode does not have output columns") + } + /** * Appends the string representation of this node and its children to the given Writer. 
* @@ -1029,6 +1039,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { (0 until indent).foreach(_ => append(" ")) if (depth > 0) { @@ -1044,6 +1055,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields) } else if (printNodeId) { simpleStringWithNodeId() + } else if (printOutputColumns) { + nodeWithOutputColumnsString(maxFields) } else { simpleString(maxFields) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index a0c3d7b51c2c3..70c9e5359e2ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -61,6 +61,7 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { super.generateTreeString(depth, lastChildren, @@ -70,11 +71,13 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo addSuffix, maxFields, printNodeId, + printOutputColumns, indent) Option(logical).foreach { _ => lastChildren.add(true) logical.generateTreeString( - depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent) + depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, + printOutputColumns, indent) lastChildren.remove(lastChildren.size() - 1) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 1ee467ef3554b..e7a398cf9659b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -547,6 +547,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { child.generateTreeString( depth, @@ -557,6 +558,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod addSuffix = false, maxFields, printNodeId, + printOutputColumns, indent) } @@ -818,6 +820,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { child.generateTreeString( depth, @@ -828,6 +831,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) false, maxFields, printNodeId, + printOutputColumns, indent) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 996e01a0ea936..8bf8b3c39ca28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -430,6 +430,7 @@ case class AdaptiveSparkPlanExec( addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { super.generateTreeString( depth, @@ -440,6 +441,7 @@ case class AdaptiveSparkPlanExec( addSuffix, maxFields, printNodeId, + printOutputColumns, indent) if (currentPhysicalPlan.fastEquals(initialPlan)) { lastChildren.add(true) @@ -452,6 +454,7 @@ case class AdaptiveSparkPlanExec( addSuffix = false, maxFields, printNodeId, + printOutputColumns, indent) 
lastChildren.remove(lastChildren.size() - 1) } else { @@ -462,7 +465,8 @@ case class AdaptiveSparkPlanExec( append, verbose, maxFields, - printNodeId) + printNodeId, + printOutputColumns) generateTreeStringWithHeader( "Initial Plan", initialPlan, @@ -470,11 +474,11 @@ case class AdaptiveSparkPlanExec( append, verbose, maxFields, - printNodeId) + printNodeId, + printOutputColumns) } } - private def generateTreeStringWithHeader( header: String, plan: SparkPlan, @@ -482,7 +486,8 @@ case class AdaptiveSparkPlanExec( append: String => Unit, verbose: Boolean, maxFields: Int, - printNodeId: Boolean): Unit = { + printNodeId: Boolean, + printOutputColumns: Boolean): Unit = { append(" " * depth) append(s"+- == $header ==\n") plan.generateTreeString( @@ -494,6 +499,7 @@ case class AdaptiveSparkPlanExec( addSuffix = false, maxFields, printNodeId, + printOutputColumns, indent = depth + 1) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index 0a5bdefea7bc5..2c14a27c2b7a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -134,6 +134,7 @@ abstract class QueryStageExec extends LeafExecNode { addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { super.generateTreeString(depth, lastChildren, @@ -143,10 +144,12 @@ abstract class QueryStageExec extends LeafExecNode { addSuffix, maxFields, printNodeId, + printOutputColumns, indent) lastChildren.add(true) plan.generateTreeString( - depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent) + depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, printOutputColumns, + indent) lastChildren.remove(lastChildren.size() - 1) } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 995f857bbf635..70ade390c7336 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -790,6 +790,7 @@ abstract class BaseSubqueryExec extends SparkPlan { addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { /** * In the new explain mode `EXPLAIN FORMATTED`, the subqueries are not shown in the @@ -807,6 +808,7 @@ abstract class BaseSubqueryExec extends SparkPlan { false, maxFields, printNodeId, + printOutputColumns, indent) } } From e243f9d5f28fc3bd26c35039afb7bea8612d5d85 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 9 May 2025 17:26:14 +0900 Subject: [PATCH 2/6] Compilation is fixed --- .../apache/spark/sql/catalyst/plans/QueryPlan.scala | 5 +++-- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 12 ++++++++---- .../sql/execution/adaptive/QueryStageExec.scala | 4 ++-- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 0e8c49888626b..e145c4468e6b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -816,9 +816,10 @@ object QueryPlan extends PredicateHelper { verbose: Boolean, addSuffix: Boolean, maxFields: Int = SQLConf.get.maxToStringFields, - printOperatorId: Boolean = false): Unit = { + printOperatorId: Boolean = false, + printOutputColumns: Boolean = false): Unit = { try { - plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId) + 
plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns) } catch { case e: AnalysisException => append(e.toString) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 93fac9cf2fa19..e1c6f112c462f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -1070,7 +1070,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] lastChildren.add(false) innerChildrenLocal.init.foreach(_.generateTreeString( depth + 2, lastChildren, append, verbose, - addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent)) + addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, + printOutputColumns = printOutputColumns, indent = indent)) lastChildren.remove(lastChildren.size() - 1) lastChildren.remove(lastChildren.size() - 1) @@ -1078,7 +1079,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] lastChildren.add(true) innerChildrenLocal.last.generateTreeString( depth + 2, lastChildren, append, verbose, - addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent) + addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, + printOutputColumns = printOutputColumns, indent = indent) lastChildren.remove(lastChildren.size() - 1) lastChildren.remove(lastChildren.size() - 1) } @@ -1087,14 +1089,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] lastChildren.add(false) children.init.foreach(_.generateTreeString( depth + 1, lastChildren, append, verbose, prefix, addSuffix, - maxFields, printNodeId = printNodeId, indent = indent) + maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns, + indent = indent) ) lastChildren.remove(lastChildren.size() - 1) lastChildren.add(true) 
children.last.generateTreeString( depth + 1, lastChildren, append, verbose, prefix, - addSuffix, maxFields, printNodeId = printNodeId, indent = indent) + addSuffix, maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns, + indent = indent) lastChildren.remove(lastChildren.size() - 1) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index 2c14a27c2b7a5..be58bccd1489a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -148,8 +148,8 @@ abstract class QueryStageExec extends LeafExecNode { indent) lastChildren.add(true) plan.generateTreeString( - depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, printOutputColumns, - indent) + depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, + printOutputColumns, indent) lastChildren.remove(lastChildren.size() - 1) } From 7a61566fb6fd9b0c6dec85eb0b791bd46e57ac54 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 9 May 2025 21:44:41 +0900 Subject: [PATCH 3/6] fix AQE test failure --- .../spark/sql/catalyst/plans/QueryPlan.scala | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index e145c4468e6b0..8a87cdcbb0fbf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper +import 
org.apache.spark.sql.catalyst.analysis.UnresolvedException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.rules.RuleId import org.apache.spark.sql.catalyst.rules.UnknownRuleId @@ -56,21 +57,28 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] def output: Seq[Attribute] override def nodeWithOutputColumnsString(maxColumns: Int): String = { - nodeName + { - if (this.output.length > maxColumns) { - val outputWithNullability = this.output.take(maxColumns).map { attr => - attr.toString + s"[nullable=${attr.nullable}]" - } + try { + nodeName + { + if (this.output.length > maxColumns) { + val outputWithNullability = this.output.take(maxColumns).map { attr => + attr.toString + s"[nullable=${attr.nullable}]" + } - outputWithNullability.mkString(" ") - } else { - val outputWithNullability = this.output.map { attr => - attr.toString + s"[nullable=${attr.nullable}]" - } + outputWithNullability.mkString(" ") + } else { + val outputWithNullability = this.output.map { attr => + attr.toString + s"[nullable=${attr.nullable}]" + } - outputWithNullability.mkString(" ") + outputWithNullability.mkString(" ") + } } + } catch { + case _: UnresolvedException => + // If we encounter an UnresolvedException, it's highly likely that the call of `this.output` + // throws it. In this case, we may have to give up and only show the nodeName.
+ nodeName + " " } } From 94c62c49c74f3a49593d1bfc57b79cdb39744fc9 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Sat, 10 May 2025 18:48:24 +0900 Subject: [PATCH 4/6] modified test to cover the change --- .../org/apache/spark/sql/execution/QueryExecutionSuite.scala | 3 ++- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index a649024370869..96cedfd76d74a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -238,7 +238,8 @@ class QueryExecutionSuite extends SharedSparkSession { } } Seq("=== Applying Rule org.apache.spark.sql.execution", - "=== Result of Batch Preparations ===").foreach { expectedMsg => + "=== Result of Batch Preparations ===", + "Output Information:").foreach { expectedMsg => assert(testAppender.loggingEvents.exists( _.getMessage.getFormattedMessage.contains(expectedMsg))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 13de81065cb77..b32ccfe571e97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1740,7 +1740,8 @@ class AdaptiveQueryExecSuite Seq("=== Result of Batch AQE Preparations ===", "=== Result of Batch AQE Post Stage Creation ===", "=== Result of Batch AQE Replanning ===", - "=== Result of Batch AQE Query Stage Optimization ===").foreach { expectedMsg => + "=== Result of Batch AQE Query Stage Optimization ===", + "Output Information:").foreach { 
expectedMsg => assert(testAppender.loggingEvents.exists( _.getMessage.getFormattedMessage.contains(expectedMsg))) } From fee4a2bac346204b1c2ec1a0d8a687f90d47f2fd Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 21 May 2025 22:47:21 +0900 Subject: [PATCH 5/6] first pass reviews --- .../apache/spark/sql/catalyst/rules/RuleExecutor.scala | 8 ++++++-- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 8 +------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 273199bed32e8..8e7a8d6cd36c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -59,13 +59,15 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { if (!newPlan.fastEquals(oldPlan)) { if (logRules.isEmpty || logRules.get.contains(ruleName)) { def message(): MessageWithContext = { + // scalastyle:off line.size.limit log""" |=== Applying Rule ${MDC(RULE_NAME, ruleName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))} | |Output Information: - |${MDC(QUERY_PLAN, newPlan.treeStringWithOutputColumns)} + |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString(verbose = false, printOutputColumns = true), newPlan.treeString(verbose = false, printOutputColumns = true)))} """.stripMargin + // scalastyle:on line.size.limit } logBasedOnLevel(logLevel)(message()) @@ -77,13 +79,15 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { if (logBatches.isEmpty || logBatches.get.contains(batchName)) { def message(): MessageWithContext = { if (!oldPlan.fastEquals(newPlan)) { + // scalastyle:off line.size.limit log""" |=== Result of Batch ${MDC(BATCH_NAME, batchName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, 
newPlan.treeString).mkString("\n"))} | |Output Information: - |${MDC(QUERY_PLAN, newPlan.treeStringWithOutputColumns)} + |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString(verbose = false, printOutputColumns = true), newPlan.treeString(verbose = false, printOutputColumns = true)))} """.stripMargin + // scalastyle:on line.size.limit } else { log"Batch ${MDC(BATCH_NAME, batchName)} has no effect." } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e1c6f112c462f..0ada02e6d29ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -943,10 +943,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] /** Returns a string representation of the nodes in this tree */ final def treeString: String = treeString(verbose = true) - final def treeStringWithOutputColumns: String = { - treeString(verbose = false, printOutputColumns = true) - } - final def treeString( verbose: Boolean, addSuffix: Boolean = false, @@ -1017,9 +1013,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] */ def innerChildren: Seq[TreeNode[_]] = Seq.empty - def nodeWithOutputColumnsString(maxColumns: Int): String = { - throw new UnsupportedOperationException("TreeNode does not have output columns") - } + def nodeWithOutputColumnsString(maxColumns: Int): String = simpleString(maxColumns) /** * Appends the string representation of this node and its children to the given Writer. 
From 63dbb95b37eb18453af242aa37fd9a5a3802770f Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 23 May 2025 22:46:28 +0900 Subject: [PATCH 6/6] feedback --- .../spark/sql/catalyst/rules/RuleExecutor.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 8e7a8d6cd36c7..cc2d25ecf2dce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -59,13 +59,17 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { if (!newPlan.fastEquals(oldPlan)) { if (logRules.isEmpty || logRules.get.contains(ruleName)) { def message(): MessageWithContext = { + val oldPlanStringWithOutput = oldPlan.treeString(verbose = false, + printOutputColumns = true) + val newPlanStringWithOutput = newPlan.treeString(verbose = false, + printOutputColumns = true) // scalastyle:off line.size.limit log""" |=== Applying Rule ${MDC(RULE_NAME, ruleName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))} | |Output Information: - |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString(verbose = false, printOutputColumns = true), newPlan.treeString(verbose = false, printOutputColumns = true)))} + |${MDC(QUERY_PLAN, sideBySide(oldPlanStringWithOutput, newPlanStringWithOutput).mkString("\n"))} """.stripMargin // scalastyle:on line.size.limit } @@ -79,13 +83,17 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { if (logBatches.isEmpty || logBatches.get.contains(batchName)) { def message(): MessageWithContext = { if (!oldPlan.fastEquals(newPlan)) { + val oldPlanStringWithOutput = oldPlan.treeString(verbose = false, + printOutputColumns = true) + val newPlanStringWithOutput = newPlan.treeString(verbose = false, + 
printOutputColumns = true) // scalastyle:off line.size.limit log""" |=== Result of Batch ${MDC(BATCH_NAME, batchName)} === |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))} | |Output Information: - |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString(verbose = false, printOutputColumns = true), newPlan.treeString(verbose = false, printOutputColumns = true)))} + |${MDC(QUERY_PLAN, sideBySide(oldPlanStringWithOutput, newPlanStringWithOutput).mkString("\n"))} """.stripMargin // scalastyle:on line.size.limit } else {