diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2432b8127840b..44cb264714482 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -284,7 +284,8 @@ def explain(self, extended=None, mode=None):
         == Physical Plan ==
         * Scan ExistingRDD (1)
         (1) Scan ExistingRDD [codegen id : 1]
-        Output: [age#0, name#1]
+        Output [2]: [age#0, name#1]
+        ...
 
         .. versionchanged:: 3.0.0
            Added optional argument `mode` to specify the expected output format of plans.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c5abb6378ff7c..12482667efa0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.plans
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag}
-import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -189,9 +188,19 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     val codegenIdStr = getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("")
     val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown")
-    s"""
-       |($operatorId) $nodeName $codegenIdStr
-     """.stripMargin
+    val baseStr = s"($operatorId) $nodeName $codegenIdStr"
+    val argumentString = argString(SQLConf.get.maxToStringFields)
+
+    if (argumentString.nonEmpty) {
+      s"""
+         |$baseStr
+         |Arguments: $argumentString
+       """.stripMargin
+    } else {
+      s"""
+         |$baseStr
+       """.stripMargin
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0d759085a7e2c..c1f7f0a195491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -76,7 +76,7 @@ trait DataSourceScanExec extends LeafExecNode {
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Output: ${producedAttributes.mkString("[", ", ", "]")}
+       |${ExplainUtils.generateFieldString("Output", producedAttributes)}
        |${metadataStr.mkString("\n")}
      """.stripMargin
   }
@@ -377,7 +377,7 @@ case class FileSourceScanExec(
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Output: ${producedAttributes.mkString("[", ", ", "]")}
+       |${ExplainUtils.generateFieldString("Output", producedAttributes)}
        |${metadataStr.mkString("\n")}
      """.stripMargin
   }
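Note: the docstring example above shows the new "Output [2]: ..." shape of the formatted plan. To see it end to end from the Scala side, something like the sketch below works (a hedged, self-contained example: Dataset.explain(mode) and EXPLAIN FORMATTED are the Spark 3.0 entry points, but operator ids and attribute ids such as key#x will differ per run):

  import org.apache.spark.sql.SparkSession

  object ExplainFormattedDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("explain-formatted-demo").getOrCreate()
      import spark.implicits._

      val df = Seq((1, 10), (2, 20)).toDF("key", "val")
      df.createOrReplaceTempView("explain_demo")

      // Per-operator details now print as "Output [2]: [key#x, val#x]",
      // "Keys [1]: [key#x]", and an "Arguments: ..." line where argString is non-empty.
      spark.sql("SELECT key, max(val) FROM explain_demo WHERE key > 0 GROUP BY key").explain("formatted")

      spark.stop()
    }
  }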
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index d4fe272f8c95f..5d4309357895b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 
 object ExplainUtils {
   /**
@@ -171,7 +170,7 @@ object ExplainUtils {
     var currentCodegenId = -1
     plan.foreach {
       case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId
-      case p: InputAdapter => currentCodegenId = -1
+      case _: InputAdapter => currentCodegenId = -1
       case other: QueryPlan[_] => if (currentCodegenId != -1) {
         other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
@@ -182,6 +181,17 @@
     }
   }
 
+  /**
+   * Generate detailed field string with different format based on type of input value
+   */
+  def generateFieldString(fieldName: String, values: Any): String = values match {
+    case iter: Iterable[_] if (iter.size == 0) => s"${fieldName}: []"
+    case iter: Iterable[_] => s"${fieldName} [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
+    case str: String if (str == null || str.isEmpty) => s"${fieldName}: None"
+    case str: String => s"${fieldName}: ${str}"
+    case _ => throw new IllegalArgumentException(s"Unsupported type for argument values: $values")
+  }
+
   /**
    * Given a input plan, returns an array of tuples comprising of :
    *   1. Hosting opeator id.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 3301e9b5ab180..f5bb554682eab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object SparkPlan {
@@ -512,10 +513,22 @@ trait LeafExecNode extends SparkPlan {
   override final def children: Seq[SparkPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet
   override def verboseStringWithOperatorId(): String = {
-    s"""
-       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Output: ${producedAttributes.mkString("[", ", ", "]")}
-     """.stripMargin
+    val argumentString = argString(SQLConf.get.maxToStringFields)
+    val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}"
+    val outputStr = s"${ExplainUtils.generateFieldString("Output", producedAttributes)}"
+
+    if (argumentString.nonEmpty) {
+      s"""
+         |$baseStr
+         |$outputStr
+         |Arguments: $argumentString
+       """.stripMargin
+    } else {
+      s"""
+         |$baseStr
+         |$outputStr
+       """.stripMargin
+    }
   }
 }
 
@@ -531,10 +544,22 @@ trait UnaryExecNode extends SparkPlan {
   override final def children: Seq[SparkPlan] = child :: Nil
 
   override def verboseStringWithOperatorId(): String = {
-    s"""
-       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Input: ${child.output.mkString("[", ", ", "]")}
-     """.stripMargin
+    val argumentString = argString(SQLConf.get.maxToStringFields)
+    val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}"
+    val inputStr = s"${ExplainUtils.generateFieldString("Input", child.output)}"
+
+    if (argumentString.nonEmpty) {
+      s"""
+         |$baseStr
+         |$inputStr
+         |Arguments: $argumentString
+       """.stripMargin
+    } else {
+      s"""
+         |$baseStr
+         |$inputStr
+       """.stripMargin
+    }
   }
 }
 
@@ -544,10 +569,24 @@ trait BinaryExecNode extends SparkPlan {
   override final def children: Seq[SparkPlan] = Seq(left, right)
 
   override def verboseStringWithOperatorId(): String = {
-    s"""
-       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Left output: ${left.output.mkString("[", ", ", "]")}
-       |Right output: ${right.output.mkString("[", ", ", "]")}
-     """.stripMargin
+    val argumentString = argString(SQLConf.get.maxToStringFields)
+    val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}"
+    val leftOutputStr = s"${ExplainUtils.generateFieldString("Left output", left.output)}"
+    val rightOutputStr = s"${ExplainUtils.generateFieldString("Right output", right.output)}"
+
+    if (argumentString.nonEmpty) {
+      s"""
+         |$baseStr
+         |$leftOutputStr
+         |$rightOutputStr
+         |Arguments: $argumentString
+       """.stripMargin
+    } else {
+      s"""
+         |$baseStr
+         |$leftOutputStr
+         |$rightOutputStr
+       """.stripMargin
+    }
   }
 }
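The new ExplainUtils.generateFieldString drives every "Name [n]: [...]" line in the output below. A standalone sketch of its behaviour, using plain strings in place of real expressions (note that the str == null guard in the original can never fire, since a null value does not match a typed String pattern, so it is omitted here):

  // Simplified stand-in for ExplainUtils.generateFieldString, for illustration only.
  def fieldString(fieldName: String, values: Any): String = values match {
    case iter: Iterable[_] if iter.isEmpty => s"$fieldName: []"
    case iter: Iterable[_] => s"$fieldName [${iter.size}]: ${iter.mkString("[", ", ", "]")}"
    case str: String if str.isEmpty => s"$fieldName: None"
    case str: String => s"$fieldName: $str"
    case other => throw new IllegalArgumentException(s"Unsupported type for argument values: $other")
  }

  // fieldString("Output", Seq("key#x", "val#x"))   == "Output [2]: [key#x, val#x]"
  // fieldString("Functions", Nil)                  == "Functions: []"
  // fieldString("Join condition", "")              == "Join condition: None"
  // fieldString("Join condition", "(key#x > 10)")  == "Join condition: (key#x > 10)"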
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
index 0eaa0f53fdacd..19d7263feb2d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
@@ -31,18 +31,13 @@ trait BaseAggregateExec extends UnaryExecNode {
   def resultExpressions: Seq[NamedExpression]
 
   override def verboseStringWithOperatorId(): String = {
-    val inputString = child.output.mkString("[", ", ", "]")
-    val keyString = groupingExpressions.mkString("[", ", ", "]")
-    val functionString = aggregateExpressions.mkString("[", ", ", "]")
-    val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]")
-    val resultString = resultExpressions.mkString("[", ", ", "]")
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Input: $inputString
-       |Keys: $keyString
-       |Functions: $functionString
-       |Aggregate Attributes: $aggregateAttributeString
-       |Results: $resultString
+       |${ExplainUtils.generateFieldString("Input", child.output)}
+       |${ExplainUtils.generateFieldString("Keys", groupingExpressions)}
+       |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)}
+       |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)}
+       |${ExplainUtils.generateFieldString("Results", resultExpressions)}
      """.stripMargin
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index c35c48496e1c9..e7d2b627d0815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -86,8 +86,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   override def verboseStringWithOperatorId(): String = {
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Output : ${projectList.mkString("[", ", ", "]")}
-       |Input : ${child.output.mkString("[", ", ", "]")}
+       |${ExplainUtils.generateFieldString("Output", projectList)}
+       |${ExplainUtils.generateFieldString("Input", child.output)}
      """.stripMargin
   }
 }
@@ -243,7 +243,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   override def verboseStringWithOperatorId(): String = {
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Input : ${child.output.mkString("[", ", ", "]")}
+       |${ExplainUtils.generateFieldString("Input", child.output)}
        |Condition : ${condition}
      """.stripMargin
   }
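With the aggregate, project and filter operators rewritten above, an operator's detail block is simply the header plus one generateFieldString line per field. A rough sketch of how a HashAggregate block is assembled, reusing the fieldString sketch shown earlier and illustrative strings instead of real Spark expressions:

  // Illustrative assembly of a HashAggregate detail block; in Spark the values are
  // Seq[Expression]/Seq[Attribute], plain strings are used here instead.
  def aggregateBlock(
      opId: Int,
      input: Seq[String],
      keys: Seq[String],
      functions: Seq[String],
      aggregateAttributes: Seq[String],
      results: Seq[String]): String = {
    s"""
       |($opId) HashAggregate
       |${fieldString("Input", input)}
       |${fieldString("Keys", keys)}
       |${fieldString("Functions", functions)}
       |${fieldString("Aggregate Attributes", aggregateAttributes)}
       |${fieldString("Results", results)}
     """.stripMargin
  }

  // aggregateBlock(5, Seq("key#x", "val#x"), Seq("key#x"), Seq("partial_max(val#x)"),
  //   Seq("max#x"), Seq("key#x", "max#x")) renders a block like the "(5) HashAggregate"
  // entry in the golden file below, e.g. "Keys [1]: [key#x]" and "Results [2]: [key#x, max#x]".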
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 849ff384c130a..dda9a637194fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -92,7 +92,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
     val reuse_op_str = ExplainUtils.getOpId(child)
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${cdgen} [Reuses operator id: $reuse_op_str]
-       |Output : ${output}
+       |${ExplainUtils.generateFieldString("Output", output)}
      """.stripMargin
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 29645a736548c..7e2f487fdcc5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -73,7 +73,7 @@ case class CartesianProductExec(
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Join condition: ${joinCondStr}
+       |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
      """.stripMargin
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 137f0b87a2f3d..99cf60273bf08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -49,9 +49,9 @@ trait HashJoin {
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Left keys: ${leftKeys}
-       |Right keys: ${rightKeys}
-       |Join condition: ${joinCondStr}
+       |${ExplainUtils.generateFieldString("Left keys", leftKeys)}
+       |${ExplainUtils.generateFieldString("Right keys", rightKeys)}
+       |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
      """.stripMargin
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 62eea611556ff..7a08dd1afd3a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -63,9 +63,9 @@ case class SortMergeJoinExec(
     } else "None"
     s"""
        |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
-       |Left keys : ${leftKeys}
-       |Right keys: ${rightKeys}
-       |Join condition : ${joinCondStr}
+       |${ExplainUtils.generateFieldString("Left keys", leftKeys)}
+       |${ExplainUtils.generateFieldString("Right keys", rightKeys)}
+       |${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
      """.stripMargin
   }
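For the join operators the key lists go through the Iterable branch of the helper (picking up the element count), while joinCondStr is already a rendered String, so an absent condition keeps printing as "Join condition: None". A small illustration, assuming the condition is carried as an Option before being rendered (the visible SortMergeJoinExec code falls back to "None" in the same way):

  // Illustration of how the join fields render with the fieldString sketch above.
  val leftKeys = Seq("key#x")
  val rightKeys = Seq("key#x")
  val condition: Option[String] = None
  val joinCondStr = condition.getOrElse("None")

  // fieldString("Left keys", leftKeys)          == "Left keys [1]: [key#x]"
  // fieldString("Right keys", rightKeys)        == "Right keys [1]: [key#x]"
  // fieldString("Join condition", joinCondStr)  == "Join condition: None"

The golden test file below changes accordingly: every attribute list gains its count, and Exchange/Sort operators gain an "Arguments:" line.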
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index bc28d7f87bf00..40b26e6fb64ce 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -65,45 +65,48 @@ struct
 (1) Scan parquet default.explain_temp1
-Output: [key#x, val#x]
+Output [2]: [key#x, val#x]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
 ReadSchema: struct
 
 (2) ColumnarToRow [codegen id : 1]
-Input: [key#x, val#x]
-
+Input [2]: [key#x, val#x]
+
 (3) Filter [codegen id : 1]
-Input : [key#x, val#x]
+Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 0))
 
 (4) Project [codegen id : 1]
-Output : [key#x, val#x]
-Input : [key#x, val#x]
+Output [2]: [key#x, val#x]
+Input [2]: [key#x, val#x]
 
 (5) HashAggregate [codegen id : 1]
-Input: [key#x, val#x]
-Keys: [key#x]
-Functions: [partial_max(val#x)]
-Aggregate Attributes: [max#x]
-Results: [key#x, max#x]
+Input [2]: [key#x, val#x]
+Keys [1]: [key#x]
+Functions [1]: [partial_max(val#x)]
+Aggregate Attributes [1]: [max#x]
+Results [2]: [key#x, max#x]
 
 (6) Exchange
-Input: [key#x, max#x]
-
+Input [2]: [key#x, max#x]
+Arguments: hashpartitioning(key#x, 4), true, [id=#x]
+
 (7) HashAggregate [codegen id : 2]
-Input: [key#x, max#x]
-Keys: [key#x]
-Functions: [max(val#x)]
-Aggregate Attributes: [max(val#x)#x]
-Results: [key#x, max(val#x)#x AS max(val)#x]
+Input [2]: [key#x, max#x]
+Keys [1]: [key#x]
+Functions [1]: [max(val#x)]
+Aggregate Attributes [1]: [max(val#x)#x]
+Results [2]: [key#x, max(val#x)#x AS max(val)#x]
 
 (8) Exchange
-Input: [key#x, max(val)#x]
-
+Input [2]: [key#x, max(val)#x]
+Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x]
+
 (9) Sort [codegen id : 3]
-Input: [key#x, max(val)#x]
+Input [2]: [key#x, max(val)#x]
+Arguments: [key#x ASC NULLS FIRST], true, 0
 
 
 -- !query
@@ -129,47 +132,48 @@ struct
 (1) Scan parquet default.explain_temp1
-Output: [key#x, val#x]
+Output [2]: [key#x, val#x]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/explain_temp1]
 PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
 ReadSchema: struct
 
 (2) ColumnarToRow [codegen id : 1]
-Input: [key#x, val#x]
-
+Input [2]: [key#x, val#x]
+
 (3) Filter [codegen id : 1]
-Input : [key#x, val#x]
+Input [2]: [key#x, val#x]
 Condition : (isnotnull(key#x) AND (key#x > 0))
 
 (4) Project [codegen id : 1]
-Output : [key#x, val#x]
-Input : [key#x, val#x]
+Output [2]: [key#x, val#x]
+Input [2]: [key#x, val#x]
 
 (5) HashAggregate [codegen id : 1]
-Input: [key#x, val#x]
-Keys: [key#x]
-Functions: [partial_max(val#x)]
-Aggregate Attributes: [max#x]
-Results: [key#x, max#x]
+Input [2]: [key#x, val#x]
+Keys [1]: [key#x]
+Functions [1]: [partial_max(val#x)]
+Aggregate Attributes [1]: [max#x]
+Results [2]: [key#x, max#x]
 
 (6) Exchange
-Input: [key#x, max#x]
-
+Input [2]: [key#x, max#x]
+Arguments: hashpartitioning(key#x, 4), true, [id=#x]
+
 (7) HashAggregate [codegen id : 2]
-Input: [key#x, max#x]
-Keys: [key#x]
-Functions: [max(val#x)]
-Aggregate Attributes: [max(val#x)#x]
-Results: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]
+Input [2]: [key#x, max#x]
+Keys [1]: [key#x]
+Functions [1]: [max(val#x)]
+Aggregate Attributes [1]: [max(val#x)#x]
+Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]
 
 (8) Filter [codegen id : 2]
-Input : [key#x, max(val)#x, max(val#x)#x]
+Input [3]: [key#x, max(val)#x, max(val#x)#x]
 Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0))
 
 (9) Project [codegen id : 2]
-Output : [key#x, max(val)#x]
-Input : [key#x, max(val)#x, max(val#x)#x]
+Output [2]: [key#x, max(val)#x]
+Input [3]: [key#x, max(val)#x, max(val#x)#x]
 
 
 -- !query
@@ -196,59 +200,60 @@ struct
 (1) Scan parquet default.explain_temp1
-Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct (6) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (7) Filter [codegen id : 2] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) (8) Project [codegen id : 2] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (9) Union - + (10) HashAggregate [codegen id : 3] -Input: [key#x, val#x] -Keys: [key#x, val#x] +Input [2]: [key#x, val#x] +Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] -Results: [key#x, val#x] +Results [2]: [key#x, val#x] (11) Exchange -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] +Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] + (12) HashAggregate [codegen id : 4] -Input: [key#x, val#x] -Keys: [key#x, val#x] +Input [2]: [key#x, val#x] +Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] -Results: [key#x, val#x] +Results [2]: [key#x, val#x] -- !query @@ -274,47 +279,48 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key)] ReadSchema: struct (2) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Filter [codegen id : 2] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : isnotnull(key#x) (4) Project [codegen id : 2] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct (6) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (7) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : isnotnull(key#x) (8) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (9) BroadcastExchange -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + (10) BroadcastHashJoin [codegen id : 2] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -339,38 +345,39 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (2) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Scan parquet 
default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct (4) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (5) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : isnotnull(key#x) (6) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (7) BroadcastExchange -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + (8) BroadcastHashJoin [codegen id : 2] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -396,22 +403,22 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] ===== Subqueries ===== @@ -426,39 +433,40 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (5) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)] ReadSchema: struct (6) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (7) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) (8) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (9) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_max(key#x)] -Aggregate Attributes: [max#x] -Results: [max#x] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] (10) Exchange -Input: [max#x] - +Input [1]: [max#x] +Arguments: SinglePartition, true, [id=#x] + (11) HashAggregate [codegen id : 2] -Input: [max#x] +Input [1]: [max#x] Keys: [] -Functions: [max(key#x)] -Aggregate Attributes: [max(key#x)#x] -Results: [max(key#x)#x AS max(key)#x] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (18) @@ -471,39 +479,40 @@ Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery (12) Scan parquet default.explain_temp3 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct (13) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: 
[key#x, val#x] + (14) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) (15) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (16) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_max(key#x)] -Aggregate Attributes: [max#x] -Results: [max#x] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] (17) Exchange -Input: [max#x] - +Input [1]: [max#x] +Arguments: SinglePartition, true, [id=#x] + (18) HashAggregate [codegen id : 2] -Input: [max#x] +Input [1]: [max#x] Keys: [] -Functions: [max(key#x)] -Aggregate Attributes: [max(key#x)#x] -Results: [max(key#x)#x AS max(key)#x] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] -- !query @@ -527,16 +536,16 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x])) ===== Subqueries ===== @@ -552,39 +561,40 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (4) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct (5) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (6) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) (7) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (8) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_max(key#x)] -Aggregate Attributes: [max#x] -Results: [max#x] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] (9) Exchange -Input: [max#x] - +Input [1]: [max#x] +Arguments: SinglePartition, true, [id=#x] + (10) HashAggregate [codegen id : 2] -Input: [max#x] +Input [1]: [max#x] Keys: [] -Functions: [max(key#x)] -Aggregate Attributes: [max(key#x)#x] -Results: [max(key#x)#x AS max(key)#x] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (17) @@ -597,39 +607,40 @@ Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (11) Scan parquet default.explain_temp3 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct (12) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (13) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) (14) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output 
[1]: [key#x] +Input [2]: [key#x, val#x] (15) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_avg(cast(key#x as bigint))] -Aggregate Attributes: [sum#x, count#xL] -Results: [sum#x, count#xL] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] (16) Exchange -Input: [sum#x, count#xL] - +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, true, [id=#x] + (17) HashAggregate [codegen id : 2] -Input: [sum#x, count#xL] +Input [2]: [sum#x, count#xL] Keys: [] -Functions: [avg(cast(key#x as bigint))] -Aggregate Attributes: [avg(cast(key#x as bigint))#x] -Results: [avg(cast(key#x as bigint))#x AS avg(key)#x] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] -- !query @@ -653,10 +664,10 @@ ReadSchema: struct<> (2) ColumnarToRow [codegen id : 1] Input: [] - + (3) Project [codegen id : 1] -Output : [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] -Input : [] +Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] +Input: [] ===== Subqueries ===== @@ -669,30 +680,31 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (4) Scan parquet default.explain_temp1 -Output: [key#x] +Output [1]: [key#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (5) ColumnarToRow [codegen id : 1] -Input: [key#x] - +Input [1]: [key#x] + (6) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_avg(cast(key#x as bigint))] -Aggregate Attributes: [sum#x, count#xL] -Results: [sum#x, count#xL] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] (7) Exchange -Input: [sum#x, count#xL] - +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, true, [id=#x] + (8) HashAggregate [codegen id : 2] -Input: [sum#x, count#xL] +Input [2]: [sum#x, count#xL] Keys: [] -Functions: [avg(cast(key#x as bigint))] -Aggregate Attributes: [avg(cast(key#x as bigint))#x] -Results: [avg(cast(key#x as bigint))#x AS avg(key)#x] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] @@ -722,47 +734,48 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct (2) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Filter [codegen id : 2] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) (4) Project [codegen id : 2] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: 
struct (6) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (7) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) (8) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (9) BroadcastExchange -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + (10) BroadcastHashJoin [codegen id : 2] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -793,56 +806,58 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_max(val#x)] -Aggregate Attributes: [max#x] -Results: [key#x, max#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] (6) Exchange -Input: [key#x, max#x] - +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + (7) HashAggregate [codegen id : 4] -Input: [key#x, max#x] -Keys: [key#x] -Functions: [max(val#x)] -Aggregate Attributes: [max(val#x)#x] -Results: [key#x, max(val#x)#x AS max(val)#x] +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] (8) ReusedExchange [Reuses operator id: 6] -Output : ArrayBuffer(key#x, max#x) +Output [2]: [key#x, max#x] (9) HashAggregate [codegen id : 3] -Input: [key#x, max#x] -Keys: [key#x] -Functions: [max(val#x)] -Aggregate Attributes: [max(val#x)#x] -Results: [key#x, max(val#x)#x AS max(val)#x] +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] (10) BroadcastExchange -Input: [key#x, max(val)#x] - +Input [2]: [key#x, max(val)#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + (11) BroadcastHashJoin [codegen id : 4] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -862,12 +877,15 @@ Execute CreateViewCommand (1) (1) Execute CreateViewCommand Output: [] - + (2) CreateViewCommand - +Arguments: `explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView + (3) UnresolvedRelation - -(4) Project +Arguments: [explain_temp1] + +(4) Project +Arguments: ['key, 'val] -- !query @@ -888,30 +906,31 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) HashAggregate -Input: 
[key#x, val#x] +Input [2]: [key#x, val#x] Keys: [] -Functions: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))] -Aggregate Attributes: [count#xL, sum#xL, count#xL] -Results: [count#xL, sum#xL, count#xL] +Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))] +Aggregate Attributes [3]: [count#xL, sum#xL, count#xL] +Results [3]: [count#xL, sum#xL, count#xL] (4) Exchange -Input: [count#xL, sum#xL, count#xL] - +Input [3]: [count#xL, sum#xL, count#xL] +Arguments: SinglePartition, true, [id=#x] + (5) HashAggregate [codegen id : 2] -Input: [count#xL, sum#xL, count#xL] +Input [3]: [count#xL, sum#xL, count#xL] Keys: [] -Functions: [count(val#x), sum(cast(key#x as bigint)), count(key#x)] -Aggregate Attributes: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL] -Results: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL] +Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)] +Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL] +Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL] -- !query @@ -931,30 +950,31 @@ ObjectHashAggregate (5) (1) Scan parquet default.explain_temp4 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) ObjectHashAggregate -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_collect_set(val#x, 0, 0)] -Aggregate Attributes: [buf#x] -Results: [key#x, buf#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_collect_set(val#x, 0, 0)] +Aggregate Attributes [1]: [buf#x] +Results [2]: [key#x, buf#x] (4) Exchange -Input: [key#x, buf#x] - +Input [2]: [key#x, buf#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + (5) ObjectHashAggregate -Input: [key#x, buf#x] -Keys: [key#x] -Functions: [collect_set(val#x, 0, 0)] -Aggregate Attributes: [collect_set(val#x, 0, 0)#x] -Results: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x] +Input [2]: [key#x, buf#x] +Keys [1]: [key#x] +Functions [1]: [collect_set(val#x, 0, 0)] +Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x] +Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x] -- !query @@ -976,36 +996,39 @@ SortAggregate (7) (1) Scan parquet default.explain_temp4 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Sort [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] +Arguments: [key#x ASC NULLS FIRST], false, 0 + (4) SortAggregate -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_min(val#x)] -Aggregate Attributes: [min#x] -Results: [key#x, min#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_min(val#x)] +Aggregate Attributes [1]: [min#x] +Results [2]: [key#x, min#x] (5) Exchange -Input: [key#x, min#x] - +Input [2]: [key#x, min#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + (6) Sort [codegen id : 2] -Input: 
[key#x, min#x] - +Input [2]: [key#x, min#x] +Arguments: [key#x ASC NULLS FIRST], false, 0 + (7) SortAggregate -Input: [key#x, min#x] -Keys: [key#x] -Functions: [min(val#x)] -Aggregate Attributes: [min(val#x)#x] -Results: [key#x, min(val#x)#x AS min(val)#x] +Input [2]: [key#x, min#x] +Keys [1]: [key#x] +Functions [1]: [min(val#x)] +Aggregate Attributes [1]: [min(val#x)#x] +Results [2]: [key#x, min(val#x)#x AS min(val)#x] -- !query