From 6b4cba349eb840a8fba65ee385f1943ce2b93fa1 Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Wed, 31 Mar 2021 15:16:50 -0700
Subject: [PATCH 1/8] Metadata output should be empty by default

Signed-off-by: Karen Feng
---
 .../plans/logical/EventTimeWatermark.scala    |  2 +
 .../catalyst/plans/logical/LogicalPlan.scala  |  7 +-
 .../plans/logical/basicLogicalOperators.scala | 44 ++++++++++--
 .../sql/catalyst/plans/logical/hints.scala    |  3 +
 .../sql/catalyst/plans/logical/object.scala   |  6 ++
 .../logical/pythonLogicalOperators.scala      |  2 +
 .../sql/connector/DataSourceV2SQLSuite.scala  | 67 +++++++++++++++++++
 7 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index b6bf7cd85d472..a591a9e160a8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -61,4 +61,6 @@ case class EventTimeWatermark(
       a
     }
   }
+
+  override val metadataOutput: Seq[Attribute] = child.metadataOutput
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7129c6984cf3f..3660ea284a45f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -34,8 +34,11 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {
 
-  /** Metadata fields that can be projected from this node */
-  def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
+  /**
+   * Metadata fields that can be projected from this node.
+   * Should be non-empty if the plan propagates its children's output.
+   */
+  def metadataOutput: Seq[Attribute] = Nil
 
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 24ccc613f1623..101ef058ef480 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.random.RandomSampler
 
 /**
- * When planning take() or collect() operations, this special node that is inserted at the top of
+ * When planning take() or collect() operations, this special node is inserted at the top of
  * the logical plan before invoking the query planner.
* * Rules can pattern-match on this node in order to apply transformations that only take effect @@ -40,6 +40,7 @@ import org.apache.spark.util.random.RandomSampler case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -51,6 +52,7 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { */ case class Subquery(child: LogicalPlan, correlated: Boolean) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } object Subquery { @@ -107,10 +109,10 @@ case class Generate( child: LogicalPlan) extends UnaryNode { - lazy val requiredChildOutput: Seq[Attribute] = { - val unrequiredSet = unrequiredChildIndex.toSet + val unrequiredSet: Set[Int] = unrequiredChildIndex.toSet + + lazy val requiredChildOutput: Seq[Attribute] = child.output.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) - } override lazy val resolved: Boolean = { generator.resolved && @@ -134,11 +136,14 @@ case class Generate( } def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput + override def metadataOutput: Seq[Attribute] = + child.metadataOutput.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) } case class Filter(condition: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode with PredicateHelper { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = child.maxRows @@ -187,6 +192,8 @@ case class Intersect( leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable) } + override def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput) + override protected lazy val validConstraints: ExpressionSet = leftConstraints.union(rightConstraints) @@ -206,6 +213,7 @@ case class Except( override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" ) /** We don't use right.output because those rows get excluded from the set. 
*/ override def output: Seq[Attribute] = left.output + override def metadataOutput: Seq[Attribute] = left.metadataOutput override protected lazy val validConstraints: ExpressionSet = leftConstraints } @@ -270,6 +278,8 @@ case class Union( } } + override def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput) + override lazy val resolved: Boolean = { // allChildrenCompatible needs to be evaluated after childrenResolved def allChildrenCompatible: Boolean = @@ -364,6 +374,17 @@ case class Join( } } + override def metadataOutput: Seq[Attribute] = { + joinType match { + case j: ExistenceJoin => + left.metadataOutput + case LeftExistence(_) => + left.metadataOutput + case _ => + children.flatMap(_.metadataOutput) + } + } + override protected lazy val validConstraints: ExpressionSet = { joinType match { case _: InnerLike if condition.isDefined => @@ -466,6 +487,8 @@ case class View( override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.output + override def simpleString(maxFields: Int): String = { s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})" } @@ -520,6 +543,8 @@ object View { case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput + override def simpleString(maxFields: Int): String = { val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields) s"CTE $cteAliases" @@ -532,6 +557,7 @@ case class WithWindowDefinition( windowDefinitions: Map[String, WindowSpecDefinition], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -545,6 +571,7 @@ case class Sort( global: Boolean, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = child.maxRows override def outputOrdering: Seq[SortOrder] = order } @@ -669,6 +696,7 @@ case class Window( override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output ++ windowExpressions.map(_.toAttribute) + override def metadataOutput: Seq[Attribute] = child.metadataOutput override def producedAttributes: AttributeSet = windowOutputSet @@ -861,6 +889,7 @@ object Limit { */ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = { limitExpr match { case IntegerLiteral(limit) => Some(limit) @@ -877,6 +906,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderP */ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRowsPerPartition: Option[Long] = { limitExpr match { @@ -898,6 +928,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr */ case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: 
Option[Long] = { limitExpr match { case IntegerLiteral(limit) => Some(limit) @@ -983,6 +1014,7 @@ case class Sample( override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -991,6 +1023,7 @@ case class Sample( case class Distinct(child: LogicalPlan) extends UnaryNode { override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -1001,6 +1034,7 @@ abstract class RepartitionOperation extends UnaryNode { def numPartitions: Int override final def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -1084,6 +1118,7 @@ case class Deduplicate( child: LogicalPlan) extends UnaryNode { override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -1112,4 +1147,5 @@ case class CollectMetrics( } override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index 4b5e278fccdfb..8130d5ecebc6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -31,6 +31,7 @@ case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan override lazy val resolved: Boolean = false override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -42,6 +43,8 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo()) override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput + override def doCanonicalize(): LogicalPlan = child.canonicalized } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index d383532cbd3d3..569ef3bd882c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -238,6 +238,8 @@ case class TypedFilter( override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = child.metadataOutput + def withObjectProducerChild(obj: LogicalPlan): Filter = { assert(obj.output.length == 1) Filter(typedCondition(obj.output.head), obj) @@ -333,6 +335,8 @@ case class AppendColumns( override def output: Seq[Attribute] = child.output ++ newColumns + override def metadataOutput: Seq[Attribute] = child.metadataOutput + def newColumns: Seq[Attribute] = serializer.map(_.toAttribute) } @@ -346,6 +350,8 @@ case class AppendColumnsWithObject( child: LogicalPlan) extends ObjectConsumer { override def output: Seq[Attribute] = (childSerializer ++ newColumnsSerializer).map(_.toAttribute) + + override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** Factory for constructing new `MapGroups` nodes. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index c4f741cd2cec8..4acddb1ff2de3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -74,6 +74,8 @@ trait BaseEvalPython extends UnaryNode { override def output: Seq[Attribute] = child.output ++ resultAttrs + override def metadataOutput: Seq[Attribute] = child.metadataOutput + override def producedAttributes: AttributeSet = AttributeSet(resultAttrs) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c4abed32bf624..7ac635c060f70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2690,6 +2690,73 @@ class DataSourceV2SQLSuite } } + test("SPARK-34923: do not propagate metadata columns through Project") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + assertThrows[AnalysisException] { + sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)") + } + assertThrows[AnalysisException] { + spark.table(t1).select("id", "data").select("index", "_partition") + } + } + } + + test("SPARK-34923: propagate metadata columns through Filter") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1") + val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition") + + Seq(sqlQuery, dfQuery).foreach { query => + checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))) + } + } + } + + test("SPARK-34923: propagate metadata columns through Sort") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id") + val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition") + + Seq(sqlQuery, dfQuery).foreach { query => + checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))) + } + } + } + + test("SPARK-34923: propagate metadata columns through RepartitionBy") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + val sqlQuery = spark.sql( + s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1") + val tbl = spark.table(t1) + val dfQuery = tbl.repartitionByRange(3, tbl.col("id")) + .select("id", "data", "index", "_partition") + + Seq(sqlQuery, 
dfQuery).foreach { query => + checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))) + } + } + } + private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams") From 6c3dde274e36ab5665b72703435b41332ef60f65 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Wed, 31 Mar 2021 15:22:05 -0700 Subject: [PATCH 2/8] Clean up Signed-off-by: Karen Feng --- .../catalyst/plans/logical/basicLogicalOperators.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 101ef058ef480..af6f47dfc81f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -111,8 +111,10 @@ case class Generate( val unrequiredSet: Set[Int] = unrequiredChildIndex.toSet - lazy val requiredChildOutput: Seq[Attribute] = + lazy val requiredChildOutput: Seq[Attribute] = { + val unrequiredSet = unrequiredChildIndex.toSet child.output.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) + } override lazy val resolved: Boolean = { generator.resolved && @@ -136,8 +138,7 @@ case class Generate( } def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput - override def metadataOutput: Seq[Attribute] = - child.metadataOutput.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) + override def metadataOutput: Seq[Attribute] = child.metadataOutput } case class Filter(condition: Expression, child: LogicalPlan) @@ -376,7 +377,7 @@ case class Join( override def metadataOutput: Seq[Attribute] = { joinType match { - case j: ExistenceJoin => + case ExistenceJoin(_) => left.metadataOutput case LeftExistence(_) => left.metadataOutput From ce3ac0e5465b158e70ec317c403203511c96568a Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Wed, 31 Mar 2021 22:48:53 -0700 Subject: [PATCH 3/8] Revert unintended change Signed-off-by: Karen Feng --- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index af6f47dfc81f4..400a061c94594 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -109,8 +109,6 @@ case class Generate( child: LogicalPlan) extends UnaryNode { - val unrequiredSet: Set[Int] = unrequiredChildIndex.toSet - lazy val requiredChildOutput: Seq[Attribute] = { val unrequiredSet = unrequiredChildIndex.toSet child.output.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) From a4a7d0500d094d2b1ef0a0cb756c5a7ce1f3b9f1 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 5 Apr 2021 11:08:33 -0700 Subject: [PATCH 4/8] By default, propagate metadata output Signed-off-by: Karen Feng --- .../plans/logical/EventTimeWatermark.scala | 2 -- .../catalyst/plans/logical/LogicalPlan.scala | 4 ++-- .../plans/logical/basicLogicalOperators.scala | 19 +------------------ 
.../sql/catalyst/plans/logical/hints.scala | 3 --- .../sql/catalyst/plans/logical/object.scala | 6 ------ .../logical/pythonLogicalOperators.scala | 2 -- 6 files changed, 3 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index a591a9e160a8d..b6bf7cd85d472 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -61,6 +61,4 @@ case class EventTimeWatermark( a } } - - override val metadataOutput: Seq[Attribute] = child.metadataOutput } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 3660ea284a45f..cef0e06c357f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -36,9 +36,9 @@ abstract class LogicalPlan /** * Metadata fields that can be projected from this node. - * Should be non-empty if the plan propagates its children's output. + * Should be overridden if the plan does not propagate its children's output. */ - def metadataOutput: Seq[Attribute] = Nil + def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput) /** Returns true if this subtree has data from a streaming data source. */ def isStreaming: Boolean = children.exists(_.isStreaming) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f40c9c3d9f780..0f853a7a7a001 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.random.RandomSampler /** - * When planning take() or collect() operations, this special node is inserted at the top of + * When planning take() or collect() operations, this special node that is inserted at the top of * the logical plan before invoking the query planner. 
* * Rules can pattern-match on this node in order to apply transformations that only take effect @@ -40,7 +40,6 @@ import org.apache.spark.util.random.RandomSampler case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -52,7 +51,6 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { */ case class Subquery(child: LogicalPlan, correlated: Boolean) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } object Subquery { @@ -136,13 +134,11 @@ case class Generate( } def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput - override def metadataOutput: Seq[Attribute] = child.metadataOutput } case class Filter(condition: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode with PredicateHelper { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = child.maxRows @@ -535,8 +531,6 @@ object View { case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput - override def simpleString(maxFields: Int): String = { val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields) s"CTE $cteAliases" @@ -549,7 +543,6 @@ case class WithWindowDefinition( windowDefinitions: Map[String, WindowSpecDefinition], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -563,7 +556,6 @@ case class Sort( global: Boolean, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = child.maxRows override def outputOrdering: Seq[SortOrder] = order } @@ -688,7 +680,6 @@ case class Window( override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output ++ windowExpressions.map(_.toAttribute) - override def metadataOutput: Seq[Attribute] = child.metadataOutput override def producedAttributes: AttributeSet = windowOutputSet @@ -881,7 +872,6 @@ object Limit { */ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = { limitExpr match { case IntegerLiteral(limit) => Some(limit) @@ -898,7 +888,6 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderP */ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRowsPerPartition: Option[Long] = { limitExpr match { @@ -920,7 +909,6 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr */ case class Tail(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = child.output - override def 
metadataOutput: Seq[Attribute] = child.metadataOutput override def maxRows: Option[Long] = { limitExpr match { case IntegerLiteral(limit) => Some(limit) @@ -1001,7 +989,6 @@ case class Sample( override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -1010,7 +997,6 @@ case class Sample( case class Distinct(child: LogicalPlan) extends UnaryNode { override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -1021,7 +1007,6 @@ abstract class RepartitionOperation extends UnaryNode { def numPartitions: Int override final def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput def partitioning: Partitioning } @@ -1116,7 +1101,6 @@ case class Deduplicate( child: LogicalPlan) extends UnaryNode { override def maxRows: Option[Long] = child.maxRows override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -1145,5 +1129,4 @@ case class CollectMetrics( } override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index 8130d5ecebc6c..4b5e278fccdfb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -31,7 +31,6 @@ case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan override lazy val resolved: Boolean = false override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput } /** @@ -43,8 +42,6 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo()) override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput - override def doCanonicalize(): LogicalPlan = child.canonicalized } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 569ef3bd882c8..d383532cbd3d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -238,8 +238,6 @@ case class TypedFilter( override def output: Seq[Attribute] = child.output - override def metadataOutput: Seq[Attribute] = child.metadataOutput - def withObjectProducerChild(obj: LogicalPlan): Filter = { assert(obj.output.length == 1) Filter(typedCondition(obj.output.head), obj) @@ -335,8 +333,6 @@ case class AppendColumns( override def output: Seq[Attribute] = child.output ++ newColumns - override def metadataOutput: Seq[Attribute] = child.metadataOutput - def newColumns: Seq[Attribute] = serializer.map(_.toAttribute) } @@ -350,8 +346,6 @@ case class AppendColumnsWithObject( child: LogicalPlan) extends ObjectConsumer { override def output: Seq[Attribute] = (childSerializer ++ newColumnsSerializer).map(_.toAttribute) - - override def metadataOutput: Seq[Attribute] = 
child.metadataOutput } /** Factory for constructing new `MapGroups` nodes. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index 4acddb1ff2de3..c4f741cd2cec8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -74,8 +74,6 @@ trait BaseEvalPython extends UnaryNode { override def output: Seq[Attribute] = child.output ++ resultAttrs - override def metadataOutput: Seq[Attribute] = child.metadataOutput - override def producedAttributes: AttributeSet = AttributeSet(resultAttrs) } From 5a04a7e2ad1e26cadac6a088c9286db23bb6a176 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 5 Apr 2021 11:50:20 -0700 Subject: [PATCH 5/8] Use blocklist Signed-off-by: Karen Feng --- .../plans/logical/basicLogicalOperators.scala | 19 ++++++++++ .../sql/connector/DataSourceV2SQLSuite.scala | 36 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 0f853a7a7a001..49b09cc74c7cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -61,6 +61,7 @@ object Subquery { case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) + override def metadataOutput: Seq[Attribute] = Nil override def maxRows: Option[Long] = child.maxRows override lazy val resolved: Boolean = { @@ -187,6 +188,8 @@ case class Intersect( leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable) } + override def metadataOutput: Seq[Attribute] = Nil + override protected lazy val validConstraints: ExpressionSet = leftConstraints.union(rightConstraints) @@ -207,6 +210,8 @@ case class Except( /** We don't use right.output because those rows get excluded from the set. 
*/ override def output: Seq[Attribute] = left.output + override def metadataOutput: Seq[Attribute] = Nil + override protected lazy val validConstraints: ExpressionSet = leftConstraints } @@ -270,6 +275,8 @@ case class Union( } } + override def metadataOutput: Seq[Attribute] = Nil + override lazy val resolved: Boolean = { // allChildrenCompatible needs to be evaluated after childrenResolved def allChildrenCompatible: Boolean = @@ -451,6 +458,7 @@ case class InsertIntoDir( extends UnaryNode { override def output: Seq[Attribute] = Seq.empty + override def metadataOutput: Seq[Attribute] = Nil override lazy val resolved: Boolean = false } @@ -477,6 +485,8 @@ case class View( override def output: Seq[Attribute] = child.output + override def metadataOutput: Seq[Attribute] = Nil + override def simpleString(maxFields: Int): String = { s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})" } @@ -658,6 +668,7 @@ case class Aggregate( } override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) + override def metadataOutput: Seq[Attribute] = Nil override def maxRows: Option[Long] = { if (groupingExpressions.isEmpty) { Some(1L) @@ -793,6 +804,8 @@ case class Expand( override lazy val references: AttributeSet = AttributeSet(projections.flatten.flatMap(_.references)) + override def metadataOutput: Seq[Attribute] = Nil + override def producedAttributes: AttributeSet = AttributeSet(output diff child.output) // This operator can reuse attributes (for example making them null when doing a roll up) so @@ -829,6 +842,7 @@ case class Pivot( } groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg } + override def metadataOutput: Seq[Attribute] = Nil } /** @@ -935,6 +949,11 @@ case class SubqueryAlias( child.output.map(_.withQualifier(qualifierList)) } + override def metadataOutput: Seq[Attribute] = { + val qualifierList = identifier.qualifier :+ alias + child.metadataOutput.map(_.withQualifier(qualifierList)) + } + override def doCanonicalize(): LogicalPlan = child.canonicalized } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 7ac635c060f70..e0dcdc794b7c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2706,6 +2706,23 @@ class DataSourceV2SQLSuite } } + test("SPARK-34923: do not propagate metadata columns through View") { + val t1 = s"${catalogAndNamespace}table" + val view = "view" + + withTable(t1) { + withTempView(view) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"CACHE TABLE $view AS SELECT * FROM $t1") + assertThrows[AnalysisException] { + sql(s"SELECT index, _partition FROM $view") + } + } + } + } + test("SPARK-34923: propagate metadata columns through Filter") { val t1 = s"${catalogAndNamespace}table" withTable(t1) { @@ -2757,6 +2774,25 @@ class DataSourceV2SQLSuite } } + test("SPARK-34923: propagate metadata columns through SubqueryAlias") { + val t1 = s"${catalogAndNamespace}table" + val sbq = "sbq" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + val sqlQuery = spark.sql( + s"SELECT $sbq.id, $sbq.data, 
$sbq.index, $sbq._partition FROM $t1 as $sbq") + val dfQuery = spark.table(t1).as(sbq).select( + s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition") + + Seq(sqlQuery, dfQuery).foreach { query => + checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))) + } + } + } + private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams") From 73219d46d3673de7d91ccafa3bd8e5b68e0ec1ed Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 5 Apr 2021 14:05:23 -0700 Subject: [PATCH 6/8] Retrigger tests Signed-off-by: Karen Feng From b1c0183d288459780d92ff7983e33fc177254d37 Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 5 Apr 2021 18:11:16 -0700 Subject: [PATCH 7/8] Retrigger tests Signed-off-by: Karen Feng From e8e6e7d4e89b9f36ae642c7ab1a16d0707e91acc Mon Sep 17 00:00:00 2001 From: Karen Feng Date: Mon, 5 Apr 2021 21:49:42 -0700 Subject: [PATCH 8/8] Retrigger tests Signed-off-by: Karen Feng