From 06847fe1f2b24a13d2c05b6788c1a68f56b1125e Mon Sep 17 00:00:00 2001
From: jiake
Date: Fri, 25 Oct 2019 11:55:13 +0800
Subject: [PATCH 1/2] change the leaf node to unary node about local shuffle
 reader exec

---
 .../spark/sql/execution/SparkPlanInfo.scala       |  1 -
 .../adaptive/OptimizeLocalShuffleReader.scala     | 16 ++++++++++++++--
 .../sql/execution/adaptive/QueryStageExec.scala   |  3 ++-
 .../adaptive/ReduceNumShufflePartitions.scala     |  3 ++-
 4 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 459311df22d2..9351b074c659 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -56,7 +56,6 @@ private[execution] object SparkPlanInfo {
       case ReusedSubqueryExec(child) => child :: Nil
       case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
       case stage: QueryStageExec => stage.plan :: Nil
-      case localReader: LocalShuffleReaderExec => localReader.child :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index 89e2813695a6..be72313e64c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
 import org.apache.spark.sql.internal.SQLConf
@@ -37,6 +37,16 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
     join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
   }
 
+  def setIsLocalShuffleToTrue(stage: QueryStageExec): Unit = {
+    stage match {
+      case stage: ShuffleQueryStageExec =>
+        stage.isLocalShuffle = true
+      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+        stage.isLocalShuffle = true
+    }
+
+  }
+
   override def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.getConf(SQLConf.OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED)) {
       return plan
@@ -45,9 +55,11 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
     val optimizedPlan = plan.transformDown {
       case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
         val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        setIsLocalShuffleToTrue(join.right.asInstanceOf[QueryStageExec])
         join.copy(right = localReader)
       case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
         val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        setIsLocalShuffleToTrue(join.left.asInstanceOf[QueryStageExec])
         join.copy(left = localReader)
     }
 
@@ -70,7 +82,7 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
   }
 }
 
-case class LocalShuffleReaderExec(child: QueryStageExec) extends LeafExecNode {
+case class LocalShuffleReaderExec(child: QueryStageExec) extends UnaryExecNode {
 
   override def output: Seq[Attribute] = child.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 231fffce3360..2e295bd6401b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -129,7 +129,8 @@ abstract class QueryStageExec extends LeafExecNode {
  */
 case class ShuffleQueryStageExec(
     override val id: Int,
-    override val plan: ShuffleExchangeExec) extends QueryStageExec {
+    override val plan: ShuffleExchangeExec,
+    var isLocalShuffle: Boolean = false) extends QueryStageExec {
 
   @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
     if (plan.inputRDD.getNumPartitions == 0) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
index 1a85d5c02075..50aba14ecc94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
@@ -70,7 +70,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
     }
     // ShuffleExchanges introduced by repartition do not support changing the number of partitions.
     // We change the number of partitions in the stage only if all the ShuffleExchanges support it.
-    if (!shuffleStages.forall(_.plan.canChangeNumPartitions)) {
+    if (!shuffleStages.forall(_.plan.canChangeNumPartitions) ||
+      shuffleStages.exists(_.isLocalShuffle == true)) {
       plan
     } else {
       val shuffleMetrics = shuffleStages.map { stage =>

From b5eb97716acbaaf8e166ed9475b76a22754c6315 Mon Sep 17 00:00:00 2001
From: jiake
Date: Mon, 28 Oct 2019 10:23:08 +0800
Subject: [PATCH 2/2] resolve the comments

---
 .../adaptive/AdaptiveSparkPlanHelper.scala        |  1 -
 .../adaptive/OptimizeLocalShuffleReader.scala     | 12 ------------
 .../sql/execution/adaptive/QueryStageExec.scala   |  3 +--
 .../adaptive/ReduceNumShufflePartitions.scala     | 15 +++++++++------
 4 files changed, 10 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
index 94e66b0c3a43..0ec8710e4db4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala
@@ -125,7 +125,6 @@ trait AdaptiveSparkPlanHelper {
   private def allChildren(p: SparkPlan): Seq[SparkPlan] = p match {
     case a: AdaptiveSparkPlanExec => Seq(a.executedPlan)
     case s: QueryStageExec => Seq(s.plan)
-    case l: LocalShuffleReaderExec => Seq(l.child)
     case _ => p.children
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index be72313e64c1..d8dd7224fef3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -37,16 +37,6 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
     join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)
   }
 
-  def setIsLocalShuffleToTrue(stage: QueryStageExec): Unit = {
-    stage match {
-      case stage: ShuffleQueryStageExec =>
-        stage.isLocalShuffle = true
-      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
-        stage.isLocalShuffle = true
-    }
-
-  }
-
   override def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.getConf(SQLConf.OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED)) {
       return plan
@@ -55,11 +45,9 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
     val optimizedPlan = plan.transformDown {
       case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
         val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
-        setIsLocalShuffleToTrue(join.right.asInstanceOf[QueryStageExec])
         join.copy(right = localReader)
       case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
         val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
-        setIsLocalShuffleToTrue(join.left.asInstanceOf[QueryStageExec])
         join.copy(left = localReader)
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 2e295bd6401b..231fffce3360 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -129,8 +129,7 @@ abstract class QueryStageExec extends LeafExecNode {
  */
 case class ShuffleQueryStageExec(
     override val id: Int,
-    override val plan: ShuffleExchangeExec,
-    var isLocalShuffle: Boolean = false) extends QueryStageExec {
+    override val plan: ShuffleExchangeExec) extends QueryStageExec {
 
   @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
     if (plan.inputRDD.getNumPartitions == 0) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
index 50aba14ecc94..5a505c213a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{LocalShuffleReaderExec, QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ThreadUtils
 
@@ -64,14 +64,17 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
       return plan
     }
 
-    val shuffleStages = plan.collect {
-      case stage: ShuffleQueryStageExec => stage
-      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+    def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
+      case _: LocalShuffleReaderExec => Nil
+      case stage: ShuffleQueryStageExec => Seq(stage)
+      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => Seq(stage)
+      case _ => plan.children.flatMap(collectShuffleStages)
     }
+
+    val shuffleStages = collectShuffleStages(plan)
     // ShuffleExchanges introduced by repartition do not support changing the number of partitions.
     // We change the number of partitions in the stage only if all the ShuffleExchanges support it.
-    if (!shuffleStages.forall(_.plan.canChangeNumPartitions) ||
-      shuffleStages.exists(_.isLocalShuffle == true)) {
+    if (!shuffleStages.forall(_.plan.canChangeNumPartitions)) {
       plan
     } else {
       val shuffleMetrics = shuffleStages.map { stage =>
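
Why PATCH 2/2 drops the isLocalShuffle flag: once LocalShuffleReaderExec extends
UnaryExecNode, generic plan traversals descend into its child automatically
(which is also why the special cases in SparkPlanInfo and
AdaptiveSparkPlanHelper can be deleted), so ReduceNumShufflePartitions instead
stops its own traversal at the local reader. Below is a minimal, self-contained
Scala sketch of that pruned traversal; Node, ShuffleStage, LocalReader and Join
are toy stand-ins for the Spark plan classes, not the real API:

// Toy model of the pruned collection introduced in PATCH 2/2.
// All types here are hypothetical stand-ins, not Spark classes.
sealed trait Node { def children: Seq[Node] }
case class ShuffleStage(id: Int) extends Node { def children: Seq[Node] = Nil }
case class LocalReader(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
case class Join(left: Node, right: Node) extends Node { def children: Seq[Node] = Seq(left, right) }

// Mirrors collectShuffleStages: returning Nil at a LocalReader prunes the
// whole subtree, so a stage wrapped by a local reader is never collected.
def collectShuffleStages(plan: Node): Seq[ShuffleStage] = plan match {
  case _: LocalReader => Nil
  case s: ShuffleStage => Seq(s)
  case _ => plan.children.flatMap(collectShuffleStages)
}

// Example: stage 1 is collected; stage 2 sits under a local reader and is
// skipped, so its map-output partitioning is left untouched.
val plan = Join(ShuffleStage(1), LocalReader(ShuffleStage(2)))
assert(collectShuffleStages(plan) == Seq(ShuffleStage(1)))

Pruning at the LocalReader boundary excludes exactly the stages whose map
outputs the local reader depends on, and it removes the need for the mutable
var on ShuffleQueryStageExec that PATCH 1/2 had introduced.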