Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit c1f0167

Browse files
committed
Fix the error masking problem
1 parent 448eae8 commit c1f0167

File tree

2 files changed

+9
-3
lines changed

2 files changed

+9
-3
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,18 +209,21 @@ private[spark] class Executor(
209209

210210
// Run the actual task and measure its runtime.
211211
taskStart = System.currentTimeMillis()
212+
var threwException = true
212213
val (value, accumUpdates) = try {
213-
task.run(
214+
val res = task.run(
214215
taskAttemptId = taskId,
215216
attemptNumber = attemptNumber,
216217
metricsSystem = env.metricsSystem)
218+
threwException = false
219+
res
217220
} finally {
218221
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
219222
// when changing this, make sure to update both copies.
220223
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
221224
if (freedMemory > 0) {
222225
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
223-
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
226+
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
224227
throw new SparkException(errMsg)
225228
} else {
226229
logError(errMsg)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,17 +685,20 @@ class DAGScheduler(
685685
metricsSystem = env.metricsSystem,
686686
runningLocally = true)
687687
TaskContext.setTaskContext(taskContext)
688+
var threwException = true
688689
try {
689690
val result = job.func(taskContext, rdd.iterator(split, taskContext))
690691
job.listener.taskSucceeded(0, result)
692+
threwException = false
691693
} finally {
692694
taskContext.markTaskCompleted()
693695
TaskContext.unset()
694696
// Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
695697
// make sure to update both copies.
696698
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
697699
if (freedMemory > 0) {
698-
if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
700+
if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)
701+
&& !threwException) {
699702
throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
700703
} else {
701704
logError(s"Managed memory leak detected; size = $freedMemory bytes")

0 commit comments

Comments
 (0)