SparkSessionExtensions.scala

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.ColumnarRule
+import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

 /**
  * :: Experimental ::
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.ColumnarRule
  * <li>Customized Parser.</li>
  * <li>(External) Catalog listeners.</li>
  * <li>Columnar Rules.</li>
+ * <li>Adaptive Query Stage Preparation Rules.</li>
  * </ul>
  *
  * The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@@ -96,8 +97,10 @@ class SparkSessionExtensions {
   type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
   type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)
   type ColumnarRuleBuilder = SparkSession => ColumnarRule
+  type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
Member: Please update the document (at line 47) accordingly.

Member (Author): Thanks. I pushed an update for this.

Member: This is a public API. I think we also need to add version information.

Contributor: It is marked as unstable and experimental. Do we need to add version information in that case? I am fine with doing so, but then we should add it everywhere.

   private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
+  private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder]

   /**
    * Build the override rules for columnar execution.
@@ -106,13 +109,28 @@
     columnarRuleBuilders.map(_.apply(session)).toSeq
   }

+  /**
+   * Build the override rules for the query stage preparation phase of adaptive query execution.
+   */
+  private[sql] def buildQueryStagePrepRules(session: SparkSession): Seq[Rule[SparkPlan]] = {
+    queryStagePrepRuleBuilders.map(_.apply(session)).toSeq
+  }

   /**
    * Inject a rule that can override the columnar execution of an executor.
    */
   def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
     columnarRuleBuilders += builder
   }

+  /**
+   * Inject a rule that can override the the query stage preparation phase of adaptive query

Member: Nit: the the => the

+   * execution.
+   */
+  def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit = {
+    queryStagePrepRuleBuilders += builder
+  }

   private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
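For context, a minimal sketch of how an application could register one of these rules via `withExtensions` on the SparkSession builder (the `MyPrepRule` name and its no-op body are illustrative, not part of this change):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical no-op rule, used only to show the wiring.
case class MyPrepRule() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true") // prep rules only run under AQE
  .withExtensions(_.injectQueryStagePrepRule(_ => MyPrepRule()))
  .getOrCreate()
```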
AdaptiveSparkPlanExec.scala

@@ -90,7 +90,7 @@ case class AdaptiveSparkPlanExec(
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
@gatorsmile (Member, Aug 17, 2020): It sounds like this function should be moved out of the physical node AdaptiveSparkPlanExec? cc @maryannxue @cloud-fan

     ensureRequirements
-  )
+  ) ++ context.session.sessionState.queryStagePrepRules
Contributor: Should we do this after ensure requirements? The queryStagePrepRules might break requirements.

Contributor: The custom rules may need to see exchange nodes as well. We can do validation at the end to make sure exchange requirements are still satisfied.

   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
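To make the ordering discussion above concrete, here is a hedged sketch of a prep rule that only makes sense after EnsureRequirements has run, because it inspects the Exchange nodes that rule inserts (the `LogShufflesRule` name and its logging are hypothetical, not part of this PR):

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical rule: logs every shuffle inserted by EnsureRequirements,
// which is why it must be appended after that rule in queryStagePreparationRules.
case class LogShufflesRule() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    plan.foreach {
      case s: ShuffleExchangeExec =>
        logInfo(s"query stage prep saw a shuffle with partitioning ${s.outputPartitioning}")
      case _ => // leave all other nodes untouched
    }
    plan
  }
}
```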
BaseSessionStateBuilder.scala

@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlan, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.aggregate.ResolveEncodersInScalaAgg
 import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
 import org.apache.spark.sql.execution.command.CommandCheck
@@ -286,6 +286,10 @@ abstract class BaseSessionStateBuilder(
     extensions.buildColumnarRules(session)
   }

+  protected def queryStagePrepRules: Seq[Rule[SparkPlan]] = {
+    extensions.buildQueryStagePrepRules(session)
+  }

   /**
    * Create a query execution object.
    */
@@ -337,7 +341,8 @@
       () => resourceLoader,
       createQueryExecution,
       createClone,
-      columnarRules)
+      columnarRules,
+      queryStagePrepRules)
   }
 }
SessionState.scala

@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.streaming.StreamingQueryManager
@@ -73,7 +74,8 @@ private[sql] class SessionState(
     resourceLoaderBuilder: () => SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState,
-    val columnarRules: Seq[ColumnarRule]) {
+    val columnarRules: Seq[ColumnarRule],
+    val queryStagePrepRules: Seq[Rule[SparkPlan]]) {
Member: SessionState is a critical concept/class in Spark SQL. Adding queryStagePrepRules into SessionState looks weird to me. WDYT? cc @hvanhovell

@hvanhovell (Contributor, Aug 17, 2020): Preferably both columnarRules and queryStagePrepRules should be passed directly to the thing that is using them. This is not entirely easy because of how we deal with post-planning rules in QueryExecution. It would be great if someone could move those into the builder.

Contributor: It's very hard. The extension rules are injected in the BaseSessionStateBuilder, and we need to carry them in SessionState to propagate them further. The custom columnarRules and queryStagePrepRules are no different from custom analyzer/optimizer/planner rules.

Contributor: It is just refactoring, right (famous last words)? You pull the rules out of query execution, put them in the session state builder, and manipulate them there.

@cloud-fan (Contributor, Aug 17, 2020): Maybe I missed something. The session state builder is used to build SessionState. SparkSession or QueryExecution doesn't hold a session state builder instance anymore. Are you suggesting we create something like Analyzer, Optimizer, etc. to wrap the post-planning rules in BaseSessionStateBuilder, so that we just pass that wrapper around?

   // The following fields are lazy to avoid creating the Hive client when creating SessionState.
   lazy val catalog: SessionCatalog = catalogBuilder()
SparkSessionExtensionSuite.scala

@@ -26,7 +26,9 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, UnresolvedHint}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.COLUMN_BATCH_SIZE
@@ -145,6 +147,28 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
     }
   }

test("inject adaptive query prep rule") {
val extensions = create { extensions =>
// inject rule that will run during AQE query stage preparation and will add custom tags
// to the plan
extensions.injectQueryStagePrepRule(session => MyQueryStagePrepRule())
// inject rule that will run during AQE query stage optimization and will verify that the
// custom tags were written in the preparation phase
extensions.injectColumnar(session =>
MyColumarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))
}
withSession(extensions) { session =>
session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
assert(session.sessionState.queryStagePrepRules.contains(MyQueryStagePrepRule()))
assert(session.sessionState.columnarRules.contains(
MyColumarRule(MyNewQueryStageRule(), MyNewQueryStageRule())))
import session.sqlContext.implicits._
val data = Seq((100L), (200L), (300L)).toDF("vals").repartition(1)
val df = data.selectExpr("vals + 1")
df.collect()
}
}

test("inject columnar") {
val extensions = create { extensions =>
extensions.injectColumnar(session =>
Expand Down Expand Up @@ -731,6 +755,31 @@ class MyExtensions extends (SparkSessionExtensions => Unit) {
}
}

+object QueryPrepRuleHelper {
+  val myPrepTag: TreeNodeTag[String] = TreeNodeTag[String]("myPrepTag")
+  val myPrepTagValue: String = "myPrepTagValue"
+}
+
+// this rule will run during AQE query preparation and will write custom tags to each node
+case class MyQueryStagePrepRule() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+    case plan =>
+      plan.setTagValue(QueryPrepRuleHelper.myPrepTag, QueryPrepRuleHelper.myPrepTagValue)
+      plan
+  }
+}
+
+// this rule will run during AQE query stage optimization and will verify custom tags were
+// already written during query preparation phase
+case class MyNewQueryStageRule() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
+    case plan if !plan.isInstanceOf[AdaptiveSparkPlanExec] =>
+      assert(plan.getTagValue(QueryPrepRuleHelper.myPrepTag).get ==
+        QueryPrepRuleHelper.myPrepTagValue)
+      plan
+  }
+}

 case class MyRule2(spark: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan
 }