@@ -119,6 +119,38 @@ abstract class Expression extends TreeNode[Expression] {
*/
lazy val deterministic: Boolean = children.forall(_.deterministic)

/**
* Indicates whether the value this expression evaluates to has a non-deterministic
* component. This differs from [[deterministic]] as follows: an [[Attribute]] produced
* from a non-deterministic [[Expression]] is itself a leaf and therefore has
* [[deterministic]] set to true, yet the value it represents still originates from a
* non-deterministic computation, so its [[hasIndeterminism]] remains true. In short,
* [[deterministic]] describes how an expression evaluates, while [[hasIndeterminism]]
* describes the nature of the value the expression represents.
*
* To elaborate, consider the following [[Alias]]:
*
* {{{
* // The flag records that the value represented by the resulting AttributeReference
* // has an indeterministic component.
* val aliasExpr = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "X")()
* assert(!aliasExpr.deterministic)
* assert(aliasExpr.hasIndeterminism)
*
* val attribRef = aliasExpr.toAttribute
* assert(attribRef.deterministic)
* assert(attribRef.hasIndeterminism)
* }}}
*
* @return true if the expression's evaluated value is derived from some indeterministic
* quantity.
*/
def hasIndeterminism: Boolean = _hasIndeterminism

@transient
private lazy val _hasIndeterminism: Boolean = !deterministic ||
this.references.exists(_.hasIndeterminism)

def nullable: Boolean

/**
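To see the two flags side by side, here is a minimal sketch (mirroring the new NondeterministicSuite tests further down) of how an Alias over a non-deterministic child, and the attribute derived from it, report deterministic versus hasIndeterminism:

    import org.apache.spark.sql.catalyst.expressions._

    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "x")()
    assert(!alias.deterministic)      // the child is non-deterministic
    assert(alias.hasIndeterminism)    // so the alias has an indeterministic component

    val attr = alias.toAttribute
    assert(attr.deterministic)        // a leaf attribute always evaluates deterministically
    assert(attr.hasIndeterminism)     // but it still records its indeterminate origin

    // and the flag propagates to any expression that references the attribute
    assert(Multiply(attr, Literal(1000L)).hasIndeterminism)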
@@ -38,7 +38,7 @@ case class ProjectionOverSchema(schema: StructType, output: AttributeSet) {
private def getProjection(expr: Expression): Option[Expression] =
expr match {
case a: AttributeReference if fieldNames.contains(a.name) && output.contains(a) =>
Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier, a.hasIndeterminism))
case GetArrayItem(child, arrayItemOrdinal, failOnError) =>
getProjection(child).map {
projection => GetArrayItem(projection, arrayItemOrdinal, failOnError)
@@ -111,6 +111,7 @@ abstract class Attribute extends LeafExpression with NamedExpression {
@transient
override lazy val references: AttributeSet = AttributeSet(this)

override def hasIndeterminism: Boolean = false
def withNullability(newNullability: Boolean): Attribute
def withQualifier(newQualifier: Seq[String]): Attribute
def withName(newName: String): Attribute
@@ -202,7 +203,8 @@ case class Alias(child: Expression, name: String)(

override def toAttribute: Attribute = {
if (resolved) {
AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier,
this.hasIndeterminism)
} else {
UnresolvedAttribute.quoted(name)
}
@@ -282,7 +284,8 @@ case class AttributeReference(
nullable: Boolean = true,
override val metadata: Metadata = Metadata.empty)(
val exprId: ExprId = NamedExpression.newExprId,
val qualifier: Seq[String] = Seq.empty[String])
val qualifier: Seq[String] = Seq.empty[String],
override val hasIndeterminism: Boolean = false)
extends Attribute with Unevaluable {

override lazy val treePatternBits: BitSet = AttributeReferenceTreeBits.bits
@@ -320,7 +323,8 @@ case class AttributeReference(
}

override def newInstance(): AttributeReference =
AttributeReference(name, dataType, nullable, metadata)(qualifier = qualifier)
AttributeReference(name, dataType, nullable, metadata)(qualifier = qualifier,
hasIndeterminism = hasIndeterminism)

/**
* Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -329,15 +333,16 @@ case class AttributeReference(
if (nullable == newNullability) {
this
} else {
AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier)
AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier,
hasIndeterminism)
}
}

override def withName(newName: String): AttributeReference = {
if (name == newName) {
this
} else {
AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier)
AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier, hasIndeterminism)
}
}

@@ -348,28 +353,28 @@ case class AttributeReference(
if (newQualifier == qualifier) {
this
} else {
AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier)
AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier, hasIndeterminism)
}
}

override def withExprId(newExprId: ExprId): AttributeReference = {
if (exprId == newExprId) {
this
} else {
AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier)
AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier, hasIndeterminism)
}
}

override def withMetadata(newMetadata: Metadata): AttributeReference = {
AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier)
AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, hasIndeterminism)
}

override def withDataType(newType: DataType): AttributeReference = {
AttributeReference(name, newType, nullable, metadata)(exprId, qualifier)
}

override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifier :: Nil
exprId :: qualifier :: Boolean.box(hasIndeterminism) :: Nil
}

/** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
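Because hasIndeterminism lives in the second (curried) parameter list rather than among the case-class fields, it has to be boxed into otherCopyArgs so that Catalyst's makeCopy re-creates nodes with the flag intact, and every withX copy method threads it through explicitly. A minimal sketch of the behavior this buys (the names "x" and "y" are illustrative):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.LongType

    val attr = AttributeReference("x", LongType)(hasIndeterminism = true)
    assert(attr.withName("y").hasIndeterminism)            // preserved by the pass-through
    assert(attr.withQualifier(Seq("t")).hasIndeterminism)  // likewise for qualifier changes
    assert(attr.newInstance().hasIndeterminism)            // kept even with a fresh exprId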
@@ -477,12 +477,13 @@ object Union {
childOutputs.transpose.map { attrs =>
val firstAttr = attrs.head
val nullable = attrs.exists(_.nullable)
val hasIndeterminism = attrs.exists(_.hasIndeterminism)
val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
if (firstAttr.dataType == newDt) {
if (firstAttr.dataType == newDt && !hasIndeterminism) {
firstAttr.withNullability(nullable)
} else {
AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
firstAttr.exprId, firstAttr.qualifier)
firstAttr.exprId, firstAttr.qualifier, hasIndeterminism)
}
}
}
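With the hunk above, Union's merged output attribute is rebuilt whenever any child attribute carries the flag, even if the data types already agree, so the indeterminate origin is not silently dropped. A minimal sketch of the merge condition for one column, assuming one branch derives the column from rand() (fromLeft and fromRight are illustrative names):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.LongType

    val fromLeft = AttributeReference("c", LongType)()                         // deterministic branch
    val fromRight = AttributeReference("c", LongType)(hasIndeterminism = true) // e.g. built from rand()
    val attrs = Seq(fromLeft, fromRight)
    assert(attrs.exists(_.hasIndeterminism))  // so firstAttr cannot be reused as-is;
                                              // the output attribute is rebuilt with the flag set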
@@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
case _ => expr.mapChildren(replace)
}

private def prepareEvaluation(expression: Expression): Expression = {
private[expressions] def prepareEvaluation(expression: Expression): Expression = {
val serializer = new JavaSerializer(new SparkConf()).newInstance()
val resolver = ResolveTimeZone
val expr = replace(resolver.resolveTimeZones(expression))
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, RangePartitioning}

class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
test("MonotonicallyIncreasingID") {
@@ -31,4 +32,45 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
test("InputFileName") {
checkEvaluation(InputFileName(), "")
}

test("SPARK-51016: has Indeterministic Component") {
def assertIndeterminancyComponent(expression: Expression): Unit =
assert(prepareEvaluation(expression).hasIndeterminism)

assertIndeterminancyComponent(MonotonicallyIncreasingID())
val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
assertIndeterminancyComponent(alias)
// For the attribute created from an Alias with deterministic flag false, the attribute would
// carry forward that information from Alias, via the hasIndeterminism flag value being true.
assertIndeterminancyComponent(alias.toAttribute)
// But the Attribute's deterministic flag would be true ( implying it does not carry forward
// that inDeterministic nature of evaluated quantity which Attribute represents)
assert(prepareEvaluation(alias.toAttribute).deterministic)

assertIndeterminancyComponent(Multiply(alias.toAttribute, Literal(1000L)))
assertIndeterminancyComponent(
HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5))
assertIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5))
assertIndeterminancyComponent(
RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
assertIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
}

test("SPARK-51016: has Deterministic Component") {
def assertNoIndeterminancyComponent(expression: Expression): Unit =
assert(!prepareEvaluation(expression).hasIndeterminism)

assertNoIndeterminancyComponent(Literal(1000L))
val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")()
assertNoIndeterminancyComponent(alias)
assertNoIndeterminancyComponent(alias.toAttribute)
assertNoIndeterminancyComponent(
HashPartitioning(Seq(Multiply(Literal(10L), Literal(100L))), 5))
assertNoIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5))
assertNoIndeterminancyComponent(
RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
assertNoIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
}


}
@@ -69,7 +69,8 @@ trait PlanTestBase extends PredicateHelper with SQLHelper { self: Suite =>
protected def rewriteNameFromAttrNullability(plan: LogicalPlan): LogicalPlan = {
plan.transformAllExpressions {
case a @ AttributeReference(name, _, false, _) =>
a.copy(name = s"*$name")(exprId = a.exprId, qualifier = a.qualifier)
a.copy(name = s"*$name")(exprId = a.exprId, qualifier = a.qualifier,
hasIndeterminism = a.hasIndeterminism)
}
}

@@ -22,15 +22,16 @@ import java.util.function.Supplier

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.rdd.{DeterministicLevel, MapPartitionsRDD, RDD, RDDOperationScope}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -43,6 +44,7 @@ import org.apache.spark.util.{MutablePair, ThreadUtils}
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
import org.apache.spark.util.random.XORShiftRandom

/**
* Common trait for all shuffle exchange implementations to facilitate pattern matching.
*/
@@ -452,19 +454,34 @@ object ShuffleExchangeExec {
rdd
}

// Partitionings such as HashPartitioning are Catalyst Expressions, so they expose
// whether any of their partitioning expressions has an indeterministic component.
val isIndeterminate = newPartitioning match {
case expr: Expression => expr.hasIndeterminism
case _ => false
}

// round-robin function is order sensitive if we don't sort the input.
val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
if (needToCopyObjectsBeforeShuffle(part)) {
newRdd.mapPartitionsWithIndexInternal((_, iter) => {
val getPartitionKey = getPartitionKeyExtractor()
iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
}, isOrderSensitive = isOrderSensitive)
createRddWithPartition(
newRdd,
(_, iter: Iterator[InternalRow]) => {
val getPartitionKey = getPartitionKeyExtractor()
iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
},
isIndeterminate,
isOrderSensitive)
} else {
newRdd.mapPartitionsWithIndexInternal((_, iter) => {
val getPartitionKey = getPartitionKeyExtractor()
val mutablePair = new MutablePair[Int, InternalRow]()
iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}, isOrderSensitive = isOrderSensitive)
createRddWithPartition(
newRdd,
(_, iter: Iterator[InternalRow]) => {
val getPartitionKey = getPartitionKeyExtractor()
val mutablePair = new MutablePair[Int, InternalRow]()
iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
},
isIndeterminate,
isOrderSensitive)
}
}

Expand All @@ -481,6 +498,25 @@ object ShuffleExchangeExec {
dependency
}

/**
* Builds the shuffle map-side RDD from the given per-partition function. When the
* partitioning has an indeterministic component, the returned RDD is explicitly marked
* INDETERMINATE so that a fetch failure makes the scheduler retry the whole stage:
* re-evaluating an indeterminate partitioner for only a subset of tasks could route rows
* to different partitions than the surviving output.
*/
private def createRddWithPartition[T: ClassTag, U: ClassTag](
rdd: RDD[T],
f: (Int, Iterator[T]) => Iterator[U],
isIndeterminate: Boolean,
isOrderSensitive: Boolean): RDD[U] = if (isIndeterminate) {
RDDOperationScope.withScope(rdd.sparkContext) {
new MapPartitionsRDD(
rdd,
(_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
isOrderSensitive = isOrderSensitive) {
override protected def getOutputDeterministicLevel =
DeterministicLevel.INDETERMINATE
}
}
} else {
rdd.mapPartitionsWithIndexInternal(f, isOrderSensitive = isOrderSensitive)
}

/**
* Create a customized [[ShuffleWriteProcessor]] for SQL which wraps the default metrics reporter
* with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
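The override above is the crux of the fix: the scheduler consults an RDD's output deterministic level when deciding how much of a stage to re-run after a fetch failure, and a plain mapPartitionsWithIndexInternal would report a level derived only from the parent RDD. A minimal sketch, reusing the Catalyst expressions from the tests above, of how a flagged attribute turns into an indeterminate shuffle:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "k")()
    val partitioning = HashPartitioning(Seq(alias.toAttribute), 5)
    assert(partitioning.hasIndeterminism)  // propagated from the attribute's flag
    // The prepareShuffleDependency path above then builds the map-side RDD through
    // createRddWithPartition with isIndeterminate = true, yielding an RDD whose
    // getOutputDeterministicLevel is DeterministicLevel.INDETERMINATE.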
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.LongType

class ShuffleMapStageTest extends SharedSparkSession {

test("SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val leftDfBase = spark.createDataset(
Seq((1L, "aa")))(
Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkLeftt", "strleft")

val rightDf = spark.createDataset(
Seq((1L, "11"), (2L, "22")))(
Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkRight", "strright")

val leftDf = leftDfBase.select(
col("strleft"),
when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType)).
otherwise(col("pkLeftt")).as("pkLeft"))

val join = leftDf.hint("shuffle_hash").
join(rightDf, col("pkLeft") === col("pkRight"), "inner")
val shuffleStages: Array[ShuffleMapStage] = Array.ofDim(2)
spark.sparkContext.addSparkListener(new SparkListener() {
var i = 0
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
if (stageSubmitted.stageInfo.shuffleDepId.isDefined) {
shuffleStages(i) =
spark.sparkContext.dagScheduler.shuffleIdToMapStage(stageSubmitted.stageInfo.stageId)
i += 1
}
}
})
join.collect()
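// Only the stage that shuffles leftDf, whose join key depends on rand(), should be indeterminate.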
assert(shuffleStages.count(_.isIndeterminate) == 1)
}
}
}