@@ -185,9 +185,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
  RemoveLiteralFromGroupExpressions,
  RemoveRepetitionFromGroupExpressions) :: Nil ++
  operatorOptimizationBatch) :+
- // This batch rewrites data source plans and should be run after the operator
- // optimization batch and before any batches that depend on stats.
- Batch("Data Source Rewrite Rules", Once, dataSourceRewriteRules: _*) :+
+ // This batch rewrites plans after the operator optimization and
+ // before any batches that depend on stats.
+ Batch("Pre CBO Rules", Once, preCBORules: _*) :+
  // This batch pushes filters and projections into scan nodes. Before this batch, the logical
  // plan may contain nodes that do not report stats. Anything that uses stats must run after
  // this batch.
@@ -293,10 +293,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
  def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

  /**
-  * Override to provide additional rules for rewriting data source plans. Such rules will be
-  * applied after operator optimization rules and before any rules that depend on stats.
+  * Override to provide additional rules for rewriting plans after operator optimization rules and
+  * before any cost-based optimization rules that depend on stats.
   */
- def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil
+ def preCBORules: Seq[Rule[LogicalPlan]] = Nil

  /**
   * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
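For orientation, a rule supplied through this hook is just an ordinary `Rule[LogicalPlan]`. A minimal sketch, not part of this PR (the rule name and its no-op body are purely illustrative):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: if supplied through preCBORules, it runs once in the
// "Pre CBO Rules" batch, i.e. after the operator optimization batch and before
// any stats-dependent (cost-based) batches.
object MyPreCBORule extends Rule[LogicalPlan] {
  // A real rule would rewrite the plan here; this placeholder returns it unchanged.
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
```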
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
  * <li>Analyzer Rules.</li>
  * <li>Check Analysis Rules.</li>
  * <li>Optimizer Rules.</li>
- * <li>Data Source Rewrite Rules.</li>
+ * <li>Pre CBO Rules.</li>
  * <li>Planning Strategies.</li>
  * <li>Customized Parser.</li>
  * <li>(External) Catalog listeners.</li>
@@ -200,19 +200,19 @@ class SparkSessionExtensions {
  optimizerRules += builder
  }

- private[this] val dataSourceRewriteRules = mutable.Buffer.empty[RuleBuilder]
+ private[this] val preCBORules = mutable.Buffer.empty[RuleBuilder]

- private[sql] def buildDataSourceRewriteRules(session: SparkSession): Seq[Rule[LogicalPlan]] = {
-   dataSourceRewriteRules.map(_.apply(session)).toSeq
+ private[sql] def buildPreCBORules(session: SparkSession): Seq[Rule[LogicalPlan]] = {
+   preCBORules.map(_.apply(session)).toSeq
  }

  /**
-  * Inject an optimizer `Rule` builder that rewrites data source plans into the [[SparkSession]].
-  * The injected rules will be executed after the operator optimization batch and before rules
-  * that depend on stats.
+  * Inject an optimizer `Rule` builder that rewrites logical plans into the [[SparkSession]].
+  * The injected rules will be executed once after the operator optimization batch and
+  * before any cost-based optimization rules that depend on stats.
   */
- def injectDataSourceRewriteRule(builder: RuleBuilder): Unit = {
-   dataSourceRewriteRules += builder
+ def injectPreCBORule(builder: RuleBuilder): Unit = {
+   preCBORules += builder
  }

  private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder]

Inline review comment from the PR author, attached to the "executed once" line above: "I added that the rule will be executed once to the docs."
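A minimal usage sketch of the renamed injection point, reusing the hypothetical `MyPreCBORule` from the earlier sketch; the builder settings here are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

// RuleBuilder is SparkSession => Rule[LogicalPlan], so the builder receives the session
// and may return a session-aware rule; this one just returns the illustrative object.
val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(extensions => extensions.injectPreCBORule(_ => MyPreCBORule))
  .getOrCreate()
```

As with the other `SparkSessionExtensions` hooks, the same extension function can instead be registered through the `spark.sql.extensions` configuration when packaged as a class implementing `Function1[SparkSessionExtensions, Unit]`.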
@@ -231,8 +231,8 @@ abstract class BaseSessionStateBuilder(
  override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
    super.earlyScanPushDownRules ++ customEarlyScanPushDownRules

- override def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] =
-   super.dataSourceRewriteRules ++ customDataSourceRewriteRules
+ override def preCBORules: Seq[Rule[LogicalPlan]] =
+   super.preCBORules ++ customPreCBORules

  override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
@@ -258,13 +258,13 @@ abstract class BaseSessionStateBuilder(
  protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

  /**
-  * Custom rules for rewriting data source plans to add to the Optimizer. Prefer overriding
-  * this instead of creating your own Optimizer.
+  * Custom rules for rewriting plans after operator optimization and before CBO.
+  * Prefer overriding this instead of creating your own Optimizer.
   *
   * Note that this may NOT depend on the `optimizer` function.
   */
- protected def customDataSourceRewriteRules: Seq[Rule[LogicalPlan]] = {
-   extensions.buildDataSourceRewriteRules(session)
+ protected def customPreCBORules: Seq[Rule[LogicalPlan]] = {
+   extensions.buildPreCBORules(session)
  }

  /**
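For builders that customize the session state directly instead of going through `SparkSessionExtensions`, a heavily hedged sketch of overriding the renamed hook. `MySessionStateBuilder` is hypothetical, and `BaseSessionStateBuilder`/`SessionState` are internal, unstable APIs, so this assumes the subclass lives under Spark's own `org.apache.spark.sql` package, as the built-in builders do:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}

// Hypothetical builder: keeps any rules injected via SparkSessionExtensions and
// appends the illustrative MyPreCBORule from the earlier sketch.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  override protected def customPreCBORules: Seq[Rule[LogicalPlan]] =
    super.customPreCBORules :+ MyPreCBORule

  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
```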
@@ -88,9 +88,9 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
    }
  }

- test("SPARK-33621: inject data source rewrite rule") {
-   withSession(Seq(_.injectDataSourceRewriteRule(MyRule))) { session =>
-     assert(session.sessionState.optimizer.dataSourceRewriteRules.contains(MyRule(session)))
+ test("SPARK-33621: inject a pre CBO rule") {
+   withSession(Seq(_.injectPreCBORule(MyRule))) { session =>
+     assert(session.sessionState.optimizer.preCBORules.contains(MyRule(session)))
    }
  }
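The `MyRule` helper referenced in the test above is defined elsewhere in `SparkSessionExtensionSuite` and is not shown in this diff; a plausible minimal shape, reproduced here only for context:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Not part of this diff: a session-aware rule of the shape the test expects, i.e.
// constructible as MyRule(session) and usable as a RuleBuilder via its companion apply.
case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {
  // The test only checks registration, so a pass-through body is enough here.
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
```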