From 6e2509f060c1acb6706e575b66fb953dba9a424c Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 19 Feb 2015 16:43:26 -0800 Subject: [PATCH] [SPARK-4808] Removing minimum number of elements read before spill check We found that if we only start checking for spilling after reading 1000 elements in a Spillable, we would run out of memory if there are fewer than 1000 elements but each of those elements are very large. There is no real need to only check for spilling after reading 1000 things. It is still necessary, however, to mitigate the cost of entering a synchronized block. It turns out in practice however checking for spilling only on every 32 items is sufficient, without needing the minimum elements threshold. --- .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 9f54312074856..747ecf075a397 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging { // Memory manager that can be used to acquire/release memory private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager - // Threshold for `elementsRead` before we start tracking this collection's memory usage - private[this] val trackMemoryThreshold = 1000 - // Initial threshold for the size of a collection before we start tracking its memory usage // Exposed for testing private[this] val initialMemoryThreshold: Long = @@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging { * @return true if `collection` was spilled to disk; false otherwise */ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { - if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 && - currentMemory >= myMemoryThreshold) { + if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)