Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit ac3ae0f

Browse files
JoshRosen authored and rxin committed
[SPARK-9266] Prevent "managed memory leak detected" exception from masking original exception
When a task fails with an exception and also fails to properly clean up its managed memory, the `spark.unsafe.exceptionOnMemoryLeak` memory leak detection mechanism's exceptions will mask the original exception that caused the task to fail. We should throw the memory leak exception only if no other exception occurred. Author: Josh Rosen <[email protected]> Closes apache#7603 from JoshRosen/SPARK-9266 and squashes the following commits: c268cb5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-9266 c1f0167 [Josh Rosen] Fix the error masking problem 448eae8 [Josh Rosen] Add regression test
1 parent b983d49 commit ac3ae0f

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -209,16 +209,19 @@ private[spark] class Executor(
209209

210210
// Run the actual task and measure its runtime.
211211
taskStart = System.currentTimeMillis()
212+
var threwException = true
212213
val (value, accumUpdates) = try {
213-
task.run(
214+
val res = task.run(
214215
taskAttemptId = taskId,
215216
attemptNumber = attemptNumber,
216217
metricsSystem = env.metricsSystem)
218+
threwException = false
219+
res
217220
} finally {
218221
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
219222
if (freedMemory > 0) {
220223
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
221-
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
224+
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
222225
throw new SparkException(errMsg)
223226
} else {
224227
logError(errMsg)

core/src/test/scala/org/apache/spark/FailureSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -141,5 +141,30 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
141141
FailureSuiteState.clear()
142142
}
143143

144+
test("managed memory leak error should not mask other failures (SPARK-9266)") {
145+
val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
146+
sc = new SparkContext("local[1,1]", "test", conf)
147+
148+
// If a task leaks memory but fails due to some other cause, then make sure that the original
149+
// cause is preserved
150+
val thrownDueToTaskFailure = intercept[SparkException] {
151+
sc.parallelize(Seq(0)).mapPartitions { iter =>
152+
TaskContext.get().taskMemoryManager().allocate(128)
153+
throw new Exception("intentional task failure")
154+
iter
155+
}.count()
156+
}
157+
assert(thrownDueToTaskFailure.getMessage.contains("intentional task failure"))
158+
159+
// If the task succeeded but memory was leaked, then the task should fail due to that leak
160+
val thrownDueToMemoryLeak = intercept[SparkException] {
161+
sc.parallelize(Seq(0)).mapPartitions { iter =>
162+
TaskContext.get().taskMemoryManager().allocate(128)
163+
iter
164+
}.count()
165+
}
166+
assert(thrownDueToMemoryLeak.getMessage.contains("memory leak"))
167+
}
168+
144169
// TODO: Need to add tests with shuffle fetch failures.
145170
}

0 commit comments

Comments (0)