Closed

Changes from all commits (76 commits)
8984026
support gpu-aware resource scheduling in standalone
Ngone51 Jul 2, 2019
a9160e4
sync resources info on worker node
Ngone51 Jul 4, 2019
4787498
fix unit tests
Ngone51 Jul 4, 2019
6bee4ac
fix indentation
Ngone51 Jul 4, 2019
30991b0
close lockFileChannel after release lock
Ngone51 Jul 4, 2019
73027b3
revert createAppDesc()
Ngone51 Jul 4, 2019
03c0921
release resources on worker interrupt
Ngone51 Jul 4, 2019
3a54e16
reuse parseResourceRequirements()
Ngone51 Jul 4, 2019
d47c8db
fix scalastyle
Ngone51 Jul 4, 2019
0bf0d7e
only release resources for driver submitted in client mode
Ngone51 Jul 5, 2019
09c13af
release lock on exception
Ngone51 Jul 5, 2019
863b220
avoid NPE on listenerBus in SparkContext.stop()
Ngone51 Jul 5, 2019
acfed50
log isolated resources for worker/driver
Ngone51 Jul 5, 2019
d06985d
set _resources to empty when acquireResources failed
Ngone51 Jul 5, 2019
ff35213
do not prepare resources file for driver/executor when its resources …
Ngone51 Jul 10, 2019
83222cf
use ResourceInformation instead of Seq[String]
Ngone51 Jul 10, 2019
169be97
make it configurable for spark-resources/
Ngone51 Jul 10, 2019
13e9f93
remove variable hostResourceToAssignedAddress
Ngone51 Jul 10, 2019
33ec65b
use ResourceRequirement across task/executor/driver
Ngone51 Jul 11, 2019
e89bfe2
track assignments with pid
Ngone51 Jul 12, 2019
0153024
document resource config for worker
Ngone51 Jul 12, 2019
68c2cc9
use kill -0 instead of jps
Ngone51 Jul 13, 2019
e19a9b6
resolve conflict
Ngone51 Jul 15, 2019
ffa3663
fix python tests
Ngone51 Jul 16, 2019
8bed3c0
conflict -> conflicting
Ngone51 Jul 18, 2019
50e6a6c
SPARK_RESOURCES_DIRECTORY -> SPARK_RESOURCES_COORDINATE_DIR
Ngone51 Jul 18, 2019
9d532b3
reword configuration doc
Ngone51 Jul 18, 2019
a2899cc
use SPARK_REGEX/LOCAL_CLUSTER_REGEX
Ngone51 Jul 18, 2019
fa28d7a
remove unneeded comment in ApplicationDescription
Ngone51 Jul 18, 2019
7a75713
move _resources into ExecutorDesc's constructor
Ngone51 Jul 18, 2019
105abea
notify -> recover
Ngone51 Jul 18, 2019
c1ca57e
fix bug in resourcesCanBeReleased
Ngone51 Jul 18, 2019
dc82637
add description for acquireResources()
Ngone51 Jul 18, 2019
26571ff
make acquireResources() more readable
Ngone51 Jul 18, 2019
c2fa13e
use case classes in WorkerSchedulerStateResponse
Ngone51 Jul 18, 2019
5cb2cd4
fix scalastyle
Ngone51 Jul 18, 2019
95111b0
use Executor/DriverResponse in MasterSuite
Ngone51 Jul 18, 2019
aec8cd5
also try to find SPARK_HOME under testing
Ngone51 Jul 26, 2019
e69c973
fix error in grammar
Ngone51 Jul 26, 2019
8bb7f18
abstract resource info showing logic into a separate method
Ngone51 Jul 26, 2019
d7c058d
avoid changing _resources
Ngone51 Jul 27, 2019
a29bede
showResourceInfo -> logResourceInfo
Ngone51 Jul 27, 2019
fa14f88
move WorkerInfo.releaseResources() into removeDriver()&removeExecutor()
Ngone51 Jul 27, 2019
7300813
rename old canLaunchExecutor() to canLaunchExecutorForApp()
Ngone51 Jul 27, 2019
e9ec52e
remove driver/execToResourcesUsed
Ngone51 Jul 27, 2019
8408e73
add desc for prepareResourceFile()
Ngone51 Jul 27, 2019
8aab740
prepareResourceFile -> prepareResourcesFile
Ngone51 Jul 27, 2019
a26e112
make prepareResourcesFile() return Option[File]
Ngone51 Jul 27, 2019
83313f0
try finally with releaseLock()
Ngone51 Jul 27, 2019
159929d
more comments for acquireResources()
Ngone51 Jul 27, 2019
8af658a
update doc
Ngone51 Jul 28, 2019
51818db
add StandaloneResourceUtils
Ngone51 Jul 28, 2019
f7612b4
check the format of process name
Ngone51 Aug 1, 2019
9f15819
remove ReleaseResources && releaseResourcesIfPossible()
Ngone51 Aug 1, 2019
1208f38
update comment above driver acquireResources()
Ngone51 Aug 1, 2019
7016aa9
remove unused try-catch
Ngone51 Aug 1, 2019
84bebae
logs for releaseResources()
Ngone51 Aug 1, 2019
0b67150
logWarning -> logError
Ngone51 Aug 1, 2019
c2d7132
pass None resourcesFile for LocalSparkCluster
Ngone51 Aug 1, 2019
2055582
add config spark.resources.coordinate.enable
Ngone51 Aug 2, 2019
c2a9855
update doc
Ngone51 Aug 2, 2019
4946b01
make ResourceAllocation public and add @Evolving
Ngone51 Aug 2, 2019
782eb0f
simplify releaseResources()
Ngone51 Aug 2, 2019
dca8f8b
update comments for acquireResources() & releaseResources()
Ngone51 Aug 6, 2019
756a818
(Executor/Driver)Response -> Worker(Executor/DriverState)Response
Ngone51 Aug 6, 2019
250d9a8
remove resourcesCanBeReleased()
Ngone51 Aug 6, 2019
a97c91f
remove pid in WorkerInfo & RegisterWorker
Ngone51 Aug 6, 2019
adc74ae
improve doc
Ngone51 Aug 6, 2019
f121f84
fix scalastyle
Ngone51 Aug 6, 2019
b46b243
merge master
Ngone51 Aug 6, 2019
aa2f63d
make coordinate default to be true
Ngone51 Aug 6, 2019
ade35fc
add test for coordinate is off
Ngone51 Aug 6, 2019
2bb50da
array of ResourceAllocation
Ngone51 Aug 6, 2019
16523bd
update comment for test
Ngone51 Aug 6, 2019
33fcc95
revert doc for ResourceAllocation
Ngone51 Aug 8, 2019
15a9897
log warn when driver/executor require more resource than any workers …
Ngone51 Aug 9, 2019
1 change: 1 addition & 0 deletions .gitignore
@@ -70,6 +70,7 @@ scalastyle-on-compile.generated.xml
scalastyle-output.xml
scalastyle.txt
spark-*-bin-*.tgz
spark-resources/
spark-tests.log
src_managed/
streaming-tests.log
34 changes: 30 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.StandaloneResourceUtils._
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
@@ -245,6 +246,15 @@ class SparkContext(config: SparkConf) extends Logging {

def isLocal: Boolean = Utils.isLocalMaster(_conf)

private def isClientStandalone: Boolean = {
val isSparkCluster = master match {
case SparkMasterRegex.SPARK_REGEX(_) => true
case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true
case _ => false
}
deployMode == "client" && isSparkCluster
}

/**
* @return true if context is stopped or in the midst of stopping.
*/
@@ -380,7 +390,18 @@
_driverLogger = DriverLogger(_conf)

val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)
_resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt)
val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt)
_resources = {
// A driver submitted in client mode under Standalone may have resources that conflict
// with other drivers/workers on this host. We should sync the driver's resources info
// into SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collisions.
if (isClientStandalone) {
acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, Utils.getProcessId)
} else {
allResources
}
}
logResourceInfo(SPARK_DRIVER_PREFIX, _resources)

// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")
@@ -1911,8 +1932,10 @@
ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
}

Utils.tryLogNonFatalError {
postApplicationEnd()
if (listenerBus != null) {
Utils.tryLogNonFatalError {
postApplicationEnd()
}
}
Utils.tryLogNonFatalError {
_driverLogger.foreach(_.stop())
@@ -1960,6 +1983,9 @@
Utils.tryLogNonFatalError {
_progressBar.foreach(_.stop())
}
if (isClientStandalone) {
releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, Utils.getProcessId)
}
_taskScheduler = null
// TODO: Cache.stop()?
if (_env != null) {
@@ -2726,7 +2752,7 @@ object SparkContext extends Logging {

// Calculate the max slots each executor can provide based on resources available on each
// executor and resources required by each task.
val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
val executorResourcesAndAmounts =
parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
.map(request => (request.id.resourceName, request.amount)).toMap
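The acquireResources()/releaseResources() calls above coordinate with other Spark processes on the same host through a lock file in the shared resources directory (see the commits about closing the lockFileChannel after releasing the lock and wrapping releaseLock() in try-finally). Below is a minimal sketch of that locking idea; the names and the address-claiming logic are illustrative, not the PR's actual StandaloneResourceUtils implementation.

```scala
import java.io.{File, RandomAccessFile}

object ResourceCoordinationSketch {
  // Take an exclusive lock on <dir>/__lock__, run f, and always release the
  // lock and close the channel (cf. commit 30991b0), even if f throws.
  def withCoordinationLock[T](dir: File)(f: => T): T = {
    val lockFile = new RandomAccessFile(new File(dir, "__lock__"), "rw")
    val channel = lockFile.getChannel
    val lock = channel.lock() // blocks until no other process holds the lock
    try f
    finally {
      lock.release()
      channel.close()
    }
  }

  // Under the lock: claim `amount` addresses that no other driver/worker on
  // this host has recorded as assigned.
  def acquire(dir: File, discovered: Seq[String], assigned: Set[String],
      amount: Int): Seq[String] = withCoordinationLock(dir) {
    val free = discovered.filterNot(assigned)
    require(free.size >= amount, s"only ${free.size} of $amount requested addresses are free")
    free.take(amount)
  }
}
```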
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy

import java.net.URI

import org.apache.spark.resource.ResourceRequirement

private[spark] case class ApplicationDescription(
name: String,
maxCores: Option[Int],
@@ -32,7 +34,8 @@ private[spark] case class ApplicationDescription(
// number of executors this application wants to start with,
// only used if dynamic allocation is enabled
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
user: String = System.getProperty("user.name", "<unknown>"),
resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {

override def toString: String = "ApplicationDescription(" + name + ")"
}
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}

@@ -92,13 +93,15 @@
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)

val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
config.SPARK_DRIVER_PREFIX)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
command,
driverResourceReqs)
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))

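For context on the new driverResourceReqs argument: with a conf such as spark.driver.resource.gpu.amount=2, parseResourceRequirements(conf, "spark.driver") yields a requirement for two GPUs, which Client packs into the DriverDescription so the Master can place the driver on a worker with enough free GPU addresses. A hedged sketch of that parsing, using an illustrative stand-in for ResourceRequirement rather than Spark's private internals:

```scala
object DriverResourceReqSketch {
  // Illustrative stand-in for org.apache.spark.resource.ResourceRequirement.
  case class ResourceRequirement(resourceName: String, amount: Int)

  // Collect entries shaped like "<prefix>.resource.<name>.amount" -> "<n>".
  def parseRequirements(conf: Map[String, String], prefix: String): Seq[ResourceRequirement] = {
    val pattern = (java.util.regex.Pattern.quote(prefix) + """\.resource\.([^.]+)\.amount""").r
    conf.collect { case (pattern(name), amount) => ResourceRequirement(name, amount.toInt) }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.driver.resource.gpu.amount" -> "2")
    // Prints: List(ResourceRequirement(gpu,2))
    println(parseRequirements(conf, "spark.driver"))
  }
}
```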
28 changes: 22 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -24,14 +24,14 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.util.Utils

private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler endpoint nodes. */
private[deploy] object DeployMessages {

// Worker to Master

/**
@@ -43,6 +43,7 @@
* @param memory the memory size of worker
* @param workerWebUiUrl the worker Web UI address
* @param masterAddress the master address used by the worker to connect
* @param resources the resources of worker
*/
case class RegisterWorker(
id: String,
@@ -52,7 +53,8 @@
cores: Int,
memory: Int,
workerWebUiUrl: String,
masterAddress: RpcAddress)
masterAddress: RpcAddress,
resources: Map[String, ResourceInformation] = Map.empty)
extends DeployMessage {
Utils.checkHost(host)
assert (port > 0)
@@ -72,8 +74,18 @@
exception: Option[Exception])
extends DeployMessage

case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
driverIds: Seq[String])
case class WorkerExecutorStateResponse(
desc: ExecutorDescription,
resources: Map[String, ResourceInformation])

case class WorkerDriverStateResponse(
driverId: String,
resources: Map[String, ResourceInformation])

case class WorkerSchedulerStateResponse(
id: String,
execResponses: List[WorkerExecutorStateResponse],
driverResponses: Seq[WorkerDriverStateResponse])

/**
* A worker will send this message to the master when it registers with the master. Then the
@@ -118,10 +130,14 @@
execId: Int,
appDesc: ApplicationDescription,
cores: Int,
memory: Int)
memory: Int,
resources: Map[String, ResourceInformation] = Map.empty)
extends DeployMessage

case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
case class LaunchDriver(
driverId: String,
driverDesc: DriverDescription,
resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage

case class KillDriver(driverId: String) extends DeployMessage

core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -17,12 +17,15 @@

package org.apache.spark.deploy

import org.apache.spark.resource.ResourceRequirement

private[deploy] case class DriverDescription(
jarUrl: String,
mem: Int,
cores: Int,
supervise: Boolean,
command: Command) {
command: Command,
resourceReqs: Seq[ResourceRequirement] = Seq.empty) {

override def toString: String = s"DriverDescription (${command.mainClass})"
}
core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -64,7 +64,8 @@ class LocalSparkCluster(
/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
memoryPerWorker, masters, null, Some(workerNum), _conf)
memoryPerWorker, masters, null, Some(workerNum), _conf,
conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
Contributor:
This seems a bit odd to me in local mode, since all workers would get the same file. The intent was really to have the cluster admin pass in different resources per worker, but that goes with my general comment. If we want to keep it this way then perhaps we just need to make sure to document it.

If the cluster admin does basically split the resources themselves between the Workers, then we have no need for acquireResources and locking, so I definitely think we should put in a config to turn that off, and we can document the different ways it can be set up.

Member Author:
Using the same resources file for different workers (whether a local cluster or a real cluster) really doesn't make sense if the resources file is intended for the cluster admin to configure different resources. We can just pass None and only use the discovery script for LocalSparkCluster, since it is only used for test purposes.

And for a real cluster, how about this:

When the user configures a resources file (even with a discovery script configured concurrently), we just acquire resources from it and do not go through acquireResources() any more, on the assumption that the user has already configured different resources across workers. If not, then we use the discovery script and call acquireResources() to make sure we get different resources compared to others.

And we don't introduce a new configuration here, but just document this more specifically and rely on the files' existence to decide which way we want to go.

WDYT?

Contributor:
We could do that, but then again a cluster admin could write a discovery script to make sure different workers get different resources; then they don't have to manually create the resourcesFile. I also think there are some weird cases, like you mention, where you have both a resources file and a discovery script and it wouldn't be obvious to the user what happens: one resource they separated, but another the discovery script didn't. I realize these are corner cases, but with a config it would be obvious exactly what is going to happen.

If we don't do the config then I think we should leave it as is and just document that the resources file/discovery script for Workers/Driver in Standalone mode needs to have all of the node's resources, or they need to configure a different resources dir for each.

Member Author:
> but then again a cluster admin could write a discovery script to make sure different workers get different resources.

I don't know whether it is easy for an admin to do this within a script, but I agree that it would be better to have a separate config option if we do expect an admin to do this.

So, let me add it later.

Contributor:
I kind of forgot about this; we should probably document the fact that you can't use a separate resources file or discovery script, so the only way to do this properly is with the coordination. I added an item to https://issues.apache.org/jira/browse/SPARK-27492 to make sure to document it.

workerRpcEnvs += workerEnv
}

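To make the resources-file discussion in the thread above concrete: a hypothetical worker resources file granting both GPUs on a host could look like the JSON embedded below, parsed here with json4s (the JSON library Spark already depends on). The ResourceAllocation shape (an id of componentName/resourceName plus addresses) follows the PR's naming, but the exact schema shown is an assumption for illustration.

```scala
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods._

object ResourcesFileSketch {
  implicit val formats: Formats = DefaultFormats

  // Assumed shape of one allocation entry: the component that owns the
  // resource and the physical addresses it was handed.
  case class ResourceId(componentName: String, resourceName: String)
  case class ResourceAllocation(id: ResourceId, addresses: Seq[String])

  // A worker that was granted both GPU addresses on the host.
  val resourcesFileJson: String =
    """[{"id": {"componentName": "spark.worker", "resourceName": "gpu"},
      |  "addresses": ["0", "1"]}]""".stripMargin

  def main(args: Array[String]): Unit = {
    val allocations = parse(resourcesFileJson).extract[Seq[ResourceAllocation]]
    allocations.foreach { a =>
      println(s"${a.id.componentName}/${a.id.resourceName} -> ${a.addresses.mkString(", ")}")
    }
  }
}
```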