Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -407,7 +408,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: ExecutorLossReason) {
protected def removeExecutor(executorId: String, reason: ExecutorLossReason) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to protected since it's called only by subclasses

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you're here can you fix this to have a proper signature (add return type)?

try {
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
} catch {
Expand All @@ -416,6 +417,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}

protected def removeExecutorAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we can't just make removeExecutor always use the async API?

It doesn't look like any of the call sites actually relies on it being synchronous.

executorId: String,
reason: ExecutorLossReason): Future[Boolean] = {
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason))
}

def sufficientResourcesRegistered(): Boolean = true

override def isReady(): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.
Expand Down Expand Up @@ -148,13 +148,17 @@ private[spark] class StandaloneSchedulerBackend(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}

/** Note: this method should not block. See [[StandaloneAppClientListener]] */
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason)
// Only log the failure since we don't care about the result.
removeExecutorAsync(fullId.split("/")(1), reason).onFailure { case t =>
logError(t.getMessage, t)
}(ThreadUtils.sameThread)
}

override def sufficientResourcesRegistered(): Boolean = {
Expand Down