Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
@transient
lazy val outputSet: AttributeSet = AttributeSet(output)

/**
* All Attributes that appear in expressions from this operator. Note that this set does not
* include attributes that are implicitly referenced by being passed through to the output tuple.
*/
@transient
lazy val references: AttributeSet =
AttributeSet.fromAttributeSets(expressions.map(_.references))

/**
* The set of all attributes that are input to this operator by its children.
*/
Expand All @@ -59,12 +51,19 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
*/
def producedAttributes: AttributeSet = AttributeSet.empty

/**
* All Attributes that appear in expressions from this operator. Note that this set does not
* include attributes that are implicitly referenced by being passed through to the output tuple.
*/
@transient
lazy val references: AttributeSet = {
AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes
}

/**
* Attributes that are referenced by expressions but not provided by this node's children.
* Subclasses should override this method if they produce attributes internally as it is used by
* assertions designed to prevent the construction of invalid plans.
*/
def missingInput: AttributeSet = references -- inputSet -- producedAttributes
final def missingInput: AttributeSet = references -- inputSet

/**
* Runs [[transformExpressionsDown]] with `rule` on all expressions present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,7 @@ case class View(
output: Seq[Attribute],
child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {

@transient
override lazy val references: AttributeSet = AttributeSet.empty
override def producedAttributes: AttributeSet = outputSet

override lazy val resolved: Boolean = child.resolved

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ trait BaseEvalPython extends UnaryNode {

override def output: Seq[Attribute] = child.output ++ resultAttrs

@transient
override lazy val references: AttributeSet = AttributeSet(udfs.flatMap(_.references))
override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ case class GenerateExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

override def producedAttributes: AttributeSet = AttributeSet(output)
override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

override def outputPartitioning: Partitioning = child.outputPartitioning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ case class InMemoryRelation(
cacheBuilder,
outputOrdering)

override def producedAttributes: AttributeSet = outputSet

@transient val partitionStatistics = new PartitionStatistics(output)

def cachedPlan: SparkPlan = cacheBuilder.cachedPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,6 @@ case class FlatMapGroupsInRExec(

override def outputPartitioning: Partitioning = child.outputPartitioning

override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't outputObjAttr produced by FlatMapGroupsInRExec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a no-op override, it's the same as ObjectProducerExec.producedAttributes


override def requiredChildDistribution: Seq[Distribution] =
if (groupingAttributes.isEmpty) {
AllTuples :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute],

override def output: Seq[Attribute] = child.output ++ resultAttrs

@transient
override lazy val references: AttributeSet = AttributeSet(udfs.flatMap(_.references))
override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)

private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
udf.children match {
Expand Down