From bcba710778bb85701c52f0218743fae5aa0fdd27 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 3 Jan 2024 23:38:47 +0800
Subject: [PATCH 1/2] init

---
 .../org/apache/spark/executor/Executor.scala  | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f2a65aab1ba4..da7873e0b72a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder
 
-import scala.collection.{immutable, mutable}
+import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -54,6 +54,7 @@ import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.ArrayImplicits._
 
 private[spark] class IsolatedSessionState(
   val sessionUUID: String,
@@ -751,8 +752,8 @@ private[spark] class Executor(
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
         // Here and below, put task metric peaks in a ArraySeq to expose them as a Seq
         // without requiring a copy.
-        val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq)
+        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
+        val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
@@ -762,8 +763,8 @@ private[spark] class Executor(
         logInfo(s"Executor interrupted and killed $taskName, reason: $killReason")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
-        val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq)
+        val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
+        val reason = TaskKilled(killReason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
@@ -806,18 +807,18 @@ private[spark] class Executor(
           // instead of an app issue).
           if (!ShutdownHookManager.inShutdown()) {
             val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-            val metricPeaks = mutable.ArraySeq.make(metricsPoller.getTaskMetricPeaks(taskId))
+            val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
 
             val (taskFailureReason, serializedTaskFailureReason) = {
               try {
                 val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
-                  .withMetricPeaks(metricPeaks.toSeq)
+                  .withMetricPeaks(metricPeaks)
                 (ef, ser.serialize(ef))
               } catch {
                 case _: NotSerializableException =>
                   // t is not serializable so just send the stacktrace
                   val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
-                    .withMetricPeaks(metricPeaks.toSeq)
+                    .withMetricPeaks(metricPeaks)
                   (ef, ser.serialize(ef))
               }
             }

From cdb82f81af6b1b115c4861c054e537c4c9c3b9d2 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 3 Jan 2024 23:48:38 +0800
Subject: [PATCH 2/2] comments

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index da7873e0b72a..12471915cd97 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -750,8 +750,8 @@ private[spark] class Executor(
         logInfo(s"Executor killed $taskName, reason: ${t.reason}")
 
         val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
-        // Here and below, put task metric peaks in a ArraySeq to expose them as a Seq
-        // without requiring a copy.
+        // Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an
+        // immutable.Seq without requiring a copy.
         val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq
         val reason = TaskKilled(t.reason, accUpdates, accums, metricPeaks)
         plugins.foreach(_.onTaskFailed(reason))
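
For context on the change itself: the old pattern, mutable.ArraySeq.make(arr).toSeq, copies the array's elements to produce an immutable.Seq, while the toImmutableArraySeq extension imported from org.apache.spark.util.ArrayImplicits wraps the array in an immutable.ArraySeq in O(1). The sketch below is a minimal, self-contained approximation of that extension, assuming it is backed by immutable.ArraySeq.unsafeWrapArray; the object name ArrayImplicitsSketch and the main method are illustrative only, not part of the patch.

import scala.collection.{immutable, mutable}

object ArrayImplicitsSketch {
  // Approximation of the extension the patch imports from
  // org.apache.spark.util.ArrayImplicits (assumption: it wraps rather than copies).
  implicit class ArrayOps[T](private val xs: Array[T]) extends AnyVal {
    // unsafeWrapArray wraps the existing array without copying. This is only safe
    // when the caller hands over the array and never mutates it afterwards, which
    // holds for the freshly built metric-peak arrays in Executor.
    def toImmutableArraySeq: immutable.ArraySeq[T] =
      immutable.ArraySeq.unsafeWrapArray(xs)
  }

  def main(args: Array[String]): Unit = {
    val peaks: Array[Long] = Array(1L, 2L, 3L)

    // Old pattern: wrap mutably, then copy on .toSeq to obtain an immutable.Seq.
    val copied: immutable.Seq[Long] = mutable.ArraySeq.make(peaks).toSeq

    // New pattern: O(1) wrap, already an immutable.Seq.
    val wrapped: immutable.Seq[Long] = peaks.toImmutableArraySeq

    // Both expose the same elements; only the construction cost differs.
    assert(copied == wrapped)
  }
}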