diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 9624bf1fa9f6..c61fd9ce10f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -51,8 +51,10 @@ class SparkOptimizer(
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
       PartitionPruning,
-      RowLevelOperationRuntimeGroupFiltering,
-      OptimizeSubqueries) :+
+      // We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
+      // twice which may break some optimizer rules that can only be applied once. The rule below
+      // only invokes `OptimizeSubqueries` to optimize newly added subqueries.
+      new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
     Batch("MergeScalarSubqueries", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 21bc55110fe8..9a780c11eefa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 case class PlanAdaptiveDynamicPruningFilters(
     rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+    if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index df5e3ea13652..c9ff28eb0459 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
   }

   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+    if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index bb5edc94fa50..f2b513e630b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
  *
  * Note this rule only applies to group-based row-level operations.
  */
-object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
+class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {

   import DataSourceV2Implicits._

@@ -64,7 +65,8 @@ object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with Pre
       Filter(dynamicPruningCond, r)
     }

-    replaceData.copy(query = newQuery)
+    // optimize subqueries to rewrite them as joins and trigger job planning
+    replaceData.copy(query = optimizeSubqueries(newQuery))
   }

   private def buildMatchingRowsPlan(
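Below is a minimal, self-contained Scala sketch of the pattern this patch adopts. It does not use Spark's real classes; every name in it is hypothetical, standing in for LogicalPlan, Rule[LogicalPlan], OptimizeSubqueries, and RowLevelOperationRuntimeGroupFiltering. The point it illustrates: rather than running a once-only rule over the whole plan in the same batch (where it could see the same subquery twice), the batch rule receives it as a constructor argument and invokes it only on the subtree it has just created.

// Toy plan nodes and a rule trait standing in for LogicalPlan / Rule[LogicalPlan].
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(cond: String, child: Plan) extends Plan
case class Subquery(child: Plan, optimized: Boolean = false) extends Plan

trait Rule { def apply(plan: Plan): Plan }

// Stand-in for OptimizeSubqueries: safe to apply to a given subquery only once.
object OptimizeSubqueriesOnce extends Rule {
  def apply(plan: Plan): Plan = plan match {
    case Subquery(child, alreadyOptimized) =>
      // Re-running over an already-optimized subquery would break, which is
      // why the batch must not invoke this rule a second time.
      require(!alreadyOptimized, "OptimizeSubqueries applied twice")
      Subquery(apply(child), optimized = true)
    case Filter(cond, child) => Filter(cond, apply(child))
    case leaf: Scan => leaf
  }
}

// Stand-in for the class-based RowLevelOperationRuntimeGroupFiltering: it adds
// a new subquery and optimizes just that subtree via the injected rule, leaving
// pre-existing (already optimized) subqueries untouched.
class GroupFiltering(optimizeSubqueries: Rule) extends Rule {
  def apply(plan: Plan): Plan = plan match {
    case scan: Scan =>
      val newSubquery = Subquery(scan) // newly created, not yet optimized
      Filter("dynamicpruning#groupFilter", optimizeSubqueries(newSubquery))
    case other => other
  }
}

object Demo extends App {
  val rule = new GroupFiltering(OptimizeSubqueriesOnce)
  // prints: Filter(dynamicpruning#groupFilter,Subquery(Scan(t),true))
  println(rule(Scan("t")))
}

The constructor injection mirrors `new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)` in the SparkOptimizer hunk above: the outer rule decides exactly which subtrees the once-only rule sees, so dropping `OptimizeSubqueries` from the batch does not leave the newly added subqueries unoptimized.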