diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 3eb4cdef9ea9b..8a87cdcbb0fbf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.rules.RuleId
 import org.apache.spark.sql.catalyst.rules.UnknownRuleId
@@ -55,6 +56,32 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
 
   def output: Seq[Attribute]
 
+  override def nodeWithOutputColumnsString(maxColumns: Int): String = {
+    try {
+      nodeName + {
+        if (this.output.length > maxColumns) {
+          val outputWithNullability = this.output.take(maxColumns).map { attr =>
+            attr.toString + s"[nullable=${attr.nullable}]"
+          }
+
+          outputWithNullability.mkString(" ")
+        } else {
+          val outputWithNullability = this.output.map { attr =>
+            attr.toString + s"[nullable=${attr.nullable}]"
+          }
+
+          outputWithNullability.mkString(" ")
+        }
+      }
+    } catch {
+      case _: UnresolvedException =>
+        // If we encounter an UnresolvedException, it is highly likely that the call to
+        // `this.output` threw it. In this case, we have to give up and only show the nodeName.
+        nodeName + " "
+    }
+  }
+
   /**
    * Returns the set of attributes that are output by this node.
    */
@@ -797,9 +824,10 @@ object QueryPlan extends PredicateHelper {
       verbose: Boolean,
       addSuffix: Boolean,
       maxFields: Int = SQLConf.get.maxToStringFields,
-      printOperatorId: Boolean = false): Unit = {
+      printOperatorId: Boolean = false,
+      printOutputColumns: Boolean = false): Unit = {
     try {
-      plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId)
+      plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
     } catch {
       case e: AnalysisException => append(e.toString)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index c1fbdb710efe0..cc2d25ecf2dce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -59,10 +59,19 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
     if (!newPlan.fastEquals(oldPlan)) {
       if (logRules.isEmpty || logRules.get.contains(ruleName)) {
         def message(): MessageWithContext = {
+          val oldPlanStringWithOutput = oldPlan.treeString(verbose = false,
+            printOutputColumns = true)
+          val newPlanStringWithOutput = newPlan.treeString(verbose = false,
+            printOutputColumns = true)
+          // scalastyle:off line.size.limit
           log"""
              |=== Applying Rule ${MDC(RULE_NAME, ruleName)} ===
              |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
+             |
+             |Output Information:
+             |${MDC(QUERY_PLAN, sideBySide(oldPlanStringWithOutput, newPlanStringWithOutput).mkString("\n"))}
           """.stripMargin
+          // scalastyle:on line.size.limit
         }
 
         logBasedOnLevel(logLevel)(message())
@@ -74,10 +83,19 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
     if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
       def message(): MessageWithContext = {
         if (!oldPlan.fastEquals(newPlan)) {
+          val oldPlanStringWithOutput = oldPlan.treeString(verbose = false,
+            printOutputColumns = true)
+          val newPlanStringWithOutput = newPlan.treeString(verbose = false,
+            printOutputColumns = true)
+          // scalastyle:off line.size.limit
           log"""
              |=== Result of Batch ${MDC(BATCH_NAME, batchName)} ===
              |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
+             |
+             |Output Information:
+             |${MDC(QUERY_PLAN, sideBySide(oldPlanStringWithOutput, newPlanStringWithOutput).mkString("\n"))}
           """.stripMargin
+          // scalastyle:on line.size.limit
         } else {
           log"Batch ${MDC(BATCH_NAME, batchName)} has no effect."
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index a2ace2596d6a4..0ada02e6d29ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -947,9 +947,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields,
-      printOperatorId: Boolean = false): String = {
+      printOperatorId: Boolean = false,
+      printOutputColumns: Boolean = false): String = {
     val concat = new PlanStringConcat()
-    treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId)
+    treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
     concat.toString
   }
 
@@ -958,9 +959,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       verbose: Boolean,
       addSuffix: Boolean,
       maxFields: Int,
-      printOperatorId: Boolean): Unit = {
+      printOperatorId: Boolean,
+      printOutputColumns: Boolean): Unit = {
     generateTreeString(0, new java.util.ArrayList(), append, verbose, "", addSuffix, maxFields,
-      printOperatorId, 0)
+      printOperatorId, printOutputColumns, 0)
   }
 
   /**
@@ -1011,6 +1013,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    */
   def innerChildren: Seq[TreeNode[_]] = Seq.empty
 
+  def nodeWithOutputColumnsString(maxColumns: Int): String = simpleString(maxColumns)
+
   /**
    * Appends the string representation of this node and its children to the given Writer.
    *
@@ -1029,6 +1033,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     (0 until indent).foreach(_ => append(" "))
     if (depth > 0) {
@@ -1044,6 +1049,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
     } else if (printNodeId) {
       simpleStringWithNodeId()
+    } else if (printOutputColumns) {
+      nodeWithOutputColumnsString(maxFields)
     } else {
       simpleString(maxFields)
     }
@@ -1057,7 +1064,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       lastChildren.add(false)
       innerChildrenLocal.init.foreach(_.generateTreeString(
         depth + 2, lastChildren, append, verbose,
-        addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent))
+        addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId,
+        printOutputColumns = printOutputColumns, indent = indent))
       lastChildren.remove(lastChildren.size() - 1)
       lastChildren.remove(lastChildren.size() - 1)
 
@@ -1065,7 +1073,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       lastChildren.add(true)
       innerChildrenLocal.last.generateTreeString(
         depth + 2, lastChildren, append, verbose,
-        addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent)
+        addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId,
+        printOutputColumns = printOutputColumns, indent = indent)
       lastChildren.remove(lastChildren.size() - 1)
       lastChildren.remove(lastChildren.size() - 1)
     }
@@ -1074,14 +1083,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       lastChildren.add(false)
       children.init.foreach(_.generateTreeString(
         depth + 1, lastChildren, append, verbose, prefix, addSuffix,
-        maxFields, printNodeId = printNodeId, indent = indent)
+        maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns,
+        indent = indent)
       )
       lastChildren.remove(lastChildren.size() - 1)
 
       lastChildren.add(true)
       children.last.generateTreeString(
         depth + 1, lastChildren, append, verbose, prefix,
-        addSuffix, maxFields, printNodeId = printNodeId, indent = indent)
+        addSuffix, maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns,
+        indent = indent)
       lastChildren.remove(lastChildren.size() - 1)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
index a0c3d7b51c2c3..70c9e5359e2ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -61,6 +61,7 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     super.generateTreeString(depth,
       lastChildren,
@@ -70,11 +71,13 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
       addSuffix,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
     Option(logical).foreach { _ =>
       lastChildren.add(true)
       logical.generateTreeString(
-        depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
+        depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId,
+        printOutputColumns, indent)
       lastChildren.remove(lastChildren.size() - 1)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 1ee467ef3554b..e7a398cf9659b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -547,6 +547,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     child.generateTreeString(
       depth,
@@ -557,6 +558,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
       addSuffix = false,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
   }
 
@@ -818,6 +820,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     child.generateTreeString(
       depth,
@@ -828,6 +831,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       false,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 996e01a0ea936..8bf8b3c39ca28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -430,6 +430,7 @@ case class AdaptiveSparkPlanExec(
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     super.generateTreeString(
       depth,
@@ -440,6 +441,7 @@ case class AdaptiveSparkPlanExec(
       addSuffix,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
     if (currentPhysicalPlan.fastEquals(initialPlan)) {
       lastChildren.add(true)
@@ -452,6 +454,7 @@ case class AdaptiveSparkPlanExec(
         addSuffix = false,
         maxFields,
         printNodeId,
+        printOutputColumns,
         indent)
       lastChildren.remove(lastChildren.size() - 1)
     } else {
@@ -462,7 +465,8 @@ case class AdaptiveSparkPlanExec(
         append,
         verbose,
         maxFields,
-        printNodeId)
+        printNodeId,
+        printOutputColumns)
       generateTreeStringWithHeader(
         "Initial Plan",
         initialPlan,
@@ -470,11 +474,11 @@ case class AdaptiveSparkPlanExec(
         append,
         verbose,
         maxFields,
-        printNodeId)
+        printNodeId,
+        printOutputColumns)
     }
   }
 
-
   private def generateTreeStringWithHeader(
       header: String,
       plan: SparkPlan,
@@ -482,7 +486,8 @@ case class AdaptiveSparkPlanExec(
       append: String => Unit,
       verbose: Boolean,
       maxFields: Int,
-      printNodeId: Boolean): Unit = {
+      printNodeId: Boolean,
+      printOutputColumns: Boolean): Unit = {
     append(" " * depth)
     append(s"+- == $header ==\n")
     plan.generateTreeString(
@@ -494,6 +499,7 @@ case class AdaptiveSparkPlanExec(
       addSuffix = false,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent = depth + 1)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 0a5bdefea7bc5..be58bccd1489a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -134,6 +134,7 @@ abstract class QueryStageExec extends LeafExecNode {
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     super.generateTreeString(depth,
       lastChildren,
@@ -143,10 +144,12 @@ abstract class QueryStageExec extends LeafExecNode {
       addSuffix,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
     lastChildren.add(true)
     plan.generateTreeString(
-      depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
+      depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId,
+      printOutputColumns, indent)
     lastChildren.remove(lastChildren.size() - 1)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 995f857bbf635..70ade390c7336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -790,6 +790,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
       addSuffix: Boolean = false,
       maxFields: Int,
       printNodeId: Boolean,
+      printOutputColumns: Boolean,
       indent: Int = 0): Unit = {
     /**
      * In the new explain mode `EXPLAIN FORMATTED`, the subqueries are not shown in the
@@ -807,6 +808,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
       false,
       maxFields,
       printNodeId,
+      printOutputColumns,
       indent)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index a649024370869..96cedfd76d74a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -238,7 +238,8 @@ class QueryExecutionSuite extends SharedSparkSession {
       }
     }
     Seq("=== Applying Rule org.apache.spark.sql.execution",
-      "=== Result of Batch Preparations ===").foreach { expectedMsg =>
+      "=== Result of Batch Preparations ===",
+      "Output Information:").foreach { expectedMsg =>
       assert(testAppender.loggingEvents.exists(
         _.getMessage.getFormattedMessage.contains(expectedMsg)))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 13de81065cb77..b32ccfe571e97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1740,7 +1740,8 @@ class AdaptiveQueryExecSuite
     Seq("=== Result of Batch AQE Preparations ===",
      "=== Result of Batch AQE Post Stage Creation ===",
       "=== Result of Batch AQE Replanning ===",
-      "=== Result of Batch AQE Query Stage Optimization ===").foreach { expectedMsg =>
+      "=== Result of Batch AQE Query Stage Optimization ===",
+      "Output Information:").foreach { expectedMsg =>
       assert(testAppender.loggingEvents.exists(
         _.getMessage.getFormattedMessage.contains(expectedMsg)))
     }
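
For context, a minimal sketch of how the new "Output Information:" sections can be surfaced end to end. It relies only on the existing `spark.sql.planChangeLog.level` configuration; the example query is illustrative, and the exact rules and batches that appear in the log depend on the query:

    import org.apache.spark.sql.SparkSession

    // Route PlanChangeLogger output to the driver log at WARN so it is visible
    // with default log4j settings (the logger defaults to TRACE).
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.planChangeLog.level", "WARN")
      .getOrCreate()

    // Any query that triggers optimizer or physical-planning rules. Each logged
    // plan diff is now followed by an "Output Information:" side-by-side view,
    // with each attribute rendered by nodeWithOutputColumnsString as something
    // like `id#0L[nullable=false]`.
    spark.range(10).selectExpr("id % 2 AS parity").groupBy("parity").count().collect()

The same rendering is available directly via `plan.treeString(verbose = false, printOutputColumns = true)` on any plan whose `output` is resolved; on unresolved plans, the catch branch in QueryPlan.nodeWithOutputColumnsString falls back to the bare node name.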