@@ -24,9 +24,8 @@ import org.apache.spark.SparkEnv
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
  */
-private[spark] trait Spillable[C] {
-
-  this: Logging =>
+private[spark] trait Spillable[C] extends Logging {
+  import Spillable._
 
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
@@ -41,11 +40,12 @@ private[spark] trait Spillable[C] {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // What threshold of elementsRead we start estimating collection size at
+  // Threshold for `elementsRead` before we start tracking this collection's memory usage
   private[this] val trackMemoryThreshold = 1000
 
-  // How much of the shared memory pool this collection has claimed
-  private[this] var myMemoryThreshold = 0L
+  // Threshold for this collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private[this] var myMemoryThreshold = INITIAL_MEMORY_TRACKING_THRESHOLD
 
   // Number of bytes spilled in total
   private[this] var _memoryBytesSpilled = 0L
@@ -94,8 +94,9 @@ private[spark] trait Spillable[C] {
    * Release our memory back to the shuffle pool so that other threads can grab it.
    */
   private def releaseMemoryForThisThread(): Unit = {
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - INITIAL_MEMORY_TRACKING_THRESHOLD)
+    myMemoryThreshold = INITIAL_MEMORY_TRACKING_THRESHOLD
   }
 
   /**
@@ -106,7 +107,12 @@ private[spark] trait Spillable[C] {
   @inline private def logSpillage(size: Long) {
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
-    .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
-    _spillCount, if (_spillCount > 1) "s" else ""))
+      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
+        _spillCount, if (_spillCount > 1) "s" else ""))
   }
 }
+
+private object Spillable {
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  val INITIAL_MEMORY_TRACKING_THRESHOLD: Long = 5 * 1024 * 1024
+}
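The key idea in this change is the accounting: a collection now starts with a 5 MB tracking threshold that was never actually requested from the shared shuffle memory pool, so only memory claimed above that threshold is ever acquired from or released back to the pool. Below is a minimal, self-contained sketch of that accounting; `ToyMemoryManager` and the inline acquire/release steps are illustrative stand-ins, not Spark's actual classes or the full maybeSpill logic from this file.

// Sketch of the initial-threshold accounting introduced by this patch.
// ToyMemoryManager is a hypothetical stand-in for the shuffle memory pool.
object SpillAccountingSketch {

  private val InitialThreshold: Long = 5 * 1024 * 1024 // mirrors INITIAL_MEMORY_TRACKING_THRESHOLD

  // Toy pool: grants up to `capacity` bytes in total across all callers.
  class ToyMemoryManager(capacity: Long) {
    private var used = 0L
    def tryToAcquire(bytes: Long): Long = synchronized {
      val granted = math.min(bytes, capacity - used)
      used += granted
      granted
    }
    def release(bytes: Long): Unit = synchronized { used -= bytes }
    def usedBytes: Long = synchronized { used }
  }

  def main(args: Array[String]): Unit = {
    val pool = new ToyMemoryManager(capacity = 64L * 1024 * 1024)
    var myMemoryThreshold = InitialThreshold // starts non-zero, but nothing acquired yet

    // The collection grows past the current threshold: request only the excess from the pool.
    val estimatedSize = 12L * 1024 * 1024
    val granted = pool.tryToAcquire(2 * estimatedSize - myMemoryThreshold)
    myMemoryThreshold += granted

    // On spill (or task completion), release only what was actually acquired:
    // the initial threshold was never taken from the pool, so subtract it first.
    pool.release(myMemoryThreshold - InitialThreshold)
    myMemoryThreshold = InitialThreshold

    assert(pool.usedBytes == 0L, "pool accounting should return to zero")
  }
}

Because small collections never exceed the initial threshold, they never touch the shared pool at all, which is what avoids the flood of tiny spills this patch targets.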