Skip to content

Commit 07aba7c

Browse files
WenboZhaoCurtis Howard
authored andcommitted
Fix illegal executor id (apache#13)
* Fix illegal executor id * Address comments * Fix conflict env variable name (cherry picked from commit 7fd56c9) (cherry picked from commit 1785e3d) (cherry picked from commit d5c3f68)
1 parent e978875 commit 07aba7c

File tree

2 files changed

+37
-37
lines changed

2 files changed

+37
-37
lines changed

cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,17 @@ object CoarseCookSchedulerBackend {
7878
* a task is done. It launches Spark tasks within the coarse-grained Cook instances using the
7979
* CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
8080
* latency.
81+
*
82+
* Since Spark 2.0.0, executor id must be an integer even though its type is string. This backend
83+
* uses task id which is also an integer and created via
84+
* {{{
85+
* MesosCoarseGrainedSchedulerBackend.newMesosId
86+
* }}}
87+
* as executor id.
88+
*
89+
* To ensure the mapping from executor id (task id) to its Cook job instance is 1-1 and onto,
90+
* we only allow one instance per Cook job and we are using the mapping from
91+
* <task id> -> <cook job id> to track this relationship.
8192
*/
8293
class CoarseCookSchedulerBackend(
8394
scheduler: TaskSchedulerImpl,
@@ -107,6 +118,7 @@ class CoarseCookSchedulerBackend(
107118
var totalFailures = 0
108119
val jobIds = mutable.Set[UUID]()
109120
val abortedJobIds = mutable.Set[UUID]()
121+
private val executorIdToJobId = mutable.HashMap[String, UUID]()
110122

111123
private[this] val jobClient = new JobClient.Builder()
112124
.setHost(cookHost)
@@ -184,15 +196,15 @@ class CoarseCookSchedulerBackend(
184196
import CoarseCookSchedulerBackend.fetchURI
185197

186198
val jobId = UUID.randomUUID()
187-
executorUUIDWriter(jobId)
188-
logInfo(s"Creating job with id: $jobId")
199+
val taskId = sparkMesosScheduler.newMesosTaskId()
200+
executorIdToJobId += taskId -> jobId
201+
logInfo(s"Creating job with id: $jobId. The corresponding executor id and task id is $taskId")
189202
val fakeOffer = Offer.newBuilder()
190203
.setId(OfferID.newBuilder().setValue("Cook-id"))
191204
.setFrameworkId(FrameworkID.newBuilder().setValue("Cook"))
192205
.setHostname("$(hostname)")
193-
.setSlaveId(SlaveID.newBuilder().setValue("${MESOS_EXECUTOR_ID}"))
206+
.setSlaveId(SlaveID.newBuilder().setValue(jobId.toString))
194207
.build()
195-
val taskId = sparkMesosScheduler.newMesosTaskId()
196208
val commandInfo = sparkMesosScheduler.createCommand(fakeOffer, numCores.toInt, taskId)
197209
val commandString = commandInfo.getValue
198210
val environmentInfo = commandInfo.getEnvironment
@@ -284,6 +296,7 @@ class CoarseCookSchedulerBackend(
284296
.setMemory(executorMemory(sc).toDouble)
285297
.setCpus(numCores)
286298
.setPriority(priority)
299+
.disableMeaCulpaRetries()
287300
.setRetries(1)
288301

289302
val container = conf.get("spark.executor.cook.container", null)
@@ -315,12 +328,6 @@ class CoarseCookSchedulerBackend(
315328
ret
316329
}
317330

318-
// In our fake offer mesos adds some autoincrementing ID per job but
319-
// this sticks around in the executorId so we strop it out to get the actual executor ID
320-
private def instanceIdFromExecutorId(executorId: String): UUID = {
321-
UUID.fromString(executorId.split('/')(0))
322-
}
323-
324331
override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
325332
new DriverEndpoint(rpcEnv, properties) {
326333
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -330,32 +337,29 @@ class CoarseCookSchedulerBackend(
330337
}
331338

332339
def handleDisconnectedExecutor(executorId: String): Unit = {
333-
logInfo(s"Recieved disconnect message from executor with ID: ${executorId}")
340+
logInfo(s"Received disconnect message from executor with id $executorId." +
341+
s" Its related cook job id is ${executorIdToJobId(executorId)}")
334342
// TODO: we end up querying for everything, not sure of the perf implications here
335-
val allInstances = jobClient.query(jobIds.asJava).asScala.values
343+
val jobId = executorIdToJobId(executorId)
344+
val jobInstances = jobClient.query(Seq(jobId).asJava).asScala.values
336345
.flatMap(_.getInstances.asScala).toSeq
337-
val instanceId = instanceIdFromExecutorId(executorId)
338-
val correspondingInstance = allInstances.find(_.getTaskID == instanceId)
339-
if (correspondingInstance.isEmpty) {
346+
val slaveLostReason = SlaveLost("Remote RPC client disassociated likely due to " +
347+
"containers exceeding thresholds or network issues. Check driver logs for WARN " +
348+
"message.")
349+
if (jobInstances.isEmpty) {
340350
// This can happen in the case of an aborted executor when the Listener removes it first.
341351
// We can just mark it as lost since it wouldn't be preempted anyways.
342-
removeExecutor(executorId, SlaveLost("Remote RPC client disassociated likely due to " +
343-
"containers exceeding thresholds or network issues. Check driver logs for WARN " +
344-
"message."))
352+
removeExecutor(executorId, slaveLostReason)
345353
}
346-
correspondingInstance.foreach(instance => {
347-
val wasPreempted = instance.getPreempted
348-
val exitCode = instance.getReasonCode
349-
if (wasPreempted) {
350-
logInfo(s"Executor ${executorId} was removed due to preemption. Marking as killed.")
351-
removeExecutor(executorId, ExecutorExited(exitCode.toInt,
352-
false, "Executor was preempted by the scheduler."))
354+
jobInstances.foreach { instance =>
355+
if (instance.getPreempted) {
356+
logInfo(s"Executor $executorId was removed due to preemption. Marking as killed.")
357+
removeExecutor(executorId, ExecutorExited(instance.getReasonCode.toInt,
358+
exitCausedByApp = false, "Executor was preempted by the scheduler."))
353359
} else {
354-
removeExecutor(executorId, SlaveLost("Remote RPC client disassociated likely due to " +
355-
"containers exceeding thresholds or network issues. Check driver logs for WARN " +
356-
"message."))
360+
removeExecutor(executorId, slaveLostReason)
357361
}
358-
})
362+
}
359363
}
360364
}
361365
}
@@ -365,13 +369,9 @@ class CoarseCookSchedulerBackend(
365369
* @return whether the kill request is acknowledged.
366370
*/
367371
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
368-
val instancesToKill = executorIds.map(instanceIdFromExecutorId).toSet
369-
val jobsToInstances = jobClient.query(jobIds.asJava).asScala.values
370-
.flatMap(job => job.getInstances.asScala.map((job.getUUID, _))).toSeq
371-
val correspondingJobs = jobsToInstances.filter(i => instancesToKill.contains(i._2.getTaskID))
372-
.map(_._1).toSet
373-
jobClient.abort(correspondingJobs.asJava)
374-
correspondingJobs.foreach(abortedJobIds.add)
372+
val jobIdsToKill = executorIds.flatMap(executorIdToJobId.get)
373+
jobClient.abort(jobIdsToKill.asJava)
374+
jobIdsToKill.foreach(abortedJobIds.add)
375375
true
376376
}
377377

launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ abstract List<String> buildCommand(Map<String, String> env)
9191
*/
9292
List<String> buildJavaCommand(String extraClassPath) throws IOException {
9393
List<String> cmd = new ArrayList<>();
94-
String javaCommand = System.getenv("JAVA_CMD");
94+
String javaCommand = System.getenv("JAVA_COMMAND");
9595
if (javaCommand != null) {
9696
cmd.addAll(Arrays.asList(javaCommand.trim().split("\\s+")));
9797
return cmd;

0 commit comments

Comments
 (0)