Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, subqueryCache),

// When adding local shuffle readers in 'OptimizeLocalShuffleReader`, we revert all the local
// readers if additional shuffles are introduced. This may be too conservative: maybe there is
// only one local reader that introduces shuffle, and we can still keep other local readers.
// Here we re-execute this rule with the sub-plan-tree of a query stage, to make sure necessary
// local readers are added before executing the query stage.
// This rule must be executed before `ReduceNumShufflePartitions`, as local shuffle readers
// can't change number of partitions.
OptimizeLocalShuffleReader(conf),
ReduceNumShufflePartitions(conf),
ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
session.sessionState.columnarRules),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)
// additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
// The child of remaining one BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 2)
Copy link
Contributor

@HeartSaVioR HeartSaVioR Oct 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, would the change make this value consistently be 2? Because the value has changed to 2 but the value was actually flaky (neither 1 or 2 consistently) depending on the situation/randomness (maybe).

You may want to run the same for what I've discovered, 1) solely in local dev, 2) test suite in local dev, 3) trigger CI for 5 times or alike.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR With this patch, the value will consistently be 2. Because we already optimize all the possible local shuffle reader. And I have run in local dev and also the test suite, the value are all 2. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK thanks for confirming.

}
}

Expand All @@ -188,7 +189,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)
// additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
// The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
}
}
Expand All @@ -213,7 +215,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)
// additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
// The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
}
}
Expand Down