From 1044713bb1ee6915f55335ecaa74f16d4087775e Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 17 Nov 2022 15:04:42 +0800
Subject: [PATCH 1/2] respect the original Filter operator order

---
 .../execution/datasources/v2/V2ScanRelationPushDown.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 24ffe4b887d9..246bc65fcadf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -350,7 +350,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     val normalizedProjects = DataSourceStrategy
       .normalizeExprs(project, sHolder.output)
       .asInstanceOf[Seq[NamedExpression]]
-    val allFilters = filtersStayUp ++ filtersPushDown.reduceOption(And)
+    val allFilters = filtersPushDown.reduceOption(And).toSeq ++ filtersStayUp
     val normalizedFilters = DataSourceStrategy.normalizeExprs(allFilters, sHolder.output)
     val (scan, output) = PushDownUtils.pruneColumns(
       sHolder.builder, sHolder.relation, normalizedProjects, normalizedFilters)
@@ -371,7 +371,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     }
 
     val finalFilters = normalizedFilters.map(projectionFunc)
-    val withFilter = finalFilters.foldRight[LogicalPlan](scanRelation)((cond, plan) => {
+    // bottom-most filters are put in the left of the list.
+    val withFilter = finalFilters.foldLeft[LogicalPlan](scanRelation)((cond, plan) => {
       Filter(cond, plan)
     })
 

From 22ef6e34168f0c9c8107ff2c1f4e75fd68f11eae Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 17 Nov 2022 16:28:53 +0800
Subject: [PATCH 2/2] Update
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

---
 .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 246bc65fcadf..87b11da5d5c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -372,7 +372,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
     val finalFilters = normalizedFilters.map(projectionFunc)
     // bottom-most filters are put in the left of the list.
-    val withFilter = finalFilters.foldLeft[LogicalPlan](scanRelation)((cond, plan) => {
+    val withFilter = finalFilters.foldLeft[LogicalPlan](scanRelation)((plan, cond) => {
       Filter(cond, plan)
     })