core/src/main/scala/org/apache/spark/rdd/RDD.scala (2 additions & 3 deletions)

@@ -25,7 +25,6 @@ import scala.io.Codec
 import scala.language.implicitConversions
 import scala.ref.WeakReference
 import scala.reflect.{classTag, ClassTag}
-import scala.util.hashing

 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -50,7 +49,7 @@ import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
   Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
-  SamplingUtils}
+  SamplingUtils, XORShiftRandom}

 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -505,7 +504,7 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
+        var position = new XORShiftRandom(index).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
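For context, the distributePartition closure above implements a simple round-robin scheme: each input partition derives a pseudo-random starting output partition from its own index, then hands elements to consecutive partitions. Below is a minimal, self-contained sketch of that scheme in plain Scala with no Spark dependencies; the helper name distribute and the use of scala.util.Random are illustrative stand-ins, not the Spark API, and naive sequential seeding like this is exactly what SPARK-21782 warns about (see the next file).

import scala.util.Random

object RoundRobinSketch {
  // Mirrors distributePartition: seed from the partition index so task retries
  // are deterministic, then assign elements to consecutive output partitions.
  def distribute[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position += 1
      // Spark emits the raw position as the key and lets HashPartitioner mod it;
      // here we mod directly so the target partition is explicit.
      (position % numPartitions, t)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 elements from one input partition spread across 4 output partitions.
    val assigned = distribute(0, (1 to 10).iterator, 4).toList
    println(assigned.groupBy(_._1).view.mapValues(_.size).toMap) // near-equal counts
  }
}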
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala (2 additions & 3 deletions)

@@ -17,11 +17,9 @@

 package org.apache.spark.sql.execution.exchange

-import java.util.Random
 import java.util.function.Supplier

 import scala.concurrent.Future
-import scala.util.hashing

 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -40,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
+import org.apache.spark.util.random.XORShiftRandom

 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -307,7 +306,7 @@ object ShuffleExchangeExec {
       // end up being almost the same regardless of the index. Substantially scrambling the
       // seed by hashing will help. Refer to SPARK-21782 for more details.
       val partitionId = TaskContext.get().partitionId()
-      var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
+      var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
       (row: InternalRow) => {
        // The HashPartitioner will handle the `mod` by the number of partitions
        position += 1
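The comment retained in this hunk explains the bug class: java.util.Random.nextInt(bound) has a fast path for power-of-two bounds that takes only the high bits of a single generator step, so small sequential seeds (here, partition indices) produce nearly identical first draws. The old code worked around this by scrambling the seed with byteswap32; the new code uses Spark's XORShiftRandom, which hashes its seed with MurmurHash3 at construction. Here is a quick standalone check of the effect, using only the JDK and scala.util.hashing with no Spark; the comments describe the expected behavior, run it to verify.

import java.util.Random
import scala.util.hashing

object SeedBiasDemo {
  def main(args: Array[String]): Unit = {
    val numPartitions = 64 // a power of two triggers nextInt's biased fast path

    // Raw sequential seeds: the first draws cluster around a single value.
    val raw = (0 until 8).map(i => new Random(i).nextInt(numPartitions))

    // byteswap32-scrambled seeds: the first draws spread across [0, 64).
    val scrambled = (0 until 8).map(i => new Random(hashing.byteswap32(i)).nextInt(numPartitions))

    println(s"sequential seeds: $raw")
    println(s"scrambled seeds:  $scrambled")
  }
}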
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (13 additions & 1 deletion)

@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}

+import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -2169,7 +2170,18 @@ class DatasetSuite extends QueryTest
   test("SPARK-40407: repartition should not result in severe data skew") {
     val df = spark.range(0, 100, 1, 50).repartition(4)
     val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
-    assert(result.sorted.toSeq === Seq(19, 25, 25, 31))
+    assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }

+  test("SPARK-40660: Switch to XORShiftRandom to distribute elements") {
+    withTempDir { dir =>
+      spark.range(10).repartition(10).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(spark.sessionState.newHadoopConf())
+      val parquetFiles = fs.listStatus(new Path(dir.getAbsolutePath), new PathFilter {
+        override def accept(path: Path): Boolean = path.getName.endsWith("parquet")
+      })
+      assert(parquetFiles.size === 10)
+    }
+  }
 }
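The new test checks the distribution indirectly: 10 rows repartitioned into 10 partitions should leave no partition empty, so the parquet writer should emit exactly 10 data files. The SPARK-40407 test just above it measures the same property directly as per-partition sizes. Below is a standalone version of that direct check, runnable outside the suite; the local master setting and app name are illustrative assumptions, and the exact sizes depend on the session configuration.

import org.apache.spark.sql.SparkSession

object RepartitionSpreadCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("spread-check").getOrCreate()
    import spark.implicits._

    // 100 rows in 50 input partitions, round-robin repartitioned to 4 partitions.
    val sizes = spark.range(0, 100, 1, 50).repartition(4)
      .mapPartitions(it => Iterator.single(it.length))
      .collect()
      .sorted

    // Per the suite: the old byteswap32-seeded Random gave [19, 25, 25, 31];
    // XORShiftRandom gives the tighter [23, 25, 25, 27].
    println(sizes.mkString("partition sizes: [", ", ", "]"))
    spark.stop()
  }
}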
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (2 additions & 2 deletions)

@@ -2127,8 +2127,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
       // partition size [0,258,72,72,72]
       checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
-      // partition size [144,72,144,216,144]
-      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6)
+      // partition size [144,72,144,72,72,144,72]
+      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7)
     }

     // no skewed partition should be optimized