diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index cfdad6e9a51f..e36f300b9c13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -119,6 +119,38 @@ abstract class Expression extends TreeNode[Expression] {
    */
   lazy val deterministic: Boolean = children.forall(_.deterministic)
 
+  /**
+   * Returns true if the value produced by this expression has an indeterministic component,
+   * even when the expression itself is [[deterministic]]. The two flags differ for leaf
+   * expressions: an [[Attribute]] that stands in for a non-deterministic [[Expression]]
+   * evaluates deterministically, so its [[deterministic]] flag is true, yet the value it
+   * carries still originates from an indeterministic computation, and therefore its
+   * [[hasIndeterminism]] flag remains true. In short, [[deterministic]] describes the
+   * evaluation of the expression node itself, which is always deterministic for leaf
+   * expressions such as [[AttributeReference]], while [[hasIndeterminism]] describes the
+   * nature of the value the expression represents.
+   *
+   * For example, consider the following [[Alias]]:
+   * {{{
+   *   val aliasExpr = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "X")()
+   *   assert(!aliasExpr.deterministic)
+   *   assert(aliasExpr.hasIndeterminism)
+   *
+   *   // The derived attribute evaluates deterministically, but it still reports that
+   *   // the value it references has an indeterministic component.
+   *   val attribRef = aliasExpr.toAttribute
+   *   assert(attribRef.deterministic)
+   *   assert(attribRef.hasIndeterminism)
+   * }}}
+   *
+   * @return true if the expression's evaluated value results from some indeterministic quantity
+   */
+  def hasIndeterminism: Boolean = _hasIndeterminism
+
+  @transient
+  private lazy val _hasIndeterminism: Boolean = !deterministic ||
+    this.references.exists(_.hasIndeterminism)
+
   def nullable: Boolean
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala
index bb67c173b946..ea092093d095 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala
@@ -38,7 +38,7 @@ case class ProjectionOverSchema(schema: StructType, output: AttributeSet) {
   private def getProjection(expr: Expression): Option[Expression] =
     expr match {
       case a: AttributeReference if fieldNames.contains(a.name) && output.contains(a) =>
-        Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
+        Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier, a.hasIndeterminism))
       case GetArrayItem(child, arrayItemOrdinal, failOnError) =>
         getProjection(child).map { projection =>
           GetArrayItem(projection, arrayItemOrdinal, failOnError)
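To illustrate the semantics the new flag is meant to have, here is a minimal sketch, mirroring
the Scaladoc above and the NondeterministicSuite cases later in this patch; it assumes only
existing catalyst expression types:

    import org.apache.spark.sql.catalyst.expressions._

    // A non-deterministic expression tree: both flags report the indeterminism.
    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "x")()
    assert(!alias.deterministic)
    assert(alias.hasIndeterminism)

    // The attribute derived from the alias is a leaf expression, so it evaluates
    // deterministically, yet it still records the indeterministic origin of the
    // value it references, and the flag survives further composition.
    val attr = alias.toAttribute
    assert(attr.deterministic)
    assert(attr.hasIndeterminism)
    assert(Multiply(attr, Literal(2L)).hasIndeterminism)
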
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 0bfba04d6372..b96cdcee42ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -111,6 +111,7 @@ abstract class Attribute extends LeafExpression with NamedExpression {
   @transient
   override lazy val references: AttributeSet = AttributeSet(this)
 
+  override def hasIndeterminism: Boolean = false
   def withNullability(newNullability: Boolean): Attribute
   def withQualifier(newQualifier: Seq[String]): Attribute
   def withName(newName: String): Attribute
@@ -202,7 +203,8 @@ case class Alias(child: Expression, name: String)(
 
   override def toAttribute: Attribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
+      AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier,
+        this.hasIndeterminism)
     } else {
       UnresolvedAttribute.quoted(name)
     }
  }
@@ -282,7 +284,8 @@ case class AttributeReference(
     nullable: Boolean = true,
     override val metadata: Metadata = Metadata.empty)(
     val exprId: ExprId = NamedExpression.newExprId,
-    val qualifier: Seq[String] = Seq.empty[String])
+    val qualifier: Seq[String] = Seq.empty[String],
+    override val hasIndeterminism: Boolean = false)
   extends Attribute with Unevaluable {
 
   override lazy val treePatternBits: BitSet = AttributeReferenceTreeBits.bits
@@ -320,7 +323,8 @@ case class AttributeReference(
   }
 
   override def newInstance(): AttributeReference =
-    AttributeReference(name, dataType, nullable, metadata)(qualifier = qualifier)
+    AttributeReference(name, dataType, nullable, metadata)(qualifier = qualifier,
+      hasIndeterminism = hasIndeterminism)
 
   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -329,7 +333,8 @@ case class AttributeReference(
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier)
+      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier,
+        hasIndeterminism)
     }
   }
 
@@ -337,7 +342,7 @@ case class AttributeReference(
     if (name == newName) {
       this
     } else {
-      AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier)
+      AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier, hasIndeterminism)
     }
   }
 
@@ -348,7 +353,7 @@ case class AttributeReference(
     if (newQualifier == qualifier) {
       this
     } else {
-      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier)
+      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier, hasIndeterminism)
     }
   }
 
@@ -356,12 +361,12 @@ case class AttributeReference(
     if (exprId == newExprId) {
       this
     } else {
-      AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier)
+      AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier, hasIndeterminism)
     }
   }
 
   override def withMetadata(newMetadata: Metadata): AttributeReference = {
-    AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier)
+    AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, hasIndeterminism)
   }
 
   override def withDataType(newType: DataType): AttributeReference = {
@@ -369,7 +374,7 @@ case class AttributeReference(
   }
 
   override protected final def otherCopyArgs: Seq[AnyRef] = {
-    exprId :: qualifier :: Nil
+    exprId :: qualifier :: Boolean.box(hasIndeterminism) :: Nil
   }
 
   /** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 03d1a9fca3cc..9995907fb23f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -477,12 +477,13 @@ object Union {
     childOutputs.transpose.map { attrs =>
       val firstAttr = attrs.head
       val nullable = attrs.exists(_.nullable)
+      val hasIndeterminism = attrs.exists(_.hasIndeterminism)
       val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
-      if (firstAttr.dataType == newDt) {
+      if (firstAttr.dataType == newDt && !hasIndeterminism) {
         firstAttr.withNullability(nullable)
       } else {
         AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-          firstAttr.exprId, firstAttr.qualifier)
+          firstAttr.exprId, firstAttr.qualifier, hasIndeterminism)
       }
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 184f5a2a9485..d8da04ef6456 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
     case _ => expr.mapChildren(replace)
   }
 
-  private def prepareEvaluation(expression: Expression): Expression = {
+  private[expressions] def prepareEvaluation(expression: Expression): Expression = {
     val serializer = new JavaSerializer(new SparkConf()).newInstance()
     val resolver = ResolveTimeZone
     val expr = replace(resolver.resolveTimeZones(expression))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
index bf1c930c0bd0..29f600347ea0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, RangePartitioning}
 
 class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("MonotonicallyIncreasingID") {
@@ -31,4 +32,43 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("InputFileName") {
     checkEvaluation(InputFileName(), "")
   }
+
+  test("SPARK-51016: expressions with an indeterministic component") {
+    def assertHasIndeterminism(expression: Expression): Unit =
+      assert(prepareEvaluation(expression).hasIndeterminism)
+
+    assertHasIndeterminism(MonotonicallyIncreasingID())
+    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
+    assertHasIndeterminism(alias)
+    // The attribute created from a non-deterministic Alias carries that information
+    // forward through its hasIndeterminism flag.
+    assertHasIndeterminism(alias.toAttribute)
+    // The attribute itself, however, still evaluates deterministically: it does not
+    // re-run the indeterministic computation whose result it references.
+    assert(prepareEvaluation(alias.toAttribute).deterministic)
+
+    assertHasIndeterminism(Multiply(alias.toAttribute, Literal(1000L)))
+    assertHasIndeterminism(
+      HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5))
+    assertHasIndeterminism(HashPartitioning(Seq(alias.toAttribute), 5))
+    assertHasIndeterminism(
+      RangePartitioning(Seq(SortOrder(alias.toAttribute, Descending)), 5))
+    assertHasIndeterminism(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
+  }
+
+  test("SPARK-51016: expressions without an indeterministic component") {
+    def assertNoIndeterminism(expression: Expression): Unit =
+      assert(!prepareEvaluation(expression).hasIndeterminism)
+
+    assertNoIndeterminism(Literal(1000L))
+    val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")()
+    assertNoIndeterminism(alias)
+    assertNoIndeterminism(alias.toAttribute)
+    assertNoIndeterminism(
+      HashPartitioning(Seq(Multiply(Literal(10L), Literal(100L))), 5))
+    assertNoIndeterminism(HashPartitioning(Seq(alias.toAttribute), 5))
+    assertNoIndeterminism(
+      RangePartitioning(Seq(SortOrder(alias.toAttribute, Descending)), 5))
+    assertNoIndeterminism(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index f06e6ed137cc..062a27075185 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -69,7 +69,8 @@ trait PlanTestBase extends PredicateHelper with SQLHelper { self: Suite =>
   protected def rewriteNameFromAttrNullability(plan: LogicalPlan): LogicalPlan = {
     plan.transformAllExpressions {
       case a @ AttributeReference(name, _, false, _) =>
-        a.copy(name = s"*$name")(exprId = a.exprId, qualifier = a.qualifier)
+        a.copy(name = s"*$name")(exprId = a.exprId, qualifier = a.qualifier,
+          hasIndeterminism = a.hasIndeterminism)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 31a3f53eb719..2b68b2feb07e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -22,15 +22,16 @@ import java.util.function.Supplier
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.rdd.{DeterministicLevel, MapPartitionsRDD, RDD, RDDOperationScope}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -452,19 +454,32 @@ object ShuffleExchangeExec {
           rdd
         }
 
+      val isIndeterminate = newPartitioning match {
+        case expr: Expression => expr.hasIndeterminism
+        case _ => false
+      }
+
       // round-robin function is order sensitive if we don't sort the input.
       val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
       if (needToCopyObjectsBeforeShuffle(part)) {
-        newRdd.mapPartitionsWithIndexInternal((_, iter) => {
-          val getPartitionKey = getPartitionKeyExtractor()
-          iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
-        }, isOrderSensitive = isOrderSensitive)
+        createRddWithPartition(
+          newRdd,
+          (_, iter: Iterator[InternalRow]) => {
+            val getPartitionKey = getPartitionKeyExtractor()
+            iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
+          },
+          isIndeterminate,
+          isOrderSensitive)
       } else {
-        newRdd.mapPartitionsWithIndexInternal((_, iter) => {
-          val getPartitionKey = getPartitionKeyExtractor()
-          val mutablePair = new MutablePair[Int, InternalRow]()
-          iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
-        }, isOrderSensitive = isOrderSensitive)
+        createRddWithPartition(
+          newRdd,
+          (_, iter: Iterator[InternalRow]) => {
+            val getPartitionKey = getPartitionKeyExtractor()
+            val mutablePair = new MutablePair[Int, InternalRow]()
+            iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
+          },
+          isIndeterminate,
+          isOrderSensitive)
       }
     }
 
@@ -481,6 +496,25 @@ object ShuffleExchangeExec {
     dependency
   }
 
+  private def createRddWithPartition[T: ClassTag, U: ClassTag](
+      rdd: RDD[T],
+      f: (Int, Iterator[T]) => Iterator[U],
+      isIndeterminate: Boolean,
+      isOrderSensitive: Boolean): RDD[U] = if (isIndeterminate) {
+    RDDOperationScope.withScope(rdd.sparkContext) {
+      new MapPartitionsRDD(
+        rdd,
+        (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
+        isOrderSensitive = isOrderSensitive) {
+        override protected def getOutputDeterministicLevel =
+          DeterministicLevel.INDETERMINATE
+      }
+    }
+  } else {
+    rdd.mapPartitionsWithIndexInternal(f, isOrderSensitive = isOrderSensitive)
+  }
+
 /**
  * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
  * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
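The helper above is the crux of the runtime change: the DAGScheduler consults an RDD's
outputDeterministicLevel when a stage has to be retried, and an INDETERMINATE shuffle map
stage must be re-executed in full, because its partitioning keys can differ between attempts.
A minimal sketch of the override pattern used (assuming spark-internal access, since
MapPartitionsRDD is private[spark]; markIndeterminate is a hypothetical helper, not part of
this patch):

    import scala.reflect.ClassTag

    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.{DeterministicLevel, MapPartitionsRDD, RDD}

    // Wrap an RDD so downstream consumers see its output as INDETERMINATE,
    // overriding the level that would otherwise be inferred from its parents.
    def markIndeterminate[T: ClassTag](rdd: RDD[T]): RDD[T] =
      new MapPartitionsRDD[T, T](
        rdd,
        (_: TaskContext, _: Int, iter: Iterator[T]) => iter) {
        override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
          DeterministicLevel.INDETERMINATE
      }
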
diff --git a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala
new file mode 100644
index 000000000000..4733ae38d918
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.LongType
+
+class ShuffleMapStageTest extends SharedSparkSession {
+
+  test("SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val leftDfBase = spark.createDataset(
+        Seq((1L, "aa")))(
+        Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkLeftt", "strleft")
+
+      val rightDf = spark.createDataset(
+        Seq((1L, "11"), (2L, "22")))(
+        Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkRight", "strright")
+
+      // Replace null join keys with a random value, so that the join key column carries
+      // an indeterministic component.
+      val leftDf = leftDfBase.select(
+        col("strleft"),
+        when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType))
+          .otherwise(col("pkLeftt")).as("pkLeft"))
+
+      val join = leftDf.hint("shuffle_hash")
+        .join(rightDf, col("pkLeft") === col("pkRight"), "inner")
+      val shuffleStages: Array[ShuffleMapStage] = Array.ofDim(2)
+      spark.sparkContext.addSparkListener(new SparkListener() {
+        var i = 0
+        override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+          if (stageSubmitted.stageInfo.shuffleDepId.isDefined) {
+            shuffleStages(i) = spark.sparkContext.dagScheduler.shuffleIdToMapStage(
+              stageSubmitted.stageInfo.shuffleDepId.get)
+            i += 1
+          }
+        }
+      })
+      join.collect()
+      // Make sure all listener events have been processed before inspecting the stages.
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      assert(shuffleStages.count(_.isIndeterminate) == 1)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
index ec13d48d45f8..cf339559a169 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{DeterministicLevel, RDD}
+import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.functions.{col, floor, isnull, rand, when}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{LongType, StringType}
 
 class ProjectedOrderingAndPartitioningSuite
   extends SharedSparkSession with AdaptiveSparkPlanHelper {
@@ -210,6 +213,40 @@ class ProjectedOrderingAndPartitioningSuite
     assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
     assert(outputOrdering.head.sameOrderExpressions.size == 0)
   }
+
+  test("SPARK-51016: ShuffleRDD using indeterministic join keys should be INDETERMINATE") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val leftDfBase = spark.createDataset(
+        Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
+        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
+
+      val rightDf = spark.createDataset(
+        Seq((1L, "11"), (2L, "22"), (3L, "33")))(
+        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
+
+      // Replace null join keys with a random value, so that the join key column carries
+      // an indeterministic component.
+      val leftDf = leftDfBase.select(
+        col("strleft"),
+        when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType))
+          .otherwise(col("pkLeftt")).as("pkLeft"))
+
+      val join = leftDf.hint("shuffle_hash")
+        .join(rightDf, col("pkLeft") === col("pkRight"), "inner")
+
+      join.collect()
+      val finalPlan = join.queryExecution.executedPlan
+      val shuffleHJExec = finalPlan.children.head.asInstanceOf[ShuffledHashJoinExec]
+      // The side keyed on the indeterministic column must be INDETERMINATE...
+      assert(shuffleHJExec.left.asInstanceOf[InputAdapter].execute().outputDeterministicLevel ==
+        DeterministicLevel.INDETERMINATE)
+      // ...while the deterministic side stays at the usual UNORDERED level of a shuffle.
+      assert(shuffleHJExec.right.asInstanceOf[InputAdapter].execute().outputDeterministicLevel ==
+        DeterministicLevel.UNORDERED)
+
+      assert(shuffleHJExec.execute().outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+    }
+  }
 }
 
 private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode {