From 65b831aa2eba01c5a8127655933fa9e8ba6b399e Mon Sep 17 00:00:00 2001
From: "wangguangxin.cn"
Date: Mon, 7 Feb 2022 10:15:17 +0800
Subject: [PATCH] Retry all map tasks if partition key is indeterminate

---
 .../apache/spark/rdd/MapPartitionsRDD.scala   |  9 ++++-
 .../main/scala/org/apache/spark/rdd/RDD.scala |  9 ++++-
 .../exchange/ShuffleExchangeExec.scala        | 39 ++++++++++++++++---
 .../apache/spark/sql/execution/limit.scala    |  4 +-
 .../org/apache/spark/sql/JoinSuite.scala      | 24 ++++++++++++
 5 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 39520a9734b0..0e1cd55c96eb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,13 +35,17 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param isPartitionKeyIndeterminate whether or not the partition key is indeterminate.
+ *                                    If so, it may return a different result even though
+ *                                    [[org.apache.spark.Partitioner]] is deterministic.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    isPartitionKeyIndeterminate: Boolean = false)
   extends RDD[U](prev) {
 
   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -60,7 +64,8 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     isFromBarrier || dependencies.exists(_.rdd.isBarrier())
 
   override protected def getOutputDeterministicLevel = {
-    if (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+    if (isPartitionKeyIndeterminate ||
+      (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED)) {
       DeterministicLevel.INDETERMINATE
     } else {
       super.getOutputDeterministicLevel
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 71885664513a..4c6816b2c097 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -867,16 +867,21 @@ abstract class RDD[T: ClassTag](
    * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
    *                         sensitive, it may return totally different result when the input order
    *                         is changed. Mostly stateful functions are order-sensitive.
+   * @param isPartitionKeyIndeterminate whether or not the partition key is indeterminate.
+   *                                    If so, it may return a different result even though
+   *                                    [[org.apache.spark.Partitioner]] is deterministic.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
       preservesPartitioning: Boolean = false,
-      isOrderSensitive: Boolean = false): RDD[U] = withScope {
+      isOrderSensitive: Boolean = false,
+      isPartitionKeyIndeterminate: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
       preservesPartitioning = preservesPartitioning,
-      isOrderSensitive = isOrderSensitive)
+      isOrderSensitive = isOrderSensitive,
+      isPartitionKeyIndeterminate = isPartitionKeyIndeterminate)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index c033aedc7786..45317c564979 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -29,7 +29,7 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, NamedExpression, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -167,7 +167,7 @@ case class ShuffleExchangeExec(
   lazy val shuffleDependency : ShuffleDependency[Int, InternalRow, InternalRow] = {
     val dep = ShuffleExchangeExec.prepareShuffleDependency(
       inputRDD,
-      child.output,
+      child,
       outputPartitioning,
       serializer,
       writeMetrics)
@@ -260,11 +260,12 @@ object ShuffleExchangeExec {
    */
   def prepareShuffleDependency(
       rdd: RDD[InternalRow],
-      outputAttributes: Seq[Attribute],
+      child: SparkPlan,
       newPartitioning: Partitioning,
       serializer: Serializer,
       writeMetrics: Map[String, SQLMetric])
     : ShuffleDependency[Int, InternalRow, InternalRow] = {
+    val outputAttributes = child.output
     val part: Partitioner = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
       case HashPartitioning(_, n) =>
@@ -373,18 +374,21 @@ object ShuffleExchangeExec {
       }
 
       // round-robin function is order sensitive if we don't sort the input.
-      val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
+      val isOrderSensitive = (isRoundRobin && !SQLConf.get.sortBeforeRepartition)
+      val isPartitionKeyIndeterminate = isPartitioningIndeterminate(newPartitioning, child)
       if (needToCopyObjectsBeforeShuffle(part)) {
         newRdd.mapPartitionsWithIndexInternal((_, iter) => {
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
-        }, isOrderSensitive = isOrderSensitive)
+        }, isOrderSensitive = isOrderSensitive,
+          isPartitionKeyIndeterminate = isPartitionKeyIndeterminate)
       } else {
         newRdd.mapPartitionsWithIndexInternal((_, iter) => {
           val getPartitionKey = getPartitionKeyExtractor()
           val mutablePair = new MutablePair[Int, InternalRow]()
           iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
-        }, isOrderSensitive = isOrderSensitive)
+        }, isOrderSensitive = isOrderSensitive,
+          isPartitionKeyIndeterminate = isPartitionKeyIndeterminate)
       }
     }
 
@@ -401,6 +405,29 @@ object ShuffleExchangeExec {
     dependency
   }
 
+  /**
+   * Checks if the shuffle partitioning contains an indeterminate expression/reference.
+   */
+  private def isPartitioningIndeterminate(partitioning: Partitioning, plan: SparkPlan): Boolean = {
+    val indeterminateAttrs = plan.flatMap(_.expressions).collect {
+      case e: NamedExpression if !e.deterministic => e.exprId
+    }.toSet
+
+    def hasIndeterminateReference(e: Expression): Boolean = {
+      indeterminateAttrs.size > 0 &&
+        e.find {
+          case a: AttributeReference if indeterminateAttrs.contains(a.exprId) => true
+          case _ => false
+        }.nonEmpty
+    }
+
+    partitioning match {
+      case HashPartitioning(exprs, _)
+        if exprs.exists(e => !e.deterministic || hasIndeterminateReference(e)) => true
+      case _ => false
+    }
+  }
+
   /**
    * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
    * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 5114c075a72d..4d9f723602f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -64,7 +64,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
         new ShuffledRowRDD(
           ShuffleExchangeExec.prepareShuffleDependency(
             locallyLimited,
-            child.output,
+            child,
             SinglePartition,
             serializer,
             writeMetrics),
@@ -233,7 +233,7 @@ case class TakeOrderedAndProjectExec(
       new ShuffledRowRDD(
         ShuffleExchangeExec.prepareShuffleDependency(
           localTopK,
-          child.output,
+          child,
           SinglePartition,
           serializer,
           writeMetrics),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 77493afe4314..01a4494c5c91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
 import org.mockito.Mockito._
 
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import org.apache.spark.rdd.DeterministicLevel
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
@@ -1440,4 +1441,27 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("Join by rand should generate indeterminate MapPartitionsRDD") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df = sql(
+        """
+          |SELECT
+          |  t1.key, t2.randkey, t2.a
+          |FROM
+          |  testData t1
+          |LEFT JOIN
+          |  (select rand() as randkey, a from testData2) t2
+          |ON
+          |  t1.key = t2.randkey
+        """.stripMargin)
+
+      val shuffleDeps = collect(df.queryExecution.executedPlan) {
+        case s: ShuffleExchangeExec => s.shuffleDependency
+      }
+      assert(shuffleDeps.size == 2)
+      assert(shuffleDeps.filter(
+        _.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE).size == 1)
+    }
+  }
 }
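
Note (not part of the patch): below is a minimal, self-contained sketch of the scenario this change addresses, assuming a local SparkSession; the object name IndeterminatePartitionKeyDemo and the toy data are made up for illustration. Because rand() is re-evaluated when a map task is retried, the hash partition key derived from it can change between attempts, so retrying only the failed map tasks could route rows to different reducers. With this patch, the map stage that feeds such a shuffle is marked INDETERMINATE, and a shuffle fetch failure triggers a retry of all of its map tasks.

import org.apache.spark.sql.SparkSession

object IndeterminatePartitionKeyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("indeterminate-partition-key-demo")
      .getOrCreate()
    import spark.implicits._

    // Force a shuffle-based join so both sides are repartitioned by the join key.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val t1 = Seq(1, 2, 3).toDF("key")
    // rand() is nondeterministic: a retried map task recomputes new values, so the
    // HashPartitioning key built from randkey can differ between task attempts.
    val t2 = Seq(1, 2, 3).toDF("a").selectExpr("rand() as randkey", "a")

    val joined = t1.join(t2, t1("key") === t2("randkey"), "left")
    // With the patch applied, the shuffle feeding the t2 side is marked INDETERMINATE,
    // so a fetch failure re-runs every map task of that stage rather than only the
    // failed ones, avoiding lost or duplicated rows.
    joined.explain()

    spark.stop()
  }
}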