@@ -51,8 +51,10 @@ class SparkOptimizer(
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("PartitionPruning", Once,
PartitionPruning,
RowLevelOperationRuntimeGroupFiltering,
OptimizeSubqueries) :+
// We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries
// twice which may break some optimizer rules that can only be applied once. The rule below
// only invokes `OptimizeSubqueries` to optimize newly added subqueries.
Comment on lines +54 to +56

Member:

Hm? This batch has only PartitionPruning and RowLevelOperationRuntimeGroupFiltering. Which are the "some optimizer rules"? PartitionPruning?

Member:

Oh, you mean other Once batches in SparkOptimizer.defaultBatches?

Member:

But in Optimizer, where OptimizeSubqueries also runs, there are other Once batches too, and that seems fine?

Contributor (Author):

All the optimizer batches are optimizing the same query plan. If OptimizeSubqueries appears twice, the subqueries are optimized twice.

Note that most optimizer rules don't optimize subqueries; they rely on OptimizeSubqueries to invoke the entire optimizer on the subqueries recursively.
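A minimal, runnable sketch of that recursion, using a toy plan and rule type rather than Spark's Catalyst classes (all names here are illustrative):

```scala
object OptimizeSubqueriesSketch {
  // Toy plan node: a name, nested subquery plans, and a counter recording
  // how many times the once-only rule has touched this node.
  case class Plan(name: String, subqueries: Seq[Plan] = Nil, passes: Int = 0)

  type Rule = Plan => Plan

  // A rule that must run exactly once per plan; a second application
  // would be incorrect (modeled here as passes > 1).
  val onceOnlyRule: Rule = p => p.copy(passes = p.passes + 1)

  // Recursively invokes the full rule list on every subquery, mirroring
  // how a top-level "optimize subqueries" rule re-enters the optimizer.
  def optimizeSubqueries(rules: Seq[Rule])(plan: Plan): Plan =
    plan.copy(subqueries = plan.subqueries.map { sub =>
      rules.foldLeft(optimizeSubqueries(rules)(sub))((p, r) => r(p))
    })

  def main(args: Array[String]): Unit = {
    val rules = Seq(onceOnlyRule)
    val plan = Plan("main", Seq(Plan("subquery")))

    // Scheduled in one batch: each subquery sees the rule once.
    println(optimizeSubqueries(rules)(plan).subqueries.map(_.passes)) // List(1)

    // Scheduled in two batches: each subquery sees the rule twice,
    // which is exactly the hazard the new code comment calls out.
    val twice = optimizeSubqueries(rules)(optimizeSubqueries(rules)(plan))
    println(twice.subqueries.map(_.passes)) // List(2)
  }
}
```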

Contributor (Author):

Inspired by #38619, maybe we don't need to invoke the entire optimizer here, just a few rules to optimize this subquery.

Member:

> All the optimizer batches are optimizing the same query plan. If OptimizeSubqueries appears twice, the subqueries are optimized twice.

Oh, got it. You actually mean OptimizeSubqueries is applied twice (here and in Optimizer). I thought that running OptimizeSubqueries here would itself break some rules that cannot run twice.

+      new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter) :+
     Batch("MergeScalarSubqueries", Once,
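The batch wiring above reflects a simple design choice: rather than scheduling OptimizeSubqueries as a sibling rule, it is passed into the group-filtering rule, which applies it only to the plan fragment it just produced. A hedged sketch of that constructor-injection pattern (toy types, not Spark's Rule API):

```scala
object RuleInjectionSketch {
  type Plan = String
  type Rule = Plan => Plan

  // Hypothetical stand-in for OptimizeSubqueries.
  val optimizeSubqueries: Rule = plan => s"optimized($plan)"

  // The outer rule receives the inner rule via its constructor and
  // invokes it only on the fragment it introduces, so the rest of the
  // tree is never re-optimized.
  class GroupFilteringRule(inner: Rule) extends Rule {
    def apply(plan: Plan): Plan = {
      val newFragment = s"groupFilter($plan)" // the rewritten query
      inner(newFragment)                      // optimize only this fragment
    }
  }

  def main(args: Array[String]): Unit = {
    val rule = new GroupFilteringRule(optimizeSubqueries)
    println(rule("scan")) // optimized(groupFilter(scan))
  }
}
```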
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 case class PlanAdaptiveDynamicPruningFilters(
     rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+    if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }

@@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
   }

   override def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) {
+    if (!conf.dynamicPartitionPruningEnabled) {
       return plan
     }

@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
  *
  * Note this rule only applies to group-based row-level operations.
  */
-object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
+class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {

   import DataSourceV2Implicits._

@@ -64,7 +65,8 @@ object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with Pre
       Filter(dynamicPruningCond, r)
     }

-    replaceData.copy(query = newQuery)
+    // optimize subqueries to rewrite them as joins and trigger job planning
+    replaceData.copy(query = optimizeSubqueries(newQuery))
   }

   private def buildMatchingRowsPlan(
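For background on why the rewritten query matters: in a group-based row-level operation, an entire file group must be rewritten if any of its rows match the condition, so restricting the scan to groups that actually contain matching rows avoids rewriting untouched groups. A toy sketch of that idea using plain Scala collections (not Spark's API; names are illustrative):

```scala
object GroupFilteringSketch {
  // A row tagged with the file group it lives in.
  case class Row(group: String, id: Int)

  def main(args: Array[String]): Unit = {
    val table = Seq(Row("g1", 1), Row("g1", 2), Row("g2", 3), Row("g3", 4))
    val deleteCond: Row => Boolean = _.id == 3

    // The "subquery": which groups contain at least one matching row?
    val matchingGroups = table.filter(deleteCond).map(_.group).toSet

    // Runtime group filter: only rows from matching groups are fed into
    // the rewrite; g1 and g3 are carried over untouched.
    val rewritten = table.filter(r => matchingGroups(r.group)).filterNot(deleteCond)
    val untouched = table.filterNot(r => matchingGroups(r.group))

    println(matchingGroups)         // Set(g2)
    println(untouched ++ rewritten) // rows 1, 2, 4 survive; only g2 was rewritten
  }
}
```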