From 6c1417a9efed6926bf5c94ac87855486815ea0db Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Thu, 2 Jan 2020 16:04:27 +0800
Subject: [PATCH 1/4] Improve handling of non-deterministic filters

---
 .../sql/catalyst/planning/patterns.scala      | 21 +++++++++++++--------
 .../datasources/FileSourceStrategy.scala      |  1 +
 .../spark/sql/sources/PrunedScanSuite.scala   |  4 ++--
 3 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 4e790b1dd3f36..e7ea8dc40ff2f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.planning
 
-import scala.collection.mutable
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
@@ -142,14 +140,21 @@ object ScanOperation extends OperationHelper with PredicateHelper {
       case Filter(condition, child) =>
         collectProjectsAndFilters(child) match {
           case Some((fields, filters, other, aliases)) =>
-            // Follow CombineFilters and only keep going if the collected Filters
-            // are all deterministic and this filter doesn't have common non-deterministic
+            // Follow CombineFilters and only keep going if 1) the collected Filters
+            // and this filter are all deterministic, or 2) this filter is non-deterministic
+            // but is the only collected filter and has no common non-deterministic
             // expressions with lower Project.
-            if (filters.forall(_.deterministic) &&
-                !hasCommonNonDeterministic(Seq(condition), aliases)) {
+            if (filters.nonEmpty && filters.forall(_.deterministic)) {
+              val substitutedCondition = substitute(aliases)(condition)
+              if (substitutedCondition.deterministic) {
+                Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
+                  other, aliases))
+              } else {
+                None
+              }
+            } else if (filters.isEmpty && !hasCommonNonDeterministic(Seq(condition), aliases)) {
               val substitutedCondition = substitute(aliases)(condition)
-              Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
-                other, aliases))
+              Some((fields, splitConjunctivePredicates(substitutedCondition), other, aliases))
             } else {
               None
             }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8385db9f78653..6e316ca915584 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -148,6 +148,7 @@ object FileSourceStrategy extends Strategy with Logging {
     val filterSet = ExpressionSet(filters)
 
     val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, l.output)
+      .filter(_.deterministic)
 
     val partitionColumns =
       l.resolve(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index 237717a3ad196..f242f75f39f20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -116,8 +116,8 @@ class PrunedScanSuite extends DataSourceTest with SharedSparkSession {
testPruning("SELECT a FROM oneToTenPruned", "a") testPruning("SELECT b FROM oneToTenPruned", "b") testPruning("SELECT a, rand() FROM oneToTenPruned WHERE a > 5", "a") - testPruning("SELECT a FROM oneToTenPruned WHERE rand() > 5", "a") - testPruning("SELECT a, rand() FROM oneToTenPruned WHERE rand() > 5", "a") + testPruning("SELECT a FROM oneToTenPruned WHERE rand() > 0.5", "a") + testPruning("SELECT a, rand() FROM oneToTenPruned WHERE rand() > 0.5", "a") testPruning("SELECT a, rand() FROM oneToTenPruned WHERE b > 5", "a", "b") def testPruning(sqlString: String, expectedColumns: String*): Unit = { From 703142054f572b6cdf9d51baf22fffbe5d416e16 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Thu, 2 Jan 2020 20:35:23 +0800 Subject: [PATCH 2/4] address comment --- .../sql/catalyst/planning/patterns.scala | 23 ++++++++----------- .../datasources/FileSourceStrategy.scala | 4 ++-- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index e7ea8dc40ff2f..da39b4a5e870e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -141,20 +141,15 @@ object ScanOperation extends OperationHelper with PredicateHelper { collectProjectsAndFilters(child) match { case Some((fields, filters, other, aliases)) => // Follow CombineFilters and only keep going if 1) the collected Filters - // and this filter are all deterministic or 2) if this filter is non-deterministic, - // but it's the only one who doesn't have common non-deterministic - // expressions with lower Project. - if (filters.nonEmpty && filters.forall(_.deterministic)) { - val substitutedCondition = substitute(aliases)(condition) - if (substitutedCondition.deterministic) { - Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition), - other, aliases)) - } else { - None - } - } else if (filters.isEmpty && !hasCommonNonDeterministic(Seq(condition), aliases)) { - val substitutedCondition = substitute(aliases)(condition) - Some((fields, splitConjunctivePredicates(substitutedCondition), other, aliases)) + // and this filter are all deterministic or 2) if this filter is the first + // collected filter which is non-deterministic and doesn't have common + // non-deterministic expressions with lower Project. 
+            val substitutedCondition = substitute(aliases)(condition)
+            val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
+              substitutedCondition.deterministic) || filters.isEmpty
+            if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
+              Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
+                other, aliases))
             } else {
               None
             }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 6e316ca915584..f45495121a980 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -147,8 +147,8 @@ object FileSourceStrategy extends Strategy with Logging {
     //  - filters that need to be evaluated again after the scan
     val filterSet = ExpressionSet(filters)
 
-    val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, l.output)
-      .filter(_.deterministic)
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(_.deterministic), l.output)
 
     val partitionColumns =
       l.resolve(

From 0df1c9ca9d7d643f00cde33d08e374b3744d0d10 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 3 Jan 2020 15:45:31 +0800
Subject: [PATCH 3/4] address comment

---
 .../spark/sql/catalyst/planning/patterns.scala | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index da39b4a5e870e..c5b7ce98d9495 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -33,7 +33,9 @@ trait OperationHelper {
   })
 
   protected def substitute(aliases: AttributeMap[Expression])(expr: Expression): Expression = {
-    expr.transform {
+    // Use transformUp instead of transformDown to avoid a dead loop
+    // in case there's an Alias that recursively aliases itself.
+    expr.transformUp {
       case a @ Alias(ref: AttributeReference, name) =>
         aliases.get(ref)
           .map(Alias(_, name)(a.exprId, a.qualifier))
@@ -142,11 +144,11 @@ object ScanOperation extends OperationHelper with PredicateHelper {
           case Some((fields, filters, other, aliases)) =>
             // Follow CombineFilters and only keep going if 1) the collected Filters
             // and this filter are all deterministic, or 2) this filter is the first
-            // collected filter, is non-deterministic, and doesn't have common
-            // non-deterministic expressions with lower Project.
-            val substitutedCondition = substitute(aliases)(condition)
-            val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
-              substitutedCondition.deterministic) || filters.isEmpty
+            // collected filter and doesn't have common non-deterministic expressions
+            // with lower Project.
+            lazy val substitutedCondition = substitute(aliases)(condition)
+            val canCombineFilters = filters.isEmpty || (filters.nonEmpty &&
+              filters.forall(_.deterministic) && substitutedCondition.deterministic)
             if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
               Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
                 other, aliases))

From 5fec1582b5329b163e210e1482104d17299e5743 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 3 Jan 2020 16:00:29 +0800
Subject: [PATCH 4/4] address comment

---
 .../org/apache/spark/sql/catalyst/planning/patterns.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index c5b7ce98d9495..415ce46788119 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -34,7 +34,7 @@ trait OperationHelper {
 
   protected def substitute(aliases: AttributeMap[Expression])(expr: Expression): Expression = {
     // Use transformUp instead of transformDown to avoid a dead loop
-    // in case there's an Alias that recursively aliases itself.
+    // in case there's an Alias whose exprId is the same as its child attribute's.
     expr.transformUp {
       case a @ Alias(ref: AttributeReference, name) =>
         aliases.get(ref)
@@ -146,9 +146,9 @@ object ScanOperation extends OperationHelper with PredicateHelper {
             // and this filter are all deterministic, or 2) this filter is the first
            // collected filter and doesn't have common non-deterministic expressions
             // with lower Project.
-            lazy val substitutedCondition = substitute(aliases)(condition)
-            val canCombineFilters = filters.isEmpty || (filters.nonEmpty &&
-              filters.forall(_.deterministic) && substitutedCondition.deterministic)
+            val substitutedCondition = substitute(aliases)(condition)
+            val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
+              substitutedCondition.deterministic) || filters.isEmpty
             if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
               Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
                 other, aliases))
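
A minimal end-to-end sketch of the behavior this series protects, not part of the
patches themselves; the object name, local-mode session, and toy data are
illustrative assumptions:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.rand

    object NonDeterministicFilterDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("non-deterministic-filter-demo")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val df = spark.range(1, 11).toDF("a")

        // Deterministic predicate: eligible for pushdown, and ScanOperation may
        // combine it with other collected deterministic filters.
        df.filter($"a" > 5).select($"a").explain(true)

        // Non-deterministic predicate: rand() must be evaluated exactly once per
        // row, so FileSourceStrategy keeps it out of the pushdown candidates and
        // it stays in a Filter above the scan, while column pruning (reading
        // only `a`) still applies.
        df.filter(rand() > 0.5).select($"a").explain(true)

        spark.stop()
      }
    }

Comparing the two explain() outputs should show the deterministic predicate pushed
toward the file scan while the rand() predicate remains in a Filter above it, which
mirrors what the updated PrunedScanSuite cases assert about column pruning in the
presence of non-deterministic expressions.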