Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {

final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_EXPRESSION) ++ nodePatternsInternal

override lazy val deterministic: Boolean = children.forall(_.deterministic) &&
plan.deterministic

// Subclasses can override this function to provide more TreePatterns.
def nodePatternsInternal(): Seq[TreePattern] = Seq()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object InlineCTE extends Rule[LogicalPlan] {
// 1) It is fine to inline a CTE if it references another CTE that is non-deterministic;
// 2) Any `CTERelationRef` that contains `OuterReference` would have been inlined first.
refCount == 1 ||
cteDef.child.find(_.expressions.exists(!_.deterministic)).isEmpty ||
cteDef.deterministic ||
cteDef.child.find(_.expressions.exists(_.isInstanceOf[OuterReference])).isDefined
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes
}

/**
* Returns true when the all the expressions in the current node as well as all of its children
* are deterministic
*/
lazy val deterministic: Boolean = expressions.forall(_.deterministic) &&
Copy link
Member

@HyukjinKwon HyukjinKwon Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: should we mark all non-deterministic plans as so? e.g. Sample?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait .. why is this in query plan? What about physical plans vs logical plans? should both be marked?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move this to logical plan only since it doesn't make sense physical plans have different determinism.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

physical plan can override this lazy val if it has custom logic, right?

Copy link
Member

@HyukjinKwon HyukjinKwon Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can physical plan have a different determinism to ones in logical plan?

e.g., Sample is non-deterministic. I think physical plans of Sample should always be non-deterministic. Otherwise, the output will be inconsistent for which physical plan is used. The opposite case is the same too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, if we override this lazy val in a logical plan, we should do it in the corresponding physical plan as well.

Moving this to logical plan is also OK, if we don't need it in physical plan at all. cc @maryannxue

Copy link
Member

@HyukjinKwon HyukjinKwon Dec 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if we optimize something, that should always happen in optimizer with logical plans ... right?

If we can do something with physical plans, we will have to add another argument for every non deterministic plan e.g.)

case class Sample(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
+   deterministic: Boolean,
    child: LogicalPlan) extends UnaryNode {
case class SampleExec(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
+   deterministic: Boolean,
    child: SparkPlan) extends UnaryExecNode with CodegenSupport {

which is pretty much different from how we do in Expression.

Otherwise, we will have to recalculate it for each plan, etc.

children.forall(_.deterministic)

/**
* Attributes that are referenced by expressions but not provided by this node's children.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression, Rand}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
Expand Down Expand Up @@ -101,4 +101,32 @@ class QueryPlanSuite extends SparkFunSuite {
val plan = t.select($"a", $"b").select($"a", $"b").select($"a", $"b").analyze
assert(testRule(plan).resolved)
}

test("SPARK-37199: add a deterministic field to QueryPlan") {
val a: NamedExpression = AttributeReference("a", IntegerType)()
val aRand: NamedExpression = Alias(a + Rand(1), "aRand")()
val deterministicPlan = Project(
Seq(a),
Filter(
ListQuery(Project(
Seq(a),
UnresolvedRelation(TableIdentifier("t", None))
)),
UnresolvedRelation(TableIdentifier("t", None))
)
)
assert(deterministicPlan.deterministic)

val nonDeterministicPlan = Project(
Seq(aRand),
Filter(
ListQuery(Project(
Seq(a),
UnresolvedRelation(TableIdentifier("t", None))
)),
UnresolvedRelation(TableIdentifier("t", None))
)
)
assert(!nonDeterministicPlan.deterministic)
}
}
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1945,4 +1945,15 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
correctAnswer)
}
}

test("SPARK-37199: deterministic in QueryPlan considers subquery") {
val deterministicQueryPlan = sql("select (select 1 as b) as b")
.queryExecution.executedPlan
assert(deterministicQueryPlan.deterministic)

val nonDeterministicQueryPlan = sql("select (select rand(1) as b) as b")
.queryExecution.executedPlan
assert(!nonDeterministicQueryPlan.deterministic)
}

}