From 3445646cf0e24507c4c1c74cbca96db59c620008 Mon Sep 17 00:00:00 2001 From: ashahid Date: Thu, 20 Feb 2025 14:19:35 -0800 Subject: [PATCH 01/12] SPARK-51016. Fixing the code of isInDeterminate boolean for a Stage --- .../scala/org/apache/spark/Dependency.scala | 22 +++++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 3 + .../spark/scheduler/ShuffleMapStage.scala | 2 + python/pyspark/pandas/internal.py | 6 +- .../sql/catalyst/expressions/Expression.scala | 3 + .../expressions/namedExpressions.scala | 16 ++++- .../expressions/ExpressionEvalHelper.scala | 2 +- .../expressions/NondeterministicSuite.scala | 34 +++++++++++ .../exchange/ShuffleExchangeExec.scala | 13 +++- .../spark/scheduler/ShuffleMapStageTest.scala | 61 +++++++++++++++++++ ...rojectedOrderingAndPartitioningSuite.scala | 40 +++++++++++- 11 files changed, 187 insertions(+), 15 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 573608c4327e..2751d7b3b2e0 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -79,13 +79,25 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, - val serializer: Serializer = SparkEnv.get.serializer, - val keyOrdering: Option[Ordering[K]] = None, - val aggregator: Option[Aggregator[K, V, C]] = None, - val mapSideCombine: Boolean = false, - val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) + val serializer: Serializer, + val keyOrdering: Option[Ordering[K]], + val aggregator: Option[Aggregator[K, V, C]], + val mapSideCombine: Boolean, + val shuffleWriterProcessor: ShuffleWriteProcessor, + val isInDeterministic: Boolean) extends Dependency[Product2[K, V]] with Logging { + def this ( + rdd: RDD[_ <: Product2[K, V]], + partitioner: Partitioner, + serializer: Serializer = SparkEnv.get.serializer, + keyOrdering: Option[Ordering[K]] = None, + aggregator: Option[Aggregator[K, V, C]] = None, + mapSideCombine: Boolean = false, + shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor + ) = this(rdd, partitioner, serializer, keyOrdering, aggregator, mapSideCombine, + shuffleWriterProcessor, false) + if (mapSideCombine) { require(aggregator.isDefined, "Map-side combine without Aggregator specified!") } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 80db818b77e4..fb8564827b02 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -2105,6 +2105,9 @@ abstract class RDD[T: ClassTag]( val deterministicLevelCandidates = dependencies.map { // The shuffle is not really happening, treat it like narrow dependency and assume the output // deterministic level of current RDD is same as parent. 
+ case dep: ShuffleDependency[_, _, _] if dep.isInDeterministic => + DeterministicLevel.INDETERMINATE + case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) => dep.rdd.outputDeterministicLevel diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index db09d19d0acf..38da37751206 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -94,4 +94,6 @@ private[spark] class ShuffleMapStage( .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions) } + + override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic || super.isIndeterminate } diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py index 3f6831b60067..bda05017e135 100644 --- a/python/pyspark/pandas/internal.py +++ b/python/pyspark/pandas/internal.py @@ -766,8 +766,9 @@ def __init__( for index_field, struct_field in zip(index_fields, struct_fields) ), (index_fields, struct_fields) else: + # TODO(SPARK-42965): For some reason, the metadata of StructField is different assert all( - index_field.struct_field == struct_field + _drop_metadata(index_field.struct_field) == _drop_metadata(struct_field) for index_field, struct_field in zip(index_fields, struct_fields) ), (index_fields, struct_fields) @@ -794,8 +795,9 @@ def __init__( for data_field, struct_field in zip(data_fields, struct_fields) ), (data_fields, struct_fields) else: + # TODO(SPARK-42965): For some reason, the metadata of StructField is different assert all( - data_field.struct_field == struct_field + _drop_metadata(data_field.struct_field) == _drop_metadata(struct_field) for data_field, struct_field in zip(data_fields, struct_fields) ), (data_fields, struct_fields) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 7d4f8c3b2564..2a6d928f5ab2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -115,6 +115,9 @@ abstract class Expression extends TreeNode[Expression] { */ lazy val deterministic: Boolean = children.forall(_.deterministic) + lazy val exprValHasIndeterministicCharacter: Boolean = !deterministic || + this.references.exists(_.exprValHasIndeterministicCharacter) + def nullable: Boolean /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 2af6a1ba84ec..421c2fd190a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -111,6 +111,10 @@ abstract class Attribute extends LeafExpression with NamedExpression { @transient override lazy val references: AttributeSet = AttributeSet(this) + override lazy val exprValHasIndeterministicCharacter: Boolean = + metadata.contains(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) && + metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) + def withNullability(newNullability: Boolean): Attribute def withQualifier(newQualifier: Seq[String]): 
Attribute def withName(newName: String): Attribute @@ -123,6 +127,10 @@ abstract class Attribute extends LeafExpression with NamedExpression { } +object Attribute { + val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent" +} + /** * Used to assign a new name to a computation. * For example the SQL expression "1 + 1 AS a" could be represented as follows: @@ -194,7 +202,13 @@ case class Alias(child: Expression, name: String)( override def toAttribute: Attribute = { if (resolved) { - AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier) + val mdForAttrib = if (this.exprValHasIndeterministicCharacter) { + new MetadataBuilder().withMetadata(metadata). + putBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build() + } else { + metadata + } + AttributeReference(name, child.dataType, child.nullable, mdForAttrib)(exprId, qualifier) } else { UnresolvedAttribute.quoted(name) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 184f5a2a9485..2347410d4537 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB case _ => expr.mapChildren(replace) } - private def prepareEvaluation(expression: Expression): Expression = { + def prepareEvaluation(expression: Expression): Expression = { val serializer = new JavaSerializer(new SparkConf()).newInstance() val resolver = ResolveTimeZone val expr = replace(resolver.resolveTimeZones(expression)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala index bf1c930c0bd0..983810a5fdae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, RangePartitioning} class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { test("MonotonicallyIncreasingID") { @@ -31,4 +32,37 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { test("InputFileName") { checkEvaluation(InputFileName(), "") } + + test("SPARK-51016: has Indeterministic Component") { + def assertIndeterminancyComponent(expression: Expression): Unit = + assert(prepareEvaluation(expression).exprValHasIndeterministicCharacter) + + assertIndeterminancyComponent(MonotonicallyIncreasingID()) + val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")() + assertIndeterminancyComponent(alias) + assertIndeterminancyComponent(alias.toAttribute) + assertIndeterminancyComponent(Multiply(alias.toAttribute, Literal(1000L))) + assertIndeterminancyComponent( + HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5)) + assertIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5)) + assertIndeterminancyComponent( + 
RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5)) + assertIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5)) + } + + test("SPARK-51016: has Deterministic Component") { + def assertNoIndeterminancyComponent(expression: Expression): Unit = + assert(!prepareEvaluation(expression).exprValHasIndeterministicCharacter) + + assertNoIndeterminancyComponent(Literal(1000L)) + val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")() + assertNoIndeterminancyComponent(alias) + assertNoIndeterminancyComponent(alias.toAttribute) + assertNoIndeterminancyComponent( + HashPartitioning(Seq(Multiply(Literal(10L), Literal(100L))), 5)) + assertNoIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5)) + assertNoIndeterminancyComponent( + RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5)) + assertNoIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 31a3f53eb719..2e6261de4040 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -30,7 +30,7 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.Statistics @@ -467,7 +467,10 @@ object ShuffleExchangeExec { }, isOrderSensitive = isOrderSensitive) } } - + val isIndeterministic = newPartitioning match { + case expr: Expression => expr.exprValHasIndeterministicCharacter + case _ => false + } // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds // are in the form of (partitionId, row) and every partitionId is in the expected range // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough. @@ -476,7 +479,11 @@ object ShuffleExchangeExec { rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), serializer, - shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) + None, + None, + false, + shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics), + isIndeterministic) dependency } diff --git a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala new file mode 100644 index 000000000000..da3f619c4a2c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.LongType + +class ShuffleMapStageTest extends SharedSparkSession { + + test("SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val outerDf = spark.createDataset( + Seq((1L, "aa")))( + Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkLeftt", "strleft") + + val innerDf = spark.createDataset( + Seq((1L, "11"), (2L, "22")))( + Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkRight", "strright") + + val leftOuter = outerDf.select( + col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)). + cast(LongType)). + otherwise(col("pkLeftt")).as("pkLeft")) + + val outerjoin = leftOuter.hint("shuffle_hash"). + join(innerDf, col("pkLeft") === col("pkRight"), "left_outer") + val shuffleStages: Array[ShuffleMapStage] = Array.ofDim(2) + spark.sparkContext.addSparkListener(new SparkListener() { + var i = 0 + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + if (stageSubmitted.stageInfo.shuffleDepId.isDefined) { + shuffleStages(i) = + spark.sparkContext.dagScheduler.shuffleIdToMapStage(stageSubmitted.stageInfo.stageId) + i +=1 + } + } + }); + outerjoin.collect() + assert(shuffleStages.filter(_.isIndeterminate).size == 1) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index ec13d48d45f8..395c03b81055 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -17,14 +17,17 @@ package org.apache.spark.sql.execution -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{DeterministicLevel, RDD} +import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.functions.{col, floor, isnull, rand, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import 
org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{LongType, StringType} class ProjectedOrderingAndPartitioningSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { @@ -210,6 +213,37 @@ class ProjectedOrderingAndPartitioningSuite assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a") assert(outputOrdering.head.sameOrderExpressions.size == 0) } + + test("SPARK-51016: ShuffleRDD using indeterministic join keys should be INDETERMINATE") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val outerDf = spark.createDataset( + Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))( + Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft") + + val innerDf = spark.createDataset( + Seq((1L, "11"), (2L, "22"), (3L, "33")))( + Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") + + val leftOuter = outerDf.select( + col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)). + cast(LongType)). + otherwise(col("pkLeftt")).as("pkLeft")) + + val outerjoin = leftOuter.hint("shuffle_hash"). + join(innerDf, col("pkLeft") === col("pkRight"), "left_outer") + + outerjoin.collect() + val finalPlan = outerjoin.queryExecution.executedPlan + val shuffleHJExec = finalPlan.children(0).asInstanceOf[ShuffledHashJoinExec] + assert(shuffleHJExec.left.asInstanceOf[InputAdapter].execute().outputDeterministicLevel == + DeterministicLevel.INDETERMINATE) + + assert(shuffleHJExec.right.asInstanceOf[InputAdapter].execute().outputDeterministicLevel == + DeterministicLevel.UNORDERED) + + assert(shuffleHJExec.execute().outputDeterministicLevel == DeterministicLevel.INDETERMINATE) + } + } } private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode { From 3ce5ad3168a1e55d254ded8392be1ce7ced45b84 Mon Sep 17 00:00:00 2001 From: ashahid Date: Thu, 20 Feb 2025 15:18:32 -0800 Subject: [PATCH 02/12] SPARK-51016. updating comment --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index fb8564827b02..2ccc0f6476af 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -2103,11 +2103,11 @@ abstract class RDD[T: ClassTag]( @DeveloperApi protected def getOutputDeterministicLevel: DeterministicLevel.Value = { val deterministicLevelCandidates = dependencies.map { - // The shuffle is not really happening, treat it like narrow dependency and assume the output - // deterministic level of current RDD is same as parent. case dep: ShuffleDependency[_, _, _] if dep.isInDeterministic => DeterministicLevel.INDETERMINATE + // The shuffle is not really happening, treat it like narrow dependency and assume the output + // deterministic level of current RDD is same as parent. case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) => dep.rdd.outputDeterministicLevel From 038e14daf0b0a1007468712797a8cfffee7261ab Mon Sep 17 00:00:00 2001 From: ashahid Date: Fri, 28 Feb 2025 13:42:00 -0800 Subject: [PATCH 03/12] SPARK-51016. Refactored the code so as to pass the boolean hasInDeterminism via curried constructor of AttributeReference, as per feedback. 
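For illustration, a rough sketch of how the flag is expected to propagate, using the
expressions the new tests exercise (the snippet is illustrative only, not part of the change):

    // An Alias over a non-deterministic child is itself non-deterministic,
    // so its hasIndeterminism flag is true.
    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
    assert(!alias.deterministic && alias.hasIndeterminism)

    // toAttribute threads the flag through AttributeReference's curried
    // parameter list: the attribute is a leaf expression (hence deterministic
    // to evaluate) but still remembers its indeterministic origin.
    val attr = alias.toAttribute
    assert(attr.deterministic && attr.hasIndeterminism)

    // The withNullability/withName/copy helpers all preserve the flag, e.g.:
    assert(attr.withNullability(false).hasIndeterminism)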
Renamed the boolean --- .../sql/catalyst/expressions/Expression.scala | 7 ++-- .../expressions/ProjectionOverSchema.scala | 2 +- .../expressions/namedExpressions.scala | 34 +++++++------------ .../expressions/NondeterministicSuite.scala | 4 +-- .../spark/sql/catalyst/plans/PlanTest.scala | 3 +- .../exchange/ShuffleExchangeExec.scala | 2 +- 6 files changed, 24 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 2a6d928f5ab2..4922713de408 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -115,8 +115,11 @@ abstract class Expression extends TreeNode[Expression] { */ lazy val deterministic: Boolean = children.forall(_.deterministic) - lazy val exprValHasIndeterministicCharacter: Boolean = !deterministic || - this.references.exists(_.exprValHasIndeterministicCharacter) + def hasIndeterminism: Boolean = _hasIndeterminism + + @transient + private lazy val _hasIndeterminism: Boolean = !deterministic || + this.references.exists(_.hasIndeterminism) def nullable: Boolean diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala index bb67c173b946..ea092093d095 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala @@ -38,7 +38,7 @@ case class ProjectionOverSchema(schema: StructType, output: AttributeSet) { private def getProjection(expr: Expression): Option[Expression] = expr match { case a: AttributeReference if fieldNames.contains(a.name) && output.contains(a) => - Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier)) + Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier, a.hasIndeterminism)) case GetArrayItem(child, arrayItemOrdinal, failOnError) => getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal, failOnError) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 421c2fd190a4..9c365d7565a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -111,9 +111,7 @@ abstract class Attribute extends LeafExpression with NamedExpression { @transient override lazy val references: AttributeSet = AttributeSet(this) - override lazy val exprValHasIndeterministicCharacter: Boolean = - metadata.contains(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) && - metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) + override def hasIndeterminism: Boolean = false def withNullability(newNullability: Boolean): Attribute def withQualifier(newQualifier: Seq[String]): Attribute @@ -127,10 +125,6 @@ abstract class Attribute extends LeafExpression with NamedExpression { } -object Attribute { - val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent" -} - /** * Used to assign a new name to a computation. 
* For example the SQL expression "1 + 1 AS a" could be represented as follows: @@ -202,13 +196,8 @@ case class Alias(child: Expression, name: String)( override def toAttribute: Attribute = { if (resolved) { - val mdForAttrib = if (this.exprValHasIndeterministicCharacter) { - new MetadataBuilder().withMetadata(metadata). - putBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build() - } else { - metadata - } - AttributeReference(name, child.dataType, child.nullable, mdForAttrib)(exprId, qualifier) + AttributeReference(name, child.dataType, child.nullable)(exprId, qualifier, + this.hasIndeterminism) } else { UnresolvedAttribute.quoted(name) } @@ -288,7 +277,8 @@ case class AttributeReference( nullable: Boolean = true, override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, - val qualifier: Seq[String] = Seq.empty[String]) + val qualifier: Seq[String] = Seq.empty[String], + override val hasIndeterminism: Boolean = false) extends Attribute with Unevaluable { override lazy val treePatternBits: BitSet = AttributeReferenceTreeBits.bits @@ -326,7 +316,8 @@ case class AttributeReference( } override def newInstance(): AttributeReference = - AttributeReference(name, dataType, nullable, metadata)(qualifier = qualifier) + AttributeReference(name, dataType, nullable, metadata)(qualifier = qualifier, + hasIndeterminism = hasIndeterminism) /** * Returns a copy of this [[AttributeReference]] with changed nullability. @@ -335,7 +326,8 @@ case class AttributeReference( if (nullable == newNullability) { this } else { - AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier) + AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier, + hasIndeterminism) } } @@ -343,7 +335,7 @@ case class AttributeReference( if (name == newName) { this } else { - AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier) + AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier, hasIndeterminism) } } @@ -354,7 +346,7 @@ case class AttributeReference( if (newQualifier == qualifier) { this } else { - AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier) + AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier, hasIndeterminism) } } @@ -362,12 +354,12 @@ case class AttributeReference( if (exprId == newExprId) { this } else { - AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier) + AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier, hasIndeterminism) } } override def withMetadata(newMetadata: Metadata): AttributeReference = { - AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier) + AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, hasIndeterminism) } override def withDataType(newType: DataType): AttributeReference = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala index 983810a5fdae..2f0aa5fa6875 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala @@ -35,7 +35,7 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-51016: has Indeterministic Component") { def 
assertIndeterminancyComponent(expression: Expression): Unit = - assert(prepareEvaluation(expression).exprValHasIndeterministicCharacter) + assert(prepareEvaluation(expression).hasIndeterminism) assertIndeterminancyComponent(MonotonicallyIncreasingID()) val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")() @@ -52,7 +52,7 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-51016: has Deterministic Component") { def assertNoIndeterminancyComponent(expression: Expression): Unit = - assert(!prepareEvaluation(expression).exprValHasIndeterministicCharacter) + assert(!prepareEvaluation(expression).hasIndeterminism) assertNoIndeterminancyComponent(Literal(1000L)) val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")() diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index f06e6ed137cc..062a27075185 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -69,7 +69,8 @@ trait PlanTestBase extends PredicateHelper with SQLHelper { self: Suite => protected def rewriteNameFromAttrNullability(plan: LogicalPlan): LogicalPlan = { plan.transformAllExpressions { case a @ AttributeReference(name, _, false, _) => - a.copy(name = s"*$name")(exprId = a.exprId, qualifier = a.qualifier) + a.copy(name = s"*$name")(exprId = a.exprId, qualifier = a.qualifier, + hasIndeterminism = a.hasIndeterminism) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 2e6261de4040..bf63d2772c87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -468,7 +468,7 @@ object ShuffleExchangeExec { } } val isIndeterministic = newPartitioning match { - case expr: Expression => expr.exprValHasIndeterministicCharacter + case expr: Expression => expr.hasIndeterminism case _ => false } // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds From 7f2d2bccd44836cee207de3e2cc4f304a0f1b804 Mon Sep 17 00:00:00 2001 From: ashahid Date: Fri, 28 Feb 2025 13:48:57 -0800 Subject: [PATCH 04/12] SPARK-51016. Refactored the code so as to pass the boolean hasInDeterminism via curried constructor of AttributeReference, as per feedback. 
Renamed the boolean.

With hasIndeterminism now carried through AttributeReference's curried constructor instead of
Attribute metadata, the StructField metadata observed from pandas-on-Spark no longer differs,
so the _drop_metadata workaround added earlier in this series (see the SPARK-42965 TODO) is
reverted here.
---
 python/pyspark/pandas/internal.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index bda05017e135..3f6831b60067 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -766,9 +766,8 @@ def __init__(
                 for index_field, struct_field in zip(index_fields, struct_fields)
             ), (index_fields, struct_fields)
         else:
-            # TODO(SPARK-42965): For some reason, the metadata of StructField is different
             assert all(
-                _drop_metadata(index_field.struct_field) == _drop_metadata(struct_field)
+                index_field.struct_field == struct_field
                 for index_field, struct_field in zip(index_fields, struct_fields)
             ), (index_fields, struct_fields)

@@ -795,9 +794,8 @@ def __init__(
                 for data_field, struct_field in zip(data_fields, struct_fields)
             ), (data_fields, struct_fields)
         else:
-            # TODO(SPARK-42965): For some reason, the metadata of StructField is different
             assert all(
-                _drop_metadata(data_field.struct_field) == _drop_metadata(struct_field)
+                data_field.struct_field == struct_field
                 for data_field, struct_field in zip(data_fields, struct_fields)
             ), (data_fields, struct_fields)

From 9cec75e1012c7d7bd9751b7c0982ea7046363727 Mon Sep 17 00:00:00 2001
From: ashahid
Date: Fri, 28 Feb 2025 14:33:45 -0800
Subject: [PATCH 05/12] SPARK-51016. fixed test failures after refactoring

---
 .../spark/sql/catalyst/expressions/namedExpressions.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 9c365d7565a2..4b8762a4719e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -196,7 +196,7 @@ case class Alias(child: Expression, name: String)(

   override def toAttribute: Attribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable)(exprId, qualifier,
+      AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier,
         this.hasIndeterminism)
     } else {
       UnresolvedAttribute.quoted(name)
@@ -367,7 +367,7 @@ case class AttributeReference(
   }

   override protected final def otherCopyArgs: Seq[AnyRef] = {
-    exprId :: qualifier :: Nil
+    exprId :: qualifier :: Boolean.box(hasIndeterminism) :: Nil
   }

   /** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
From 7878a601408650cef63241c1fa95c5e09e8719e5 Mon Sep 17 00:00:00 2001
From: ashahid
Date: Mon, 3 Mar 2025 12:23:28 -0800
Subject: [PATCH 06/12] SPARK-51016. refactored the tests to use inner joins instead of Left Outer to replicate the bug.
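The inner join makes the consequence of a re-rolled key directly observable: if a retried
stage re-evaluates the rand()-based key, matches can be silently dropped or duplicated,
whereas the left outer variant kept every left-side row in the output regardless, which is
presumably why it masked the bug. For reference, the indeterministic join key both suites
construct (reproduced from the tests) is:

    // Null primary keys are replaced with a random value, so the left side's
    // partitioning is indeterministic across stage retries.
    val leftDf = leftDfBase.select(
      col("strleft"),
      when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType))
        .otherwise(col("pkLeftt")).as("pkLeft"))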
--- .../catalyst/expressions/namedExpressions.scala | 1 - .../spark/scheduler/ShuffleMapStageTest.scala | 12 ++++++------ .../ProjectedOrderingAndPartitioningSuite.scala | 14 +++++++------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 4b8762a4719e..f6b8a96dca97 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -112,7 +112,6 @@ abstract class Attribute extends LeafExpression with NamedExpression { override lazy val references: AttributeSet = AttributeSet(this) override def hasIndeterminism: Boolean = false - def withNullability(newNullability: Boolean): Attribute def withQualifier(newQualifier: Seq[String]): Attribute def withName(newName: String): Attribute diff --git a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala index da3f619c4a2c..4733ae38d918 100644 --- a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala @@ -28,21 +28,21 @@ class ShuffleMapStageTest extends SharedSparkSession { test("SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - val outerDf = spark.createDataset( + val leftDfBase = spark.createDataset( Seq((1L, "aa")))( Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkLeftt", "strleft") - val innerDf = spark.createDataset( + val rightDf = spark.createDataset( Seq((1L, "11"), (2L, "22")))( Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkRight", "strright") - val leftOuter = outerDf.select( + val leftDf = leftDfBase.select( col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)). cast(LongType)). otherwise(col("pkLeftt")).as("pkLeft")) - val outerjoin = leftOuter.hint("shuffle_hash"). - join(innerDf, col("pkLeft") === col("pkRight"), "left_outer") + val join = leftDf.hint("shuffle_hash"). 
+ join(rightDf, col("pkLeft") === col("pkRight"), "inner") val shuffleStages: Array[ShuffleMapStage] = Array.ofDim(2) spark.sparkContext.addSparkListener(new SparkListener() { var i = 0 @@ -54,7 +54,7 @@ class ShuffleMapStageTest extends SharedSparkSession { } } }); - outerjoin.collect() + join.collect() assert(shuffleStages.filter(_.isIndeterminate).size == 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index 395c03b81055..cf339559a169 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -216,24 +216,24 @@ class ProjectedOrderingAndPartitioningSuite test("SPARK-51016: ShuffleRDD using indeterministic join keys should be INDETERMINATE") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - val outerDf = spark.createDataset( + val leftDfBase = spark.createDataset( Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))( Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft") - val innerDf = spark.createDataset( + val rightDf = spark.createDataset( Seq((1L, "11"), (2L, "22"), (3L, "33")))( Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") - val leftOuter = outerDf.select( + val leftDf = leftDfBase.select( col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)). cast(LongType)). otherwise(col("pkLeftt")).as("pkLeft")) - val outerjoin = leftOuter.hint("shuffle_hash"). - join(innerDf, col("pkLeft") === col("pkRight"), "left_outer") + val join = leftDf.hint("shuffle_hash"). + join(rightDf, col("pkLeft") === col("pkRight"), "inner") - outerjoin.collect() - val finalPlan = outerjoin.queryExecution.executedPlan + join.collect() + val finalPlan = join.queryExecution.executedPlan val shuffleHJExec = finalPlan.children(0).asInstanceOf[ShuffledHashJoinExec] assert(shuffleHJExec.left.asInstanceOf[InputAdapter].execute().outputDeterministicLevel == DeterministicLevel.INDETERMINATE) From 54f18fe1a380ba6409d97f444d9f0fb21453d871 Mon Sep 17 00:00:00 2001 From: ashahid Date: Wed, 19 Mar 2025 11:09:48 -0700 Subject: [PATCH 07/12] SPARK-51016. Implemented review feedback. 
Thank you @squito
---
 .../spark/sql/catalyst/expressions/Expression.scala   | 11 +++++++++++
 .../catalyst/expressions/ExpressionEvalHelper.scala   |  2 +-
 .../catalyst/expressions/NondeterministicSuite.scala  |  8 ++++++++
 3 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 4922713de408..3a5beb2362af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -115,6 +115,17 @@ abstract class Expression extends TreeNode[Expression] {
    */
   lazy val deterministic: Boolean = children.forall(_.deterministic)

+  /**
+   * Conveys whether the value this expression evaluates to has an indeterministic component.
+   * It differs from [[deterministic]] as follows: an [[Attribute]] that references the result
+   * of a non-deterministic [[Expression]] still reports [[deterministic]] as true, because
+   * leaf expressions like [[AttributeReference]] are always deterministic to evaluate. Yet its
+   * [[hasIndeterminism]] flag remains true, because the quantity the attribute represents was
+   * computed indeterministically. So while [[deterministic]] describes the expression's own
+   * evaluation, [[hasIndeterminism]] describes the nature of the value it evaluates to.
+   * @return Boolean true if the expression's evaluated value is a result of some indeterministic
+   *         quantity.
+   */
   def hasIndeterminism: Boolean = _hasIndeterminism

   @transient
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 2347410d4537..d8da04ef6456 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
     case _ => expr.mapChildren(replace)
   }

-  def prepareEvaluation(expression: Expression): Expression = {
+  private[expressions] def prepareEvaluation(expression: Expression): Expression = {
     val serializer = new JavaSerializer(new SparkConf()).newInstance()
     val resolver = ResolveTimeZone
     val expr = replace(resolver.resolveTimeZones(expression))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
index 2f0aa5fa6875..29f600347ea0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -40,7 +40,13 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
     assertIndeterminancyComponent(MonotonicallyIncreasingID())
     val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
     assertIndeterminancyComponent(alias)
+    // For the attribute created from an Alias whose deterministic flag is false, the attribute
+    // carries that information forward via its hasIndeterminism flag being true.
    assertIndeterminancyComponent(alias.toAttribute)
+    // But the Attribute's own deterministic flag stays true (the flag does not convey the
+    // indeterministic nature of the evaluated quantity the Attribute represents).
+    assert(prepareEvaluation(alias.toAttribute).deterministic)
+
     assertIndeterminancyComponent(Multiply(alias.toAttribute, Literal(1000L)))
     assertIndeterminancyComponent(
       HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5))
@@ -65,4 +71,6 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
       RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
     assertNoIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
   }
+
+
 }
From 56e572a7915f64a21e9e1cb0a8e7e710571522fc Mon Sep 17 00:00:00 2001
From: ashahid
Date: Thu, 20 Mar 2025 15:32:50 -0700
Subject: [PATCH 08/12] SPARK-51016. Implemented review feedback from @squito and @mridulm

---
 .../scala/org/apache/spark/Dependency.scala   | 22 ++-----
 .../main/scala/org/apache/spark/rdd/RDD.scala |  3 -
 .../spark/scheduler/ShuffleMapStage.scala     |  2 -
 .../exchange/ShuffleExchangeExec.scala        | 66 ++++++++++++++-----
 4 files changed, 53 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 2751d7b3b2e0..573608c4327e 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -79,25 +79,13 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
-    val serializer: Serializer,
-    val keyOrdering: Option[Ordering[K]],
-    val aggregator: Option[Aggregator[K, V, C]],
-    val mapSideCombine: Boolean,
-    val shuffleWriterProcessor: ShuffleWriteProcessor,
-    val isInDeterministic: Boolean)
+    val serializer: Serializer = SparkEnv.get.serializer,
+    val keyOrdering: Option[Ordering[K]] = None,
+    val aggregator: Option[Aggregator[K, V, C]] = None,
+    val mapSideCombine: Boolean = false,
+    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
   extends Dependency[Product2[K, V]] with Logging {

-  def this (
-      rdd: RDD[_ <: Product2[K, V]],
-      partitioner: Partitioner,
-      serializer: Serializer = SparkEnv.get.serializer,
-      keyOrdering: Option[Ordering[K]] = None,
-      aggregator: Option[Aggregator[K, V, C]] = None,
-      mapSideCombine: Boolean = false,
-      shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor
-  ) = this(rdd, partitioner, serializer, keyOrdering, aggregator, mapSideCombine,
-    shuffleWriterProcessor, false)
-
   if (mapSideCombine) {
     require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 2ccc0f6476af..80db818b77e4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -2103,9 +2103,6 @@ abstract class RDD[T: ClassTag](
   @DeveloperApi
   protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
     val deterministicLevelCandidates = dependencies.map {
-      case dep: ShuffleDependency[_, _, _] if dep.isInDeterministic =>
-        DeterministicLevel.INDETERMINATE
-
       // The shuffle is not really happening, treat it like narrow dependency and assume the output
      // deterministic level of
current RDD is same as parent. case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index 38da37751206..db09d19d0acf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -94,6 +94,4 @@ private[spark] class ShuffleMapStage( .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions) } - - override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic || super.isIndeterminate } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index bf63d2772c87..e5a3e935c691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -19,13 +19,11 @@ package org.apache.spark.sql.execution.exchange import java.util.concurrent.atomic.AtomicReference import java.util.function.Supplier - import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future, Promise} - import org.apache.spark._ import org.apache.spark.internal.config -import org.apache.spark.rdd.{RDD, RDDOperationScope} +import org.apache.spark.rdd.{DeterministicLevel, MapPartitionsRDD, RDD, RDDOperationScope} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} import org.apache.spark.shuffle.sort.SortShuffleManager @@ -43,6 +41,8 @@ import org.apache.spark.util.{MutablePair, ThreadUtils} import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} import org.apache.spark.util.random.XORShiftRandom +import scala.reflect.ClassTag + /** * Common trait for all shuffle exchange implementations to facilitate pattern matching. */ @@ -452,25 +452,37 @@ object ShuffleExchangeExec { rdd } + val isIndeterministic = newPartitioning match { + case expr: Expression => expr.hasIndeterminism + case _ => false + } + // round-robin function is order sensitive if we don't sort the input. 
val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition if (needToCopyObjectsBeforeShuffle(part)) { - newRdd.mapPartitionsWithIndexInternal((_, iter) => { - val getPartitionKey = getPartitionKeyExtractor() - iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) } - }, isOrderSensitive = isOrderSensitive) + this.createRddWithPartition( + newRdd, + (_, iter: Iterator[InternalRow]) => { + val getPartitionKey = getPartitionKeyExtractor() + iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) } + }, + isIndeterministic, + isOrderSensitive + ) + } else { - newRdd.mapPartitionsWithIndexInternal((_, iter) => { - val getPartitionKey = getPartitionKeyExtractor() - val mutablePair = new MutablePair[Int, InternalRow]() - iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) } - }, isOrderSensitive = isOrderSensitive) + this.createRddWithPartition( + newRdd, + (_, iter: Iterator[InternalRow]) => { + val getPartitionKey = getPartitionKeyExtractor() + val mutablePair = new MutablePair[Int, InternalRow]() + iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) } + }, + isIndeterministic, + isOrderSensitive) } } - val isIndeterministic = newPartitioning match { - case expr: Expression => expr.hasIndeterminism - case _ => false - } + // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds // are in the form of (partitionId, row) and every partitionId is in the expected range // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough. @@ -482,12 +494,30 @@ object ShuffleExchangeExec { None, None, false, - shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics), - isIndeterministic) + shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) dependency } + private def createRddWithPartition[T: ClassTag, U: ClassTag]( + rdd: RDD[T], + f: (Int, Iterator[T]) => Iterator[U], + isInDeterminate: Boolean, + isOrderSensitive: Boolean): RDD[U] = if (isInDeterminate) { + RDDOperationScope.withScope(rdd.sparkContext){ + new MapPartitionsRDD( + rdd, + (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter), + isOrderSensitive = isOrderSensitive) { + override protected def getOutputDeterministicLevel = + DeterministicLevel.INDETERMINATE + } + } + } else { + rdd.mapPartitionsWithIndexInternal(f, isOrderSensitive = isOrderSensitive) + } + + /** * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]]. From b9bd30bf7b01bfef4dd32cc7099e0c1c960b093f Mon Sep 17 00:00:00 2001 From: ashahid Date: Thu, 20 Mar 2025 15:37:02 -0700 Subject: [PATCH 09/12] SPARK-51016. 
Implemented review feedback from @squito and @mridulm
---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index e5a3e935c691..4915e9a703b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.exchange

 import java.util.concurrent.atomic.AtomicReference
 import java.util.function.Supplier
+
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.reflect.ClassTag
+
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.rdd.{DeterministicLevel, MapPartitionsRDD, RDD, RDDOperationScope}
@@ -41,7 +44,6 @@ import org.apache.spark.util.{MutablePair, ThreadUtils}
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
 import org.apache.spark.util.random.XORShiftRandom

-import scala.reflect.ClassTag

 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -504,7 +506,7 @@ object ShuffleExchangeExec {
       f: (Int, Iterator[T]) => Iterator[U],
       isInDeterminate: Boolean,
       isOrderSensitive: Boolean): RDD[U] = if (isInDeterminate) {
-    RDDOperationScope.withScope(rdd.sparkContext){
+    RDDOperationScope.withScope(rdd.sparkContext) {
       new MapPartitionsRDD(
         rdd,
         (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
From f7a6721de4933fc4c5428782f6a0c5b7fbf69865 Mon Sep 17 00:00:00 2001
From: ashahid
Date: Thu, 20 Mar 2025 15:39:14 -0700
Subject: [PATCH 10/12] SPARK-51016. Implemented review feedback from @squito and @mridulm

---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 4915e9a703b3..2b68b2feb07e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -493,9 +493,6 @@ object ShuffleExchangeExec {
         rddWithPartitionIds,
         new PartitionIdPassthrough(part.numPartitions),
         serializer,
-        None,
-        None,
-        false,
         shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))

     dependency
From b528421db930fa03400ddb04cd7704fccc14e060 Mon Sep 17 00:00:00 2001
From: ashahid
Date: Tue, 15 Apr 2025 11:26:51 -0700
Subject: [PATCH 11/12] SPARK-51016. added more documentation

---
 .../sql/catalyst/expressions/Expression.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 3a5beb2362af..abaea1bdc32c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -123,6 +123,21 @@ abstract class Expression extends TreeNode[Expression] {
    * [[hasIndeterminism]] flag remains true, because the quantity the attribute represents was
    * computed indeterministically. So while [[deterministic]] describes the expression's own
    * evaluation, [[hasIndeterminism]] describes the nature of the value it evaluates to.
+   *
+   * To further elaborate, consider the following Alias expression (illustrative only; Rand
+   * and Multiply stand in for any non-deterministic computation):
+   *
+   * {{{
+   *   // hasIndeterminism records that the value the AttributeReference
+   *   // represents has an indeterministic component to it.
+   *   val aliasExpr = Alias(Multiply(Rand(Literal(1L)), Literal(3.0)), "X")()
+   *   assert(!aliasExpr.deterministic)
+   *   assert(aliasExpr.hasIndeterminism)
+   *
+   *   val attribRef = aliasExpr.toAttribute
+   *   assert(attribRef.deterministic)
+   *   assert(attribRef.hasIndeterminism)
+   * }}}
    * @return Boolean true if the expression's evaluated value is a result of some indeterministic
    *         quantity.
From fade9abf40ca54b809d8291aeb789b713b9ea833 Mon Sep 17 00:00:00 2001
From: Asif Hussain Shahid
Date: Fri, 2 May 2025 11:23:52 -0700
Subject: [PATCH 12/12] SPARK-51016. fixed the hasIndeterminism flag for Union node. Thanks @ptoth for pointing it out.

---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d1c6ec08151f..2c637d4cbda4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -477,12 +477,13 @@ object Union {
     childOutputs.transpose.map { attrs =>
       val firstAttr = attrs.head
       val nullable = attrs.exists(_.nullable)
+      val hasIndeterminism = attrs.exists(_.hasIndeterminism)
       val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
-      if (firstAttr.dataType == newDt) {
+      if (firstAttr.dataType == newDt && !hasIndeterminism) {
         firstAttr.withNullability(nullable)
       } else {
         AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-          firstAttr.exprId, firstAttr.qualifier)
+          firstAttr.exprId, firstAttr.qualifier, hasIndeterminism)
       }
     }
   }