Skip to content

Commit 8c69303

Browse files
committed
Add inputSet, references to QueryPlan. Improve tree string with a prefix to denote invalid or unresolved nodes.
1 parent fbeab54 commit 8c69303

File tree

3 files changed

+25
-11
lines changed

3 files changed

+25
-11
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,25 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
3131
*/
3232
def outputSet: AttributeSet = AttributeSet(output)
3333

34+
/**
35+
* All Attributes that appear in expressions from this operator. Note that this set does not
36+
* include attributes that are implicitly referenced by being passed through to the output tuple.
37+
*/
38+
def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))
39+
40+
/**
41+
* The set of all attributes that are input to this operator by its children.
42+
*/
43+
def inputSet: AttributeSet =
44+
AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
45+
46+
/**
47+
* Attributes that are referenced by expressions but not provided by this nodes children.
48+
* Subclasses should override this method if they produce attributes internally as it is used by
49+
* assertions designed to prevent the construction of invalid plans.
50+
*/
51+
def missingInput: AttributeSet = references -- inputSet
52+
3453
/**
3554
* Runs [[transform]] with `rule` on all expressions present in this query operator.
3655
* Users should not expect a specific directionality. If a specific directionality is needed,
@@ -132,4 +151,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
132151

133152
/** Prints out the schema in the tree format */
134153
def printSchema(): Unit = println(schemaString)
154+
155+
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
156+
157+
override def simpleString = statePrefix + super.simpleString
135158
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
5353
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
5454
}
5555

56-
/**
57-
* Returns the set of attributes that this node takes as
58-
* input from its children.
59-
*/
60-
lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))
61-
6256
/**
6357
* Returns true if this expression and all its children have been resolved to a specific schema
6458
* and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
@@ -68,6 +62,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
6862
*/
6963
lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
7064

65+
override protected def statePrefix = if (!resolved) "'" else super.statePrefix
66+
7167
/**
7268
* Returns true if all its children of this query plan have been resolved.
7369
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,6 @@ case class Aggregate(
138138
child: LogicalPlan)
139139
extends UnaryNode {
140140

141-
/** The set of all AttributeReferences required for this aggregation. */
142-
def references =
143-
AttributeSet(
144-
groupingExpressions.flatMap(_.references) ++ aggregateExpressions.flatMap(_.references))
145-
146141
override def output = aggregateExpressions.map(_.toAttribute)
147142
}
148143

0 commit comments

Comments
 (0)