-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone #25047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
8984026
a9160e4
4787498
6bee4ac
30991b0
73027b3
03c0921
3a54e16
d47c8db
0bf0d7e
09c13af
863b220
acfed50
d06985d
ff35213
83222cf
169be97
13e9f93
33ec65b
e89bfe2
0153024
68c2cc9
e19a9b6
ffa3663
8bed3c0
50e6a6c
9d532b3
a2899cc
fa28d7a
7a75713
105abea
c1ca57e
dc82637
26571ff
c2fa13e
5cb2cd4
95111b0
aec8cd5
e69c973
8bb7f18
d7c058d
a29bede
fa14f88
7300813
e9ec52e
8408e73
8aab740
a26e112
83313f0
159929d
8af658a
51818db
f7612b4
9f15819
1208f38
7016aa9
84bebae
0b67150
c2d7132
2055582
c2a9855
4946b01
782eb0f
dca8f8b
756a818
250d9a8
a97c91f
adc74ae
f121f84
b46b243
aa2f63d
ade35fc
2bb50da
16523bd
33fcc95
15a9897
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -380,6 +380,17 @@ class SparkContext(config: SparkConf) extends Logging { | |
|
|
||
| val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE) | ||
| _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) | ||
| // driver submitted in client mode under Standalone may have conflict resources with | ||
|
||
| // workers on this host. We should sync driver's resources info into SPARK_RESOURCES | ||
| // to avoid collision. | ||
| if (deployMode == "client" && (master.startsWith("spark://") | ||
| || master.startsWith("local-cluster"))) { | ||
| val requests = parseAllResourceRequests(_conf, SPARK_DRIVER_PREFIX).map {req => | ||
| req.id.resourceName -> req.amount | ||
| }.toMap | ||
| // TODO(wuyi) log driver's acquired resources separately ? | ||
|
||
| _resources = acquireResources(_resources, requests) | ||
| } | ||
|
|
||
| // log out spark.app.name in the Spark driver logs | ||
| logInfo(s"Submitted application: $appName") | ||
|
|
@@ -1935,6 +1946,7 @@ class SparkContext(config: SparkConf) extends Logging { | |
| Utils.tryLogNonFatalError { | ||
| _progressBar.foreach(_.stop()) | ||
| } | ||
| releaseResources(_resources) | ||
| _taskScheduler = null | ||
| // TODO: Cache.stop()? | ||
| if (_env != null) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,9 @@ private[spark] case class ApplicationDescription( | |
| // number of executors this application wants to start with, | ||
| // only used if dynamic allocation is enabled | ||
| initialExecutorLimit: Option[Int] = None, | ||
| user: String = System.getProperty("user.name", "<unknown>")) { | ||
| user: String = System.getProperty("user.name", "<unknown>"), | ||
| // map from resource name to its requested amount by the executor | ||
|
||
| resourceReqsPerExecutor: Map[String, Int] = Map.empty) { | ||
|
|
||
| override def toString: String = "ApplicationDescription(" + name + ")" | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} | |
| import org.apache.spark.deploy.master.DriverState.DriverState | ||
| import org.apache.spark.deploy.master.RecoveryState.MasterState | ||
| import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} | ||
| import org.apache.spark.resource.ResourceInformation | ||
| import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} | ||
| import org.apache.spark.util.Utils | ||
|
|
||
|
|
@@ -43,6 +44,7 @@ private[deploy] object DeployMessages { | |
| * @param memory the memory size of worker | ||
| * @param workerWebUiUrl the worker Web UI address | ||
| * @param masterAddress the master address used by the worker to connect | ||
| * @param resources the resources of worker | ||
| */ | ||
| case class RegisterWorker( | ||
| id: String, | ||
|
|
@@ -52,7 +54,8 @@ private[deploy] object DeployMessages { | |
| cores: Int, | ||
| memory: Int, | ||
| workerWebUiUrl: String, | ||
| masterAddress: RpcAddress) | ||
| masterAddress: RpcAddress, | ||
| resources: Map[String, ResourceInformation] = Map.empty) | ||
| extends DeployMessage { | ||
| Utils.checkHost(host) | ||
| assert (port > 0) | ||
|
|
@@ -72,8 +75,10 @@ private[deploy] object DeployMessages { | |
| exception: Option[Exception]) | ||
| extends DeployMessage | ||
|
|
||
| case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription], | ||
| driverIds: Seq[String]) | ||
| case class WorkerSchedulerStateResponse( | ||
| id: String, | ||
| execWithResources: List[(ExecutorDescription, Map[String, Seq[String]])], | ||
|
||
| driverWithResources: Seq[(String, Map[String, Seq[String]])]) | ||
|
|
||
| /** | ||
| * A worker will send this message to the master when it registers with the master. Then the | ||
|
|
@@ -110,6 +115,12 @@ private[deploy] object DeployMessages { | |
|
|
||
| case class ReconnectWorker(masterUrl: String) extends DeployMessage | ||
|
|
||
| /** | ||
| * Ask the worker to release the indicated resources in ALLOCATED_RESOURCES_JSON_FILE | ||
| * @param toRelease the resources expected to release | ||
| */ | ||
| case class ReleaseResources(toRelease: Map[String, ResourceInformation]) extends DeployMessage | ||
|
|
||
| case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage | ||
|
|
||
| case class LaunchExecutor( | ||
|
|
@@ -118,10 +129,14 @@ private[deploy] object DeployMessages { | |
| execId: Int, | ||
| appDesc: ApplicationDescription, | ||
| cores: Int, | ||
| memory: Int) | ||
| memory: Int, | ||
| resources: Map[String, Seq[String]]) | ||
| extends DeployMessage | ||
|
|
||
| case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage | ||
| case class LaunchDriver( | ||
| driverId: String, | ||
| driverDesc: DriverDescription, | ||
| resources: Map[String, Seq[String]] = Map.empty) extends DeployMessage | ||
|
|
||
| case class KillDriver(driverId: String) extends DeployMessage | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,7 +64,8 @@ class LocalSparkCluster( | |
| /* Start the Workers */ | ||
| for (workerNum <- 1 to numWorkers) { | ||
| val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker, | ||
| memoryPerWorker, masters, null, Some(workerNum), _conf) | ||
| memoryPerWorker, masters, null, Some(workerNum), _conf, | ||
| conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This seems a bit odd to me in local mode since all workers would get the same file. The intent was really to have the cluster admin pass in different resources per worker, but that goes with my general comment. If we want to keep it this way then perhaps we just need to make sure to document it. If the cluster admin does basically split the resources themselves between the Workers, then we have no need for the acquireResources and locking, so I definitely think we should put in a config to turn that off, and we can document the different ways it can be set up.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using the same resources file for different workers (whether local cluster or real cluster) really doesn't make sense if the resources file is intended to let the cluster admin configure different resources. We can just pass None and only use the discovery script for LocalSparkCluster, since it is only used for test purposes. And for a real cluster, how about this: when a user configures a resources file (even if a discovery script is configured concurrently), we just acquire resources from it and do not go through acquireResources() any more, assuming that the user has already configured different resources across workers. If not, then we use the discovery script and call acquireResources() to make sure we get different resources compared to others. And we don't introduce a new configuration here, but just document this more specifically and rely on the file's existence to decide which way we want to go. WDYT ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could do that, but then again a cluster admin could write a discovery script to make sure different workers get different resources. Then they don't have to manually create the resourcesFile. I also think there are some weird cases like you mention where you have both resources file and discovery script that wouldn't be obvious to the user what happens. one resource they separated but then another the discovery script didn't. I realize these are corner cases but with a config it would be obvious exactly what is going to happen. If we don't do the config then I think we should leave it as is and just document that the resourcesfile/discovery script for Workers/Driver in Standalone mode needs to have all the node resources or they need to configure a different resources Dir for each.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't know whether it is easy for an admin to do this within a script. But I agree that it would be better to have a separate config option if we do expect an admin to do this. So, let me add it later.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kind of forgot about this, we should probably document the fact you can't use separate resource file or discovery script so only way to do this properly is with the coordination. I added an item to https://issues.apache.org/jira/browse/SPARK-27492 to make sure to document. |
||
| workerRpcEnvs += workerEnv | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,9 @@ private[master] class ExecutorDesc( | |
| val memory: Int) { | ||
|
|
||
| var state = ExecutorState.LAUNCHING | ||
| // resources(e.f. gpu/fpga) allocated to this driver | ||
| // map from resource name to its addresses | ||
| private var _resources: Map[String, Seq[String]] = _ | ||
|
||
|
|
||
| /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. */ | ||
| def copyState(execDesc: ExecutorDescription) { | ||
|
|
@@ -49,4 +52,8 @@ private[master] class ExecutorDesc( | |
| override def toString: String = fullId | ||
|
|
||
| override def hashCode: Int = toString.hashCode() | ||
|
|
||
| def withResources(r: Map[String, Seq[String]]): Unit = _resources = r | ||
|
|
||
| def resources: Map[String, Seq[String]] = _resources | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to print a log message with the resources to the driver, but then below we may change them, and that could be confusing to the user. I think it's also best if we don't assign _resources here yet, as it could change below. We also end up calling parseResourceRequirements twice.
It might be cleaner to either call this one only if !isClientStandalone, or call a different function that does some of this plus the logic to acquire below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I noticed the problem, and the worker has the same problem. Maybe we can (1) abstract the resources info print logic into a single method, or (2) pass in a new parameter, e.g.
isLogInfoMessage, to indicate whether to log it (note that method acquireResources also prints resources info now, but I think it'd be better to unify them), which may require less change. WDYT? @tgravescs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unfortunate we need to call parseAllResourceRequests multiple times, but looking at it I don't see an easy way to avoid that without hurting readability. I would say we could separate out the log statements into a different method and call it separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set the result of getOrDiscoverAllResources and acquireResources to a temporary variable, and then after the isClientStandalone check set the final _resources value. That way we aren't changing it.