diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 11610cf5f0c1..4bb15d6b01c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -40,14 +40,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT @transient lazy val outputSet: AttributeSet = AttributeSet(output) - /** - * All Attributes that appear in expressions from this operator. Note that this set does not - * include attributes that are implicitly referenced by being passed through to the output tuple. - */ - @transient - lazy val references: AttributeSet = - AttributeSet.fromAttributeSets(expressions.map(_.references)) - /** * The set of all attributes that are input to this operator by its children. */ @@ -59,12 +51,19 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT */ def producedAttributes: AttributeSet = AttributeSet.empty + /** + * All Attributes that appear in expressions from this operator. Note that this set does not + * include attributes that are implicitly referenced by being passed through to the output tuple. + */ + @transient + lazy val references: AttributeSet = { + AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes + } + /** * Attributes that are referenced by expressions but not provided by this node's children. - * Subclasses should override this method if they produce attributes internally as it is used by - * assertions designed to prevent the construction of invalid plans. */ - def missingInput: AttributeSet = references -- inputSet -- producedAttributes + final def missingInput: AttributeSet = references -- inputSet /** * Runs [[transformExpressionsDown]] with `rule` on all expressions present diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 273a4389ba40..6fd446017b14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -586,8 +586,7 @@ case class View( output: Seq[Attribute], child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation { - @transient - override lazy val references: AttributeSet = AttributeSet.empty + override def producedAttributes: AttributeSet = outputSet override lazy val resolved: Boolean = child.resolved diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index 757e46a68672..ae93c760f056 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -59,8 +59,7 @@ trait BaseEvalPython extends UnaryNode { override def output: Seq[Attribute] = child.output ++ resultAttrs - @transient - override lazy val references: AttributeSet = AttributeSet(udfs.flatMap(_.references)) + override def producedAttributes: AttributeSet = AttributeSet(resultAttrs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 2549b9e1537a..4c9efdbf2ba6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -68,7 +68,7 @@ case class GenerateExec( override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) - override def producedAttributes: AttributeSet = AttributeSet(output) + override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) override def outputPartitioning: Partitioning = child.outputPartitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 16a5094fa8d4..1de2b6e0a85d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -186,8 +186,6 @@ case class InMemoryRelation( cacheBuilder, outputOrdering) - override def producedAttributes: AttributeSet = outputSet - @transient val partitionStatistics = new PartitionStatistics(output) def cachedPlan: SparkPlan = cacheBuilder.cachedPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index a3be473de7d5..9a76e144b885 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -451,8 +451,6 @@ case class FlatMapGroupsInRExec( override def outputPartitioning: Partitioning = child.outputPartitioning - override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) - override def requiredChildDistribution: Seq[Distribution] = if (groupingAttributes.isEmpty) { AllTuples :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index daad7728ed20..3554bdb5c9e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -62,8 +62,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], override def output: Seq[Attribute] = child.output ++ resultAttrs - @transient - override lazy val references: AttributeSet = AttributeSet(udfs.flatMap(_.references)) + override def producedAttributes: AttributeSet = AttributeSet(resultAttrs) private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = { udf.children match {