Skip to content

Commit 848ca6d

Browse files
author
Marcelo Vanzin
committed
[SPARK-2933] [yarn] Refactor and cleanup Yarn AM code.
This change modifies the Yarn module so that all the logic related to running the ApplicationMaster is localized. Instead of, previously, 4 different classes with mostly identical code, now we have: - A single, shared ApplicationMaster class, which can operate both in client and cluster mode, and substitutes the old ApplicationMaster (for cluster mode) and ExecutorLauncher (for client mode). The benefit here is that all different execution modes for all supported yarn versions use the same shared code for monitoring executor allocation, setting up configuration, and monitoring the process's lifecycle. - A new YarnRMClient interface, which defines basic RM functionality needed by the ApplicationMaster. This interface has concrete implementations for each supported Yarn version. - A new YarnAllocator interface, which just abstracts the existing interface of the YarnAllocationHandler class. This is to avoid having to touch the allocator code too much in this change, although it might benefit from a similar effort in the future. The end result is much easier to understand code, with much less duplication, making it much easier to fix bugs, add features, and test everything knowing that all supported versions will behave the same.
1 parent 6201b27 commit 848ca6d

File tree

17 files changed

+760
-956
lines changed

17 files changed

+760
-956
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 17 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -42,19 +42,14 @@ import org.apache.spark.util.{SignalLogger, Utils}
4242
/**
4343
* An application master that runs the user's driver program and allocates executors.
4444
*/
45-
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
46-
sparkConf: SparkConf) extends Logging {
45+
class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
4746

48-
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
49-
this(args, new Configuration(), sparkConf)
50-
51-
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
52-
53-
private val rpc: YarnRPC = YarnRPC.create(conf)
47+
private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
48+
private val sparkConf = new SparkConf()
49+
private val rpc: YarnRPC = YarnRPC.create(yarnConf)
5450
private var resourceManager: AMRMProtocol = _
5551
private var appAttemptId: ApplicationAttemptId = _
5652
private var userThread: Thread = _
57-
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
5853
private val fs = FileSystem.get(yarnConf)
5954

6055
private var yarnAllocator: YarnAllocationHandler = _
@@ -348,24 +343,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
348343
}
349344
*/
350345

351-
def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
352-
synchronized {
353-
if (isFinished) {
354-
return
355-
}
356-
isFinished = true
357-
358-
logInfo("finishApplicationMaster with " + status)
359-
if (registered) {
360-
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
361-
.asInstanceOf[FinishApplicationMasterRequest]
362-
finishReq.setAppAttemptId(appAttemptId)
363-
finishReq.setFinishApplicationStatus(status)
364-
finishReq.setDiagnostics(diagnostics)
365-
finishReq.setTrackingUrl(uiHistoryAddress)
366-
resourceManager.finishApplicationMaster(finishReq)
367-
}
368-
}
346+
override protected def finish() = {
347+
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
348+
.asInstanceOf[FinishApplicationMasterRequest]
349+
finishReq.setAppAttemptId(appAttemptId)
350+
finishReq.setFinishApplicationStatus(status)
351+
finishReq.setDiagnostics(diagnostics)
352+
finishReq.setTrackingUrl(uiHistoryAddress)
353+
resourceManager.finishApplicationMaster(finishReq)
369354
}
370355

371356
/**
@@ -404,6 +389,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
404389
}
405390

406391
object ApplicationMaster extends Logging {
392+
393+
private var master: ApplicationMaster = _
394+
407395
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
408396
// optimal as more containers are available. Might need to handle this better.
409397
private val ALLOCATE_HEARTBEAT_INTERVAL = 100
@@ -454,7 +442,8 @@ object ApplicationMaster extends Logging {
454442
SignalLogger.register(log)
455443
val args = new ApplicationMasterArguments(argStrings)
456444
SparkHadoopUtil.get.runAsSparkUser { () =>
457-
new ApplicationMaster(args).run()
445+
master = new ApplicationMaster(args)
446+
master.run()
458447
}
459448
}
460449
}

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -41,15 +41,11 @@ import org.apache.spark.deploy.SparkHadoopUtil
4141
*
4242
* This is used only in yarn-client mode.
4343
*/
44-
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
45-
extends Logging {
44+
class ExecutorLauncher(args: ApplicationMasterArguments) extends Logging {
4645

47-
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
48-
this(args, new Configuration(), sparkConf)
49-
50-
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
51-
52-
private val rpc: YarnRPC = YarnRPC.create(conf)
46+
private val sparkConf = new SparkConf()
47+
private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
48+
private val rpc: YarnRPC = YarnRPC.create(yarnConf)
5349
private var resourceManager: AMRMProtocol = _
5450
private var appAttemptId: ApplicationAttemptId = _
5551
private var reporterThread: Thread = _

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ private[yarn] class YarnAllocationHandler(
6868
val preferredHostToCount: Map[String, Int],
6969
val preferredRackToCount: Map[String, Int],
7070
val sparkConf: SparkConf)
71-
extends Logging {
71+
extends YarnAllocator with Logging {
7272
// These three are locked on allocatedHostToContainersMap. Complementary data structures
7373
// allocatedHostToContainersMap : containers which are running : host, Set<containerid>
7474
// allocatedContainerToHostMap: container to host mapping.

0 commit comments

Comments (0)