Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// implies that, for a given input row, the output are determined by the expression's initial
// state and all the input rows processed before. In another word, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
case filter @ Filter(condition, project @ Project(fields, grandChild))
// This also applies to Aggregate.
case Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>

// Create a map of Aliases to their values from the child projection.
Expand Down Expand Up @@ -792,7 +793,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

case filter @ Filter(condition, aggregate: Aggregate) =>
case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic) =>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this case above case filter @ Filter(condition, w: Window)?

Based on the comment you add above, it becomes easier to follow by the readers.

// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
Expand Down Expand Up @@ -848,7 +850,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

case filter @ Filter(condition, u: UnaryNode)
case filter @ Filter(_, u: UnaryNode)
if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownPredicate(filter, u.child) { predicate =>
u.withNewChildren(Seq(Filter(predicate, u.child)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,20 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter with nondeterministic condition through project") {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was wrong, actually we can push down nondeterministic filter through project, as long as the project list is all deterministic.

test("nondeterministic: can always push down filter through project with deterministic field") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 || 'a > 5)
.select('a)
.where(Rand(10) > 5 || 'a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
val correctAnswer = testRelation
.where(Rand(10) > 5 || 'a > 5)
.select('a)
.analyze

comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter through project with nondeterministic field") {
Expand All @@ -156,6 +161,34 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}

test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
val originalQuery = testRelation
.groupBy('a)('a, Rand(10).as('rand))
.where('a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
}

test("nondeterministic: push down part of filter through aggregate with deterministic field") {
val originalQuery = testRelation
.groupBy('a)('a)
.where('a > 5 && Rand(10) > 5)
.analyze

val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = testRelation
.where('a > 5)
.groupBy('a)('a)
.where(Rand(10) > 5)
.analyze

comparePlans(optimized, correctAnswer)
}

test("filters: combines filters") {
val originalQuery = testRelation
.select('a)
Expand Down