QueryExecution.scala
@@ -209,7 +209,7 @@ class QueryExecution(
     executePhase(QueryPlanningTracker.PLANNING) {
       // Clone the logical plan here, in case the planner rules change the states of the logical
       // plan.
-      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+      QueryExecution.createSparkPlan(planner, optimizedPlan.clone())
     }
   }

@@ -574,7 +574,6 @@ object QueryExecution {
    * Note that the returned physical plan still needs to be prepared for execution.
    */
   def createSparkPlan(
-      sparkSession: SparkSession,
       planner: SparkPlanner,
       plan: LogicalPlan): SparkPlan = {
     // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
@@ -594,7 +593,7 @@
    * [[SparkPlan]] for execution.
    */
   def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {
-    val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())
+    val sparkPlan = createSparkPlan(spark.sessionState.planner, plan.clone())
     prepareExecutedPlan(spark, sparkPlan)
   }

@@ -603,11 +602,11 @@
    * This method is only called by [[PlanAdaptiveDynamicPruningFilters]].
    */
   def prepareExecutedPlan(
-      session: SparkSession,
       plan: LogicalPlan,
       context: AdaptiveExecutionContext): SparkPlan = {
-    val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())
-    val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
+    val sparkPlan = createSparkPlan(context.session.sessionState.planner, plan.clone())
+    val preparationRules =
+      preparations(context.session, Option(InsertAdaptiveSparkPlan(context)), true)
     prepareForExecution(preparationRules, sparkPlan.clone())
   }

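A minimal sketch (not from this PR) of the call shape after the change: `createSparkPlan` no longer takes a SparkSession, only the planner and a cloned logical plan. It assumes the code lives inside the org.apache.spark.sql package, since `SparkSession.sessionState` is private[sql]; the `spark` session and the example query are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution

val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
// Any resolved, optimized logical plan works; this one comes from a toy query.
val optimized = spark.range(10).filter("id > 5").queryExecution.optimizedPlan
// Before this change: createSparkPlan(spark, spark.sessionState.planner, optimized.clone())
// After: the session argument is gone; the planner already carries the session state.
val physical = QueryExecution.createSparkPlan(spark.sessionState.planner, optimized.clone())
println(physical.treeString)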
InsertAdaptiveSparkPlan.scala
@@ -153,7 +153,7 @@ case class InsertAdaptiveSparkPlan(
     // Apply the same instance of this rule to sub-queries so that sub-queries all share the
     // same `stageCache` for Exchange reuse.
     this.applyInternal(
-      QueryExecution.createSparkPlan(adaptiveExecutionContext.session,
+      QueryExecution.createSparkPlan(
         adaptiveExecutionContext.session.sessionState.planner, plan.clone()), true)
   }

PlanAdaptiveDynamicPruningFilters.scala
@@ -74,9 +74,7 @@ case class PlanAdaptiveDynamicPruningFilters(
           val aliases = indices.map(idx => Alias(buildKeys(idx), buildKeys(idx).toString)())
           val aggregate = Aggregate(aliases, aliases, buildPlan)
 
-          val session = adaptivePlan.context.session
-          val sparkPlan = QueryExecution.prepareExecutedPlan(
-            session, aggregate, adaptivePlan.context)
+          val sparkPlan = QueryExecution.prepareExecutedPlan(aggregate, adaptivePlan.context)
           assert(sparkPlan.isInstanceOf[AdaptiveSparkPlanExec])
           val newAdaptivePlan = sparkPlan.asInstanceOf[AdaptiveSparkPlanExec]
           val values = SubqueryExec(name, newAdaptivePlan)
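A second sketch (not from this PR) of the two-argument prepareExecutedPlan used above: the SparkSession is now taken from AdaptiveExecutionContext.session inside the method, so a caller can no longer pass a session that disagrees with the context. The helper name planWithContext is hypothetical.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveExecutionContext

def planWithContext(ctx: AdaptiveExecutionContext, plan: LogicalPlan): SparkPlan = {
  // Before: QueryExecution.prepareExecutedPlan(ctx.session, plan, ctx)
  // After: the context is the single source of the session.
  QueryExecution.prepareExecutedPlan(plan, ctx)
}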
PlanDynamicPruningFilters.scala
@@ -55,8 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
     plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) {
       case DynamicPruningSubquery(
           value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) =>
-        val sparkPlan = QueryExecution.createSparkPlan(
-          sparkSession, sparkSession.sessionState.planner, buildPlan)
+        val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan)
         // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&