
Commit f2e552c

Fix tests

Author: Andrew Or
1 parent 7012595 commit f2e552c

4 files changed (+21, −11)

core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala (2 additions, 2 deletions)

@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 
 /**
  * Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
@@ -124,5 +124,5 @@ private[spark] object ShuffleMemoryManager {
   }
 
   // Initial threshold for the size of a collection before we start tracking its memory usage
-  val INITIAL_MEMORY_TRACKING_THRESHOLD: Long = 5 * 1024 * 1024
+  val DEFAULT_INITIAL_MEMORY_THRESHOLD: Long = 5 * 1024 * 1024
 }
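The rename from INITIAL_MEMORY_TRACKING_THRESHOLD to DEFAULT_INITIAL_MEMORY_THRESHOLD signals that the 5 MB value (5 * 1024 * 1024 = 5,242,880 bytes) is now only a fallback: the files below read the effective threshold from a new configuration key. A minimal sketch of how a user could override it, using only the public SparkConf API and the key introduced in this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Lower the initial spill threshold from the 5 MB default to 512 bytes so
// that collections start requesting shuffle memory (and spilling) early.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("spill-threshold-demo")
  .set("spark.shuffle.spill.initialMemoryThreshold", "512")
val sc = new SparkContext(conf)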

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala (8 additions, 4 deletions)

@@ -82,9 +82,14 @@ class ExternalAppendOnlyMap[K, V, C](
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
 
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
   // Threshold for the collection's size in bytes before we start tracking its memory usage
   // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
-  private var myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
+  private var myMemoryThreshold = initialMemoryThreshold
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -238,11 +243,10 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Release our memory back to the shuffle pool so that other threads can grab it
     // The amount we requested does not include the initial memory tracking threshold
-    shuffleMemoryManager.release(
-      myMemoryThreshold - ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD)
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
 
     // Reset this to the initial threshold to avoid spilling many small files
-    myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
+    myMemoryThreshold = initialMemoryThreshold
 
     elementsRead = 0
     _memoryBytesSpilled += mapSize
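The release arithmetic is the subtle part: the first initialMemoryThreshold bytes were never requested from the ShuffleMemoryManager, so on spill only the excess is returned and the threshold resets. A stripped-down model of that bookkeeping (class and method names here are illustrative, not Spark's API):

// Illustrative model of the threshold bookkeeping in the hunk above.
class SpillBookkeeping(initialMemoryThreshold: Long) {
  // Starts at the initial threshold; that portion was never granted by the pool
  private var myMemoryThreshold: Long = initialMemoryThreshold

  // Called when the shuffle pool grants more memory to this collection
  def onMemoryGranted(bytes: Long): Unit = {
    myMemoryThreshold += bytes
  }

  // Called on spill: return only what the pool actually granted, then reset
  def onSpill(release: Long => Unit): Unit = {
    release(myMemoryThreshold - initialMemoryThreshold)
    myMemoryThreshold = initialMemoryThreshold
  }
}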

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala (8 additions, 4 deletions)

@@ -135,9 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _
 
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
   // Threshold for the collection's size in bytes before we start tracking its memory usage
   // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
-  private var myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
+  private var myMemoryThreshold = initialMemoryThreshold
 
   // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
   // local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -287,11 +292,10 @@ private[spark] class ExternalSorter[K, V, C](
 
     // Release our memory back to the shuffle pool so that other threads can grab it
     // The amount we requested does not include the initial memory tracking threshold
-    shuffleMemoryManager.release(
-      myMemoryThreshold - ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD)
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
 
     // Reset this to the initial threshold to avoid spilling many small files
-    myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
+    myMemoryThreshold = initialMemoryThreshold
 
     _memoryBytesSpilled += memorySize
     elementsRead = 0
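ExternalSorter picks up the threshold exactly as ExternalAppendOnlyMap does: through SparkEnv.get.conf, the configuration of the currently running driver or executor, rather than through a constructor argument. That is why the test fixes below must set the key on the SparkContext's conf before any sorter is created. The lookup in isolation (SparkEnv.get and SparkConf.getLong are real Spark 1.x APIs; the helper name is hypothetical):

import org.apache.spark.SparkEnv

// Hypothetical helper showing the lookup both collections perform: the
// active environment's conf, with the companion object's 5 MB fallback.
def currentInitialMemoryThreshold(): Long =
  SparkEnv.get.conf.getLong(
    "spark.shuffle.spill.initialMemoryThreshold", 5L * 1024 * 1024)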

core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala (3 additions, 1 deletion)

@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
   test("empty partitions with spilling") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
   test("empty partitions with spilling, bypass merge-sort") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -761,5 +763,5 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
     }
 
     sorter2.stop()
-    }
+  }
 }
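The test fix follows from the new default: with spark.shuffle.memoryFraction at 0.001 the shuffle pool is tiny, and the small datasets these tests insert would never reach the 5 MB floor, so the sorters would no longer spill and the assertions about spill files would fail. Setting the threshold to 512 bytes restores the intended spilling. The suite's createSparkConf helper is not shown in this diff, so a plain SparkConf stands in for it in this sketch:

import org.apache.spark.{SparkConf, SparkContext}

// Reproduce the test setup: tiny shuffle pool plus tiny initial threshold,
// so even a handful of records forces a spill to disk.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.001")
  .set("spark.shuffle.spill.initialMemoryThreshold", "512")
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
val sc = new SparkContext("local", "test", conf)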
