Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
// "Extract PythonUDF From JoinCondition".
Batch("Check Cartesian Products", Once,
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
Batch("Rewrite Subquery", Once,
Copy link
Member

@gatorsmile gatorsmile Oct 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not have a good answer for this PR. Ideally, we should run the whole batch operatorOptimizationBatch. However, running the whole batch could be very time consuming. I would suggest to add a new parameter for introducing the time bound limit for each batch.

cc @maryannxue WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Do you think its a good time to revisit Natt's PR to convert subquery expressions to Joins early in the optimization process ? Perhaps then we can take advantage of all the subsequent rules firing after the subquery rewrite ?

Copy link
Contributor

@maryannxue maryannxue Oct 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile I think @dilipbiswal's suggestion is the right way to go. If you think of this subquery rewriting as another kind of de-correlation, it should be a pre-optimization rule.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. That sounds also good to me. @dilipbiswal Could you take the PR #17520 over?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Sure Sean.. Let me give it a try.

RewritePredicateSubquery,
ColumnPruning,
InferFiltersFromConstraints,
PushDownPredicate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, cc @gatorsmile @maryannxue

CollapseProject,
RemoveRedundantProject) :+
Batch("UpdateAttributeReferences", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.ListQuery
import org.apache.spark.sql.catalyst.expressions.{IsNotNull, ListQuery}
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.internal.SQLConf


class RewriteSubquerySuite extends PlanTest {
Expand All @@ -33,23 +34,44 @@ class RewriteSubquerySuite extends PlanTest {
Batch("Rewrite Subquery", FixedPoint(1),
RewritePredicateSubquery,
ColumnPruning,
InferFiltersFromConstraints,
PushDownPredicate,
CollapseProject,
RemoveRedundantProject) :: Nil
}

test("Column pruning after rewriting predicate subquery") {
val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)
withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to modify this existing test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, spark.sql.constraintPropagation.enabled=false to test ColumnPruning.
spark.sql.constraintPropagation.enabled=true to test ColumnPruning, InferFiltersFromConstraints and PushDownPredicate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Thanks.

val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)

val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
.analyze
val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.select('a)
.join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
.analyze

comparePlans(optimized, correctAnswer)
comparePlans(optimized, correctAnswer)
}
}

test("Infer filters and push down predicate after rewriting predicate subquery") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need the column pruning in the test title?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making the test title simple, then leaving comments about what's tested clearly here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about refactor these test to:

  val relation = LocalRelation('a.int, 'b.int)
  val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)

  test("Column pruning") {
    withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {
      val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

      val optimized = Optimize.execute(query.analyze)
      val correctAnswer = relation
        .select('a)
        .join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
        .analyze

      comparePlans(optimized, correctAnswer)
    }
  }

  test("Column pruning, infer filters and push down predicate") {
    withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
      val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

      val optimized = Optimize.execute(query.analyze)
      val correctAnswer = relation
        .where(IsNotNull('a)).select('a)
        .join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))
        .analyze

      comparePlans(optimized, correctAnswer)
    }
  }

withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
val relation = LocalRelation('a.int, 'b.int)
val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)

val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = relation
.where(IsNotNull('a)).select('a)
.join(relInSubquery.where(IsNotNull('x)).select('x), LeftSemi, Some('a === 'x))
.analyze

comparePlans(optimized, correctAnswer)
}
}

}