Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
Batch("Operator Optimization before Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) ::
Batch("Infer Filters", Once,
InferFiltersFromGenerate,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, I don't get the question.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. Looks okay as it is. Since operatorOptimizationRuleSet has InferFiltersFromConstraints, I thought we'd better to put InferFiltersFromConstraints/InferFiltersFromGenerate there together.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the InferFiltersFromConstraints is allways filtered out from the operatorOptimizationRuleSet:

val rulesWithoutInferFiltersFromConstraints =
operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
Batch("Operator Optimization before Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) ::

I think it is very misleading , perhaps we could remove it from there and avoid further confusion?

It was asked in the original PR, put looks like it was forgotten:
https://github.com/apache/spark/pull/19149/files#r157777935

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, but I also don't know why. cc: @cloud-fan

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite remember, maybe we can remove it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and removed it from the operatorOptimizationRuleSet. All the tests are still passing as was expected.

InferFiltersFromConstraints) ::
Batch("Operator Optimization after Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) ::
Expand Down Expand Up @@ -1847,3 +1848,25 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
empty(ll)
}
}

/**
* Generates filters for exploded expression, such that rows that would have been removed
* by this [[Generate]] can be removed earlier - before joins and in data sources.
*/
object InferFiltersFromGenerate extends Rule[LogicalPlan] {
Comment thread
tanelk marked this conversation as resolved.
Outdated
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case g @ Generate(e: ExplodeBase, _, false, _, _, child)
Comment thread
tanelk marked this conversation as resolved.
Outdated
if e.deterministic && !e.child.foldable =>

// Exclude child's constraints to guarantee idempotency
val inferredFilters = ExpressionSet(
Seq(GreaterThan(Size(e.child), Literal(0)), IsNotNull(e.child))
) -- child.constraints

if (inferredFilters.nonEmpty) {
g.copy(child = Filter(inferredFilters.reduce(And), child))
} else {
g
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.IntegerType

class InferFiltersFromGenerateSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("Infer Filters", Once, InferFiltersFromGenerate) :: Nil
}

val testRelation = LocalRelation('a.array(IntegerType))

Seq(
(e: Expression) => Explode(e),
(e: Expression) => PosExplode(e)
).foreach(f => {
val explode = f('a)
test("Infer filters from " + explode) {
val originalQuery = testRelation.generate(explode).analyze
val correctAnswer = testRelation
.where(IsNotNull('a) && Size('a) > 0)
Comment thread
tanelk marked this conversation as resolved.
.generate(explode)
.analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("Don't infer filters from outer " + explode) {
val originalQuery = testRelation.generate(explode, outer = true).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, originalQuery)
}

val foldableExplode = f(CreateArray(Seq(Literal(0), Literal(1))))
test("Don't infer filters from " + foldableExplode) {
val originalQuery = testRelation.generate(foldableExplode).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, originalQuery)
}
})
}