diff --git a/.gitignore b/.gitignore index d5cf66d1db12..91b662ecbacb 100644 --- a/.gitignore +++ b/.gitignore @@ -70,6 +70,7 @@ scalastyle-on-compile.generated.xml scalastyle-output.xml scalastyle.txt spark-*-bin-*.tgz +spark-resources/ spark-tests.log src_managed/ streaming-tests.log diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index aa71b21caa30..396d712bd739 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} import org.apache.spark.internal.Logging @@ -245,6 +246,15 @@ class SparkContext(config: SparkConf) extends Logging { def isLocal: Boolean = Utils.isLocalMaster(_conf) + private def isClientStandalone: Boolean = { + val isSparkCluster = master match { + case SparkMasterRegex.SPARK_REGEX(_) => true + case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true + case _ => false + } + deployMode == "client" && isSparkCluster + } + /** * @return true if context is stopped or in the midst of stopping. */ @@ -380,7 +390,18 @@ class SparkContext(config: SparkConf) extends Logging { _driverLogger = DriverLogger(_conf) val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE) - _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) + val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) + _resources = { + // driver submitted in client mode under Standalone may have conflicting resources with + // other drivers/workers on this host. We should sync driver's resources info into + // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision. + if (isClientStandalone) { + acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, Utils.getProcessId) + } else { + allResources + } + } + logResourceInfo(SPARK_DRIVER_PREFIX, _resources) // log out spark.app.name in the Spark driver logs logInfo(s"Submitted application: $appName") @@ -1911,8 +1932,10 @@ class SparkContext(config: SparkConf) extends Logging { ShutdownHookManager.removeShutdownHook(_shutdownHookRef) } - Utils.tryLogNonFatalError { - postApplicationEnd() + if (listenerBus != null) { + Utils.tryLogNonFatalError { + postApplicationEnd() + } } Utils.tryLogNonFatalError { _driverLogger.foreach(_.stop()) @@ -1960,6 +1983,9 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _progressBar.foreach(_.stop()) } + if (isClientStandalone) { + releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, Utils.getProcessId) + } _taskScheduler = null // TODO: Cache.stop()? if (_env != null) { @@ -2726,7 +2752,7 @@ object SparkContext extends Logging { // Calculate the max slots each executor can provide based on resources available on each // executor and resources required by each task. 
- val taskResourceRequirements = parseTaskResourceRequirements(sc.conf) + val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX) val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX) .map(request => (request.id.resourceName, request.amount)).toMap diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index c5c5c60923f4..e11f497b4bfd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy import java.net.URI +import org.apache.spark.resource.ResourceRequirement + private[spark] case class ApplicationDescription( name: String, maxCores: Option[Int], @@ -32,7 +34,8 @@ private[spark] case class ApplicationDescription( // number of executors this application wants to start with, // only used if dynamic allocation is enabled initialExecutorLimit: Option[Int] = None, - user: String = System.getProperty("user.name", "")) { + user: String = System.getProperty("user.name", ""), + resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) { override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index ea7c902b1b6b..648a8b1c763d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -29,6 +29,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT +import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils} @@ -92,13 +93,15 @@ private class ClientEndpoint( val command = new Command(mainClass, Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) - + val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf, + config.SPARK_DRIVER_PREFIX) val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, - command) + command, + driverResourceReqs) asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 5723b0f69057..3f1d1aebdf9d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -24,6 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.RecoveryState.MasterState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} +import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.util.Utils @@ -31,7 +32,6 @@ private[deploy] sealed trait DeployMessage extends Serializable /** Contains 
messages sent between Scheduler endpoint nodes. */ private[deploy] object DeployMessages { - // Worker to Master /** @@ -43,6 +43,7 @@ private[deploy] object DeployMessages { * @param memory the memory size of worker * @param workerWebUiUrl the worker Web UI address * @param masterAddress the master address used by the worker to connect + * @param resources the resources of worker */ case class RegisterWorker( id: String, @@ -52,7 +53,8 @@ private[deploy] object DeployMessages { cores: Int, memory: Int, workerWebUiUrl: String, - masterAddress: RpcAddress) + masterAddress: RpcAddress, + resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage { Utils.checkHost(host) assert (port > 0) @@ -72,8 +74,18 @@ private[deploy] object DeployMessages { exception: Option[Exception]) extends DeployMessage - case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription], - driverIds: Seq[String]) + case class WorkerExecutorStateResponse( + desc: ExecutorDescription, + resources: Map[String, ResourceInformation]) + + case class WorkerDriverStateResponse( + driverId: String, + resources: Map[String, ResourceInformation]) + + case class WorkerSchedulerStateResponse( + id: String, + execResponses: List[WorkerExecutorStateResponse], + driverResponses: Seq[WorkerDriverStateResponse]) /** * A worker will send this message to the master when it registers with the master. Then the @@ -118,10 +130,14 @@ private[deploy] object DeployMessages { execId: Int, appDesc: ApplicationDescription, cores: Int, - memory: Int) + memory: Int, + resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage - case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage + case class LaunchDriver( + driverId: String, + driverDesc: DriverDescription, + resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage case class KillDriver(driverId: String) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 1f5626ab5a89..02c166b8785a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -17,12 +17,15 @@ package org.apache.spark.deploy +import org.apache.spark.resource.ResourceRequirement + private[deploy] case class DriverDescription( jarUrl: String, mem: Int, cores: Int, supervise: Boolean, - command: Command) { + command: Command, + resourceReqs: Seq[ResourceRequirement] = Seq.empty) { override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index c1866b4c3606..f1b58eb33a1b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -64,7 +64,8 @@ class LocalSparkCluster( /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masters, null, Some(workerNum), _conf) + memoryPerWorker, masters, null, Some(workerNum), _conf, + conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE)) workerRpcEnvs += workerEnv } diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala new file mode 100644 index 000000000000..b64a36f532d0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.io.{File, RandomAccessFile} +import java.nio.channels.{FileLock, OverlappingFileLockException} +import java.nio.file.Files + +import scala.collection.mutable +import scala.util.Random +import scala.util.control.NonFatal + +import org.json4s.{DefaultFormats, Extraction} +import org.json4s.jackson.JsonMethods.{compact, parse, render} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{SPARK_RESOURCES_COORDINATE, SPARK_RESOURCES_DIR} +import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement} +import org.apache.spark.resource.ResourceUtils.{parseResourceRequirements, withResourcesJson} +import org.apache.spark.util.Utils + +private[spark] object StandaloneResourceUtils extends Logging { + // These directory/files are used to coordinate the resources between + // the drivers/workers on the host in Spark Standalone. + val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources" + val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json" + val RESOURCES_LOCK_FILE = "__allocated_resources__.lock" + + /** + * Resource allocation used in Standalone only, which tracks assignments with + * worker/driver(client only) pid. + */ + case class StandaloneResourceAllocation(pid: Int, allocations: Seq[ResourceAllocation]) { + // convert allocations to a resource information map + def toResourceInformationMap: Map[String, ResourceInformation] = { + allocations.map { allocation => + allocation.id.resourceName -> allocation.toResourceInformation + }.toMap + } + } + + /** + * Assigns (if coordinate needed) resources to workers/drivers from the same host to avoid + * address conflict. + * + * This function works in three steps. First, acquiring the lock on RESOURCES_LOCK_FILE + * to achieve synchronization among workers and drivers. Second, getting all allocated + * resources from ALLOCATED_RESOURCES_FILE and assigning isolated resources to the worker + * or driver after differentiating available resources in discovered resources from + * allocated resources. If available resources don't meet worker's or driver's requirement, + * try to update allocated resources by excluding the resource allocation if its related + * process has already terminated and do the assignment again. If still don't meet requirement, + * exception should be thrown. 
Third, updating ALLOCATED_RESOURCES_FILE with new allocated + * resources along with pid for the worker or driver. Then, return allocated resources + * information after releasing the lock. + * + * @param conf SparkConf + * @param componentName spark.driver / spark.worker + * @param resources the resources found by worker/driver on the host + * @param pid the process id of worker/driver to acquire resources. + * @return allocated resources for the worker/driver or throws exception if can't + * meet worker/driver's requirement + */ + def acquireResources( + conf: SparkConf, + componentName: String, + resources: Map[String, ResourceInformation], + pid: Int) + : Map[String, ResourceInformation] = { + if (!needCoordinate(conf)) { + return resources + } + val resourceRequirements = parseResourceRequirements(conf, componentName) + if (resourceRequirements.isEmpty) { + return Map.empty + } + val lock = acquireLock(conf) + try { + val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE) + // all allocated resources in ALLOCATED_RESOURCES_FILE, can be updated if any allocations' + // related processes detected to be terminated while checking pids below. + var origAllocation = Seq.empty[StandaloneResourceAllocation] + // Map[pid -> Map[resourceName -> Addresses[]]] + var allocated = { + if (resourcesFile.exists()) { + origAllocation = allocatedStandaloneResources(resourcesFile.getPath) + val allocations = origAllocation.map { resource => + val resourceMap = { + resource.allocations.map { allocation => + allocation.id.resourceName -> allocation.addresses.toArray + }.toMap + } + resource.pid -> resourceMap + }.toMap + allocations + } else { + Map.empty[Int, Map[String, Array[String]]] + } + } + + // new allocated resources for worker or driver, + // map from resource name to its allocated addresses. + var newAssignments: Map[String, Array[String]] = null + // Whether we've checked process status and we'll only do the check at most once. + // Do the check iff the available resources can't meet the requirements at the first time. + var checked = false + // Whether we need to keep allocating for the worker/driver and we'll only go through + // the loop at most twice. + var keepAllocating = true + while (keepAllocating) { + keepAllocating = false + // store the pid whose related allocated resources conflict with + // discovered resources passed in. + val pidsToCheck = mutable.Set[Int]() + newAssignments = resourceRequirements.map { req => + val rName = req.resourceName + val amount = req.amount + // initially, we must have available.length >= amount as we've done pre-check previously + var available = resources(rName).addresses + // gets available resource addresses by excluding all + // allocated resource addresses from discovered resources + allocated.foreach { a => + val thePid = a._1 + val resourceMap = a._2 + val assigned = resourceMap.getOrElse(rName, Array.empty) + val retained = available.diff(assigned) + // if len(retained) < len(available) after differ to assigned, then, there must be + // some conflicting resources addresses between available and assigned. So, we should + // store its pid here to check whether it's alive in case we don't find enough + // resources after traversal all allocated resources. 
+ if (retained.length < available.length && !checked) { + pidsToCheck += thePid + } + if (retained.length >= amount) { + available = retained + } else if (checked) { + keepAllocating = false + throw new SparkException(s"No more resources available since they've already been" + + s" assigned to other workers/drivers.") + } else { + keepAllocating = true + } + } + val assigned = { + if (keepAllocating) { // can't meet the requirement + // exclude the allocations whose related processes have already terminated. + val (invalid, valid) = allocated.partition { a => + pidsToCheck(a._1) && !(Utils.isTesting || Utils.isProcessRunning(a._1))} + allocated = valid + origAllocation = origAllocation.filter( + allocation => !invalid.contains(allocation.pid)) + checked = true + // note this is a meaningless return value, just to avoid creating any new object + available + } else { + available.take(amount) + } + } + rName -> assigned + }.toMap + } + val newAllocation = { + val allocations = newAssignments.map { case (rName, addresses) => + ResourceAllocation(ResourceID(componentName, rName), addresses) + }.toSeq + StandaloneResourceAllocation(pid, allocations) + } + writeResourceAllocationJson( + componentName, origAllocation ++ Seq(newAllocation), resourcesFile) + newAllocation.toResourceInformationMap + } finally { + releaseLock(lock) + } + } + + /** + * Frees (if coordinate needed) all the resources a worker/driver (pid) has in one shot + * to make those resources available for other workers/drivers on the same host. + * @param conf SparkConf + * @param componentName spark.driver / spark.worker + * @param toRelease the resources expected to be released + * @param pid the process id of worker/driver to release resources. + */ + def releaseResources( + conf: SparkConf, + componentName: String, + toRelease: Map[String, ResourceInformation], + pid: Int) + : Unit = { + if (!needCoordinate(conf)) { + return + } + if (toRelease != null && toRelease.nonEmpty) { + val lock = acquireLock(conf) + try { + val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE) + if (resourcesFile.exists()) { + val (target, others) = + allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid) + if (target.nonEmpty) { + if (others.isEmpty) { + if (!resourcesFile.delete()) { + logError(s"Failed to delete $ALLOCATED_RESOURCES_FILE.") + } + } else { + writeResourceAllocationJson(componentName, others, resourcesFile) + } + logDebug(s"$componentName(pid=$pid) released resources: ${toRelease.mkString("\n")}") + } else { + logWarning(s"$componentName(pid=$pid) has already released its resources.") + } + } + } finally { + releaseLock(lock) + } + } + } + + private def acquireLock(conf: SparkConf): FileLock = { + val resourcesDir = getOrCreateResourcesDir(conf) + val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE) + val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel + var keepTry = true + var lock: FileLock = null + while (keepTry) { + try { + lock = lockFileChannel.lock() + logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.") + keepTry = false + } catch { + case e: OverlappingFileLockException => + // This exception is thrown when we're in LocalSparkCluster mode. FileLock is designed + // to be used across JVMs, but our LocalSparkCluster is designed to launch multiple + // workers in the same JVM. As a result, when a worker in LocalSparkCluster tries to + // acquire the lock on `resources.lock`, which is already locked by another worker, we'll + // hit this exception.
So, we should manually control it. + keepTry = true + // there may be multiple workers race for the lock, + // so, sleep for a random time to avoid possible conflict + val duration = Random.nextInt(1000) + 1000 + Thread.sleep(duration) + } + } + assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.") + lock + } + + private def releaseLock(lock: FileLock): Unit = { + try { + lock.release() + lock.channel().close() + logInfo(s"Released lock on $RESOURCES_LOCK_FILE.") + } catch { + case e: Exception => + logError(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e) + } + } + + private def getOrCreateResourcesDir(conf: SparkConf): File = { + val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse { + val sparkHome = if (Utils.isTesting) { + assert(sys.props.contains("spark.test.home") || + sys.env.contains("SPARK_HOME"), "spark.test.home or SPARK_HOME is not set.") + sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME")) + } else { + sys.env.getOrElse("SPARK_HOME", ".") + } + sparkHome + }) + val resourceDir = new File(coordinateDir, SPARK_RESOURCES_COORDINATE_DIR) + if (!resourceDir.exists()) { + Utils.createDirectory(resourceDir) + } + resourceDir + } + + private def allocatedStandaloneResources(resourcesFile: String) + : Seq[StandaloneResourceAllocation] = { + withResourcesJson[StandaloneResourceAllocation](resourcesFile) { json => + implicit val formats = DefaultFormats + parse(json).extract[Seq[StandaloneResourceAllocation]] + } + } + + /** + * Save the allocated resources of driver(cluster only) or executor into a JSON formatted + * resources file. Used in Standalone only. + * @param componentName spark.driver / spark.executor + * @param resources allocated resources for driver(cluster only) or executor + * @param dir the target directory used to place the resources file + * @return None if resources is empty or Some(file) which represents the resources file + */ + def prepareResourcesFile( + componentName: String, + resources: Map[String, ResourceInformation], + dir: File): Option[File] = { + if (resources.isEmpty) { + return None + } + + val compShortName = componentName.substring(componentName.lastIndexOf(".") + 1) + val tmpFile = Utils.tempFileWith(dir) + val allocations = resources.map { case (rName, rInfo) => + ResourceAllocation(ResourceID(componentName, rName), rInfo.addresses) + }.toSeq + try { + writeResourceAllocationJson(componentName, allocations, tmpFile) + } catch { + case NonFatal(e) => + val errMsg = s"Exception threw while preparing resource file for $compShortName" + logError(errMsg, e) + throw new SparkException(errMsg, e) + } + val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir) + tmpFile.renameTo(resourcesFile) + Some(resourcesFile) + } + + private def writeResourceAllocationJson[T]( + componentName: String, + allocations: Seq[T], + jsonFile: File): Unit = { + implicit val formats = DefaultFormats + val allocationJson = Extraction.decompose(allocations) + Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes()) + } + + /** Whether needs to coordinate resources among workers and drivers for user */ + def needCoordinate(conf: SparkConf): Boolean = { + conf.get(SPARK_RESOURCES_COORDINATE) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 53564d0e9515..6c56807458b2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -23,6 +23,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.deploy.ApplicationDescription +import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -82,8 +83,10 @@ private[spark] class ApplicationInfo( private[master] def addExecutor( worker: WorkerInfo, cores: Int, + resources: Map[String, ResourceInformation], useID: Option[Int] = None): ExecutorDesc = { - val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) + val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, + desc.memoryPerExecutorMB, resources) executors(exec.id) = exec coresGranted += cores exec diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala index 8d5edae0501e..bf68ba8e15af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import java.util.Date import org.apache.spark.deploy.DriverDescription +import org.apache.spark.resource.ResourceInformation import org.apache.spark.util.Utils private[deploy] class DriverInfo( @@ -34,6 +35,9 @@ private[deploy] class DriverInfo( @transient var exception: Option[Exception] = None /* Most recent worker assigned to this driver */ @transient var worker: Option[WorkerInfo] = None + // resources (e.g. gpu/fpga) allocated to this driver + // map from resource name to ResourceInformation + private var _resources: Map[String, ResourceInformation] = _ init() @@ -47,4 +51,8 @@ private[deploy] class DriverInfo( worker = None exception = None } + + def withResources(r: Map[String, ResourceInformation]): Unit = _resources = r + + def resources: Map[String, ResourceInformation] = _resources } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala index fc62b094def6..a8f849256111 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala @@ -18,13 +18,17 @@ package org.apache.spark.deploy.master import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} +import org.apache.spark.resource.ResourceInformation private[master] class ExecutorDesc( val id: Int, val application: ApplicationInfo, val worker: WorkerInfo, val cores: Int, - val memory: Int) { + val memory: Int, + // resources (e.g.
gpu/fpga) allocated to this executor + // map from resource name to ResourceInformation + val resources: Map[String, ResourceInformation]) { var state = ExecutorState.LAUNCHING diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 3c0a49e4ab20..676551985608 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -25,8 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, - ExecutorState, SparkHadoopUtil} +import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ @@ -38,6 +37,7 @@ import org.apache.spark.internal.config.Deploy._ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} +import org.apache.spark.resource.{ResourceRequirement, ResourceUtils} import org.apache.spark.rpc._ import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} @@ -244,7 +244,8 @@ private[deploy] class Master( System.exit(0) case RegisterWorker( - id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) => + id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, + masterAddress, resources) => logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -252,8 +253,9 @@ private[deploy] class Master( } else if (idToWorker.contains(id)) { workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true)) } else { + val workerResources = resources.map(r => r._1 -> WorkerResourceInfo(r._1, r._2.addresses)) val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - workerRef, workerWebUiUrl) + workerRef, workerWebUiUrl, workerResources) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false)) @@ -361,24 +363,31 @@ private[deploy] class Master( if (canCompleteRecovery) { completeRecovery() } - case WorkerSchedulerStateResponse(workerId, executors, driverIds) => + case WorkerSchedulerStateResponse(workerId, execResponses, driverResponses) => idToWorker.get(workerId) match { case Some(worker) => logInfo("Worker has been re-registered: " + workerId) worker.state = WorkerState.ALIVE - val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) + val validExecutors = execResponses.filter( + exec => idToApp.get(exec.desc.appId).isDefined) for (exec <- validExecutors) { - val app = idToApp(exec.appId) - val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) + val (execDesc, execResources) = (exec.desc, exec.resources) + val app = idToApp(execDesc.appId) + val execInfo = app.addExecutor( + worker, execDesc.cores, execResources, Some(execDesc.execId)) worker.addExecutor(execInfo) - execInfo.copyState(exec) + worker.recoverResources(execResources) + 
execInfo.copyState(execDesc) } - for (driverId <- driverIds) { + for (driver <- driverResponses) { + val (driverId, driverResource) = (driver.driverId, driver.resources) drivers.find(_.id == driverId).foreach { driver => driver.worker = Some(worker) driver.state = DriverState.RUNNING + driver.withResources(driverResource) + worker.recoverResources(driverResource) worker.addDriver(driver) } } @@ -614,24 +623,34 @@ private[deploy] class Master( val minCoresPerExecutor = coresPerExecutor.getOrElse(1) val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB + val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor val numUsable = usableWorkers.length val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) /** Return whether the specified worker can launch an executor for this app. */ - def canLaunchExecutor(pos: Int): Boolean = { + def canLaunchExecutorForApp(pos: Int): Boolean = { val keepScheduling = coresToAssign >= minCoresPerExecutor val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor + val assignedExecutorNum = assignedExecutors(pos) // If we allow multiple executors per worker, then we can always launch new executors. // Otherwise, if there is already an executor on this worker, just give it more cores. - val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 + val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0 if (launchingNewExecutor) { - val assignedMemory = assignedExecutors(pos) * memoryPerExecutor + val assignedMemory = assignedExecutorNum * memoryPerExecutor val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor + val assignedResources = resourceReqsPerExecutor.map { + req => req.resourceName -> req.amount * assignedExecutorNum + }.toMap + val resourcesFree = usableWorkers(pos).resourcesFree.map { + case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0)) + } + val enoughResources = ResourceUtils.resourcesMeetRequirements( + resourcesFree, resourceReqsPerExecutor) val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit - keepScheduling && enoughCores && enoughMemory && underLimit + keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit } else { // We're adding cores to an existing executor, so no need // to check memory and executor limits @@ -641,11 +660,11 @@ private[deploy] class Master( // Keep launching executors until no more workers can accommodate any // more executors, or if we have reached this application's limits - var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) + var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp) while (freeWorkers.nonEmpty) { freeWorkers.foreach { pos => var keepScheduling = true - while (keepScheduling && canLaunchExecutor(pos)) { + while (keepScheduling && canLaunchExecutorForApp(pos)) { coresToAssign -= minCoresPerExecutor assignedCores(pos) += minCoresPerExecutor @@ -666,7 +685,7 @@ private[deploy] class Master( } } } - freeWorkers = freeWorkers.filter(canLaunchExecutor) + freeWorkers = freeWorkers.filter(canLaunchExecutorForApp) } assignedCores } @@ -683,9 +702,11 @@ private[deploy] class Master( if (app.coresLeft >= coresPerExecutor) { // Filter out workers that don't 
have enough resources to launch an executor val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= coresPerExecutor) + .filter(canLaunchExecutor(_, app.desc)) .sortBy(_.coresFree).reverse + if (waitingApps.length == 1 && usableWorkers.isEmpty) { + logWarning(s"App ${app.id} requires more resource than any of Workers could have.") + } val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // Now that we've decided how many cores to allocate on each worker, let's allocate them @@ -715,12 +736,44 @@ private[deploy] class Master( val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) val coresToAssign = coresPerExecutor.getOrElse(assignedCores) for (i <- 1 to numExecutors) { - val exec = app.addExecutor(worker, coresToAssign) + val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor) + val exec = app.addExecutor(worker, coresToAssign, allocated) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } + private def canLaunch( + worker: WorkerInfo, + memoryReq: Int, + coresReq: Int, + resourceRequirements: Seq[ResourceRequirement]) + : Boolean = { + val enoughMem = worker.memoryFree >= memoryReq + val enoughCores = worker.coresFree >= coresReq + val enoughResources = ResourceUtils.resourcesMeetRequirements( + worker.resourcesFree, resourceRequirements) + enoughMem && enoughCores && enoughResources + } + + /** + * @return whether the worker could launch the driver represented by DriverDescription + */ + private def canLaunchDriver(worker: WorkerInfo, desc: DriverDescription): Boolean = { + canLaunch(worker, desc.mem, desc.cores, desc.resourceReqs) + } + + /** + * @return whether the worker could launch the executor according to application's requirement + */ + private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = { + canLaunch( + worker, + desc.memoryPerExecutorMB, + desc.coresPerExecutor.getOrElse(1), + desc.resourceReqsPerExecutor) + } + /** * Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. @@ -738,17 +791,24 @@ private[deploy] class Master( // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. 
var launched = false + var isClusterIdle = true var numWorkersVisited = 0 while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) + isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty numWorkersVisited += 1 - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + if (canLaunchDriver(worker, driver.desc)) { + val allocated = worker.acquireResources(driver.desc.resourceReqs) + driver.withResources(allocated) launchDriver(worker, driver) waitingDrivers -= driver launched = true } curPos = (curPos + 1) % numWorkersAlive } + if (!launched && isClusterIdle) { + logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.") + } } startExecutorsOnWorkers() } @@ -756,8 +816,8 @@ private[deploy] class Master( private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.endpoint.send(LaunchExecutor(masterUrl, - exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) + worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, + exec.application.desc, exec.cores, exec.memory, exec.resources)) exec.application.driver.send( ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) } @@ -1021,7 +1081,7 @@ private[deploy] class Master( logInfo("Launching driver " + driver.id + " on worker " + worker.id) worker.addDriver(driver) driver.worker = Some(worker) - worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) + worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources)) driver.state = DriverState.RUNNING } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index c87d6e24b78c..d485db43c5f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -19,9 +19,24 @@ package org.apache.spark.deploy.master import scala.collection.mutable +import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement} import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils +private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String]) + extends ResourceAllocator(name, addresses) { + + def toResourceInformation(): ResourceInformation = { + new ResourceInformation(name, addresses.toArray) + } + + def acquire(amount: Int): ResourceInformation = { + val allocated = availableAddrs.take(amount) + acquire(allocated) + new ResourceInformation(name, allocated.toArray) + } +} + private[spark] class WorkerInfo( val id: String, val host: String, @@ -29,7 +44,9 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val endpoint: RpcEndpointRef, - val webUiAddress: String) + val webUiAddress: String, + val resources: Map[String, WorkerResourceInfo], + val pid: Int = 0) extends Serializable { Utils.checkHost(host) @@ -47,6 +64,11 @@ private[spark] class WorkerInfo( def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed + def resourcesFree: Map[String, Int] = { + resources.map { case (rName, rInfo) => + rName -> rInfo.availableAddrs.length + } + } private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() @@ -78,6 +100,7 @@ private[spark] class WorkerInfo( executors 
-= exec.fullId coresUsed -= exec.cores memoryUsed -= exec.memory + releaseResources(exec.resources) } } @@ -95,6 +118,7 @@ private[spark] class WorkerInfo( drivers -= driver.id memoryUsed -= driver.desc.mem coresUsed -= driver.desc.cores + releaseResources(driver.resources) } def setState(state: WorkerState.Value): Unit = { @@ -102,4 +126,36 @@ private[spark] class WorkerInfo( } def isAlive(): Boolean = this.state == WorkerState.ALIVE + + /** + * acquire specified amount resources for driver/executor from the worker + * @param resourceReqs the resources requirement from driver/executor + */ + def acquireResources(resourceReqs: Seq[ResourceRequirement]) + : Map[String, ResourceInformation] = { + resourceReqs.map { req => + val rName = req.resourceName + val amount = req.amount + rName -> resources(rName).acquire(amount) + }.toMap + } + + /** + * used during master recovery + */ + def recoverResources(expected: Map[String, ResourceInformation]): Unit = { + expected.foreach { case (rName, rInfo) => + resources(rName).acquire(rInfo.addresses) + } + } + + /** + * release resources to worker from the driver/executor + * @param allocated the resources which allocated to driver/executor previously + */ + private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = { + allocated.foreach { case (rName, rInfo) => + resources(rName).release(rInfo.addresses) + } + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index f912ed64c80b..c060ef9da8c1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -25,6 +25,7 @@ import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ import org.apache.spark.internal.config import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -174,8 +175,11 @@ private[rest] class StandaloneSubmitRequestServlet( val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) + val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf, + config.SPARK_DRIVER_PREFIX) new DriverDescription( - appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command) + appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command, + driverResourceReqs) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 0c88119441ad..4934722c0d83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -28,10 +28,13 @@ import com.google.common.io.Files import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged +import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile import org.apache.spark.deploy.master.DriverState import 
org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX} import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT +import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} @@ -47,7 +50,8 @@ private[deploy] class DriverRunner( val driverDesc: DriverDescription, val worker: RpcEndpointRef, val workerUrl: String, - val securityManager: SecurityManager) + val securityManager: SecurityManager, + val resources: Map[String, ResourceInformation] = Map.empty) extends Logging { @volatile private var process: Option[Process] = None @@ -171,6 +175,7 @@ private[deploy] class DriverRunner( private[worker] def prepareAndRunDriver(): Int = { val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) + val resourceFileOpt = prepareResourcesFile(SPARK_DRIVER_PREFIX, resources, driverDir) def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl @@ -178,9 +183,12 @@ private[deploy] class DriverRunner( case other => other } + // config resource file for driver, which would be used to load resources when driver starts up + val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f => + Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty) // TODO: If we add ability to submit multiple jars they should also be added here - val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, - driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) + val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts), + securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) runDriver(builder, driverDir, driverDesc.supervise) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 6f1484cee586..97939107f305 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -27,8 +27,11 @@ import com.google.common.io.Files import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.SPARK_EXECUTOR_PREFIX import org.apache.spark.internal.config.UI._ +import org.apache.spark.resource.{ResourceInformation, ResourceUtils} import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.util.logging.FileAppender @@ -54,7 +57,8 @@ private[deploy] class ExecutorRunner( val workerUrl: String, conf: SparkConf, val appLocalDirs: Seq[String], - @volatile var state: ExecutorState.Value) + @volatile var state: ExecutorState.Value, + val resources: Map[String, ResourceInformation] = Map.empty) extends Logging { private val fullId = appId + "/" + execId @@ -143,11 +147,14 @@ private[deploy] class ExecutorRunner( */ private def fetchAndRunExecutor() { try { + val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, 
executorDir) // Launch the process + val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f => + Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty) val subsOpts = appDesc.command.javaOpts.map { Utils.substituteAppNExecIds(_, appId, execId.toString) } - val subsCommand = appDesc.command.copy(javaOpts = subsOpts) + val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts) val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf), memory, sparkHome.getAbsolutePath, substituteVariables) val command = builder.command() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ac7a1b91db6b..899593dff95f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy.worker -import java.io.File -import java.io.IOException +import java.io.{File, IOException} import java.text.SimpleDateFormat import java.util.{Date, Locale, UUID} import java.util.concurrent._ @@ -34,6 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging} @@ -44,7 +44,7 @@ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.resource.ResourceInformation import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc._ -import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} +import org.apache.spark.util.{SignalUtils, SparkUncaughtExceptionHandler, ThreadUtils, Utils} private[deploy] class Worker( override val rpcEnv: RpcEnv, @@ -57,7 +57,8 @@ private[deploy] class Worker( val conf: SparkConf, val securityMgr: SecurityManager, resourceFileOpt: Option[String] = None, - externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null) + externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null, + pid: Int = Utils.getProcessId) extends ThreadSafeRpcEndpoint with Logging { private val host = rpcEnv.address.host @@ -180,7 +181,7 @@ private[deploy] class Worker( ) // visible for tests - private[deploy] var resources: Map[String, ResourceInformation] = _ + private[deploy] var resources: Map[String, ResourceInformation] = Map.empty var coresUsed = 0 var memoryUsed = 0 @@ -190,19 +191,8 @@ private[deploy] class Worker( private def createWorkDir() { workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) - try { - // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs() - // So attempting to create and then check if directory was created or not. 
- workDir.mkdirs() - if ( !workDir.exists() || !workDir.isDirectory) { - logError("Failed to create work directory " + workDir) - System.exit(1) - } - assert (workDir.isDirectory) - } catch { - case e: Exception => - logError("Failed to create work directory " + workDir, e) - System.exit(1) + if (!Utils.createDirectory(workDir)) { + System.exit(1) } } @@ -214,6 +204,7 @@ private[deploy] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() startExternalShuffleService() + releaseResourcesOnInterrupt() setupWorkerResources() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() @@ -227,13 +218,29 @@ private[deploy] class Worker( metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } + /** + * Used to catch the TERM signal from sbin/stop-slave.sh and + * release resources before Worker exits + */ + private def releaseResourcesOnInterrupt(): Unit = { + SignalUtils.register("TERM") { + releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) + false + } + } + private def setupWorkerResources(): Unit = { try { - resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt) + val allResources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt) + resources = acquireResources(conf, SPARK_WORKER_PREFIX, allResources, pid) + logResourceInfo(SPARK_WORKER_PREFIX, resources) } catch { case e: Exception => logError("Failed to setup worker resources: ", e) - System.exit(1) + releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) + if (!Utils.isTesting) { + System.exit(1) + } } } @@ -349,6 +356,7 @@ private[deploy] class Worker( TimeUnit.SECONDS)) } } else { + releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) logError("All masters are unresponsive! Giving up.") System.exit(1) } @@ -405,7 +413,8 @@ private[deploy] class Worker( cores, memory, workerWebUiUrl, - masterEndpoint.address)) + masterEndpoint.address, + resources)) } private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { @@ -446,6 +455,7 @@ private[deploy] class Worker( case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) + releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) System.exit(1) } @@ -506,15 +516,20 @@ private[deploy] class Worker( logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) changeMaster(masterRef, masterWebUiUrl, masterRef.address) - val execs = executors.values. 
- map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) - masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)) + val executorResponses = executors.values.map { e => + WorkerExecutorStateResponse(new ExecutorDescription( + e.appId, e.execId, e.cores, e.state), e.resources) + } + val driverResponses = drivers.keys.map { id => + WorkerDriverStateResponse(id, drivers(id).resources)} + masterRef.send(WorkerSchedulerStateResponse( + workerId, executorResponses.toList, driverResponses.toSeq)) case ReconnectWorker(masterUrl) => logInfo(s"Master with url $masterUrl requested this worker to reconnect.") registerWithMaster() - case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { @@ -567,7 +582,8 @@ private[deploy] class Worker( workerUri, conf, appLocalDirs, - ExecutorState.LAUNCHING) + ExecutorState.LAUNCHING, + resources_) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -601,7 +617,7 @@ private[deploy] class Worker( } } - case LaunchDriver(driverId, driverDesc) => + case LaunchDriver(driverId, driverDesc, resources_) => logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner( conf, @@ -611,7 +627,8 @@ private[deploy] class Worker( driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, workerUri, - securityMgr) + securityMgr, + resources_) drivers(driverId) = driver driver.start() @@ -701,6 +718,7 @@ private[deploy] class Worker( } override def onStop() { + releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) cleanupThreadExecutor.shutdownNow() metricsSystem.report() cancelLastRegistrationRetry() @@ -835,8 +853,9 @@ private[deploy] object Worker extends Logging { val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL) + val pid = if (Utils.isTesting) workerNumber.get else Utils.getProcessId rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, - masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt)) + masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt, pid = pid)) rpcEnv } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 98e5aa6ec0c7..a42a928936a8 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -90,11 +90,13 @@ private[spark] class CoarseGrainedExecutorBackend( // visible for testing def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = { // only parse the resources if a task requires them - val resourceInfo = if (parseTaskResourceRequirements(env.conf).nonEmpty) { + val resourceInfo = if (parseResourceRequirements(env.conf, SPARK_TASK_PREFIX).nonEmpty) { val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt) if (resources.isEmpty) { throw new SparkException("User specified resources per task via: " + s"$SPARK_TASK_PREFIX, but can't find any resources available on the executor.") + } else { 
+ logResourceInfo(SPARK_EXECUTOR_PREFIX, resources) } resources } else { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e0147218d3eb..214675b6cfd2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -36,6 +36,23 @@ package object config { private[spark] val SPARK_EXECUTOR_PREFIX = "spark.executor" private[spark] val SPARK_TASK_PREFIX = "spark.task" + private[spark] val SPARK_RESOURCES_COORDINATE = + ConfigBuilder("spark.resources.coordinate.enable") + .doc("Whether to coordinate resources automatically among workers/drivers(client only) " + + "in Standalone. If false, the user is responsible for configuring different resources " + + "for workers/drivers that run on the same host.") + .booleanConf + .createWithDefault(true) + + private[spark] val SPARK_RESOURCES_DIR = + ConfigBuilder("spark.resources.dir") + .doc("Directory used to coordinate resources among workers/drivers(client only) in " + + "Standalone. Default is SPARK_HOME. Make sure to use the same directory for worker " + + "and drivers in client mode that run on the same host. Don't clean up this directory " + + "while workers/drivers are still alive to avoid the most likely resources conflict. ") + .stringConf + .createOptional + private[spark] val DRIVER_RESOURCES_FILE = ConfigBuilder("spark.driver.resourcesFile") .internal() diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala new file mode 100644 index 000000000000..719f34db9e18 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.resource + +import scala.collection.mutable + +import org.apache.spark.SparkException +import org.apache.spark.util.collection.OpenHashMap + +/** + * Class used to help executor/worker allocate resources + * Please note that this class is intended to be used in a single thread. + * @param name Resource name, e.g. gpu/fpga + * @param addresses Resource addresses provided by the executor/worker + */ +class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializable { + /** + * Map from an address to its availability, the value `true` means the address is available, + * while value `false` means the address is assigned. + * TODO Use [[OpenHashMap]] instead to gain better performance. + */ + private val addressAvailabilityMap = mutable.HashMap(addresses.map(_ -> true): _*) + + /** + * Sequence of currently available resource addresses. 
+ */ + def availableAddrs: Seq[String] = addressAvailabilityMap.flatMap { case (addr, available) => + if (available) Some(addr) else None + }.toSeq + + /** + * Sequence of currently assigned resource addresses. + * Exposed for testing only. + */ + private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap + .flatMap { case (addr, available) => + if (!available) Some(addr) else None + }.toSeq + + /** + * Acquire a sequence of resource addresses (to a launched task), these addresses must be + * available. When the task finishes, it will return the acquired resource addresses. + * Throw an Exception if an address is not available or doesn't exist. + */ + def acquire(addrs: Seq[String]): Unit = { + addrs.foreach { address => + if (!addressAvailabilityMap.contains(address)) { + throw new SparkException(s"Try to acquire an address that doesn't exist. $name address " + + s"$address doesn't exist.") + } + val isAvailable = addressAvailabilityMap(address) + if (isAvailable) { + addressAvailabilityMap(address) = false + } else { + throw new SparkException(s"Try to acquire an address that is not available. $name " + + s"address $address is not available.") + } + } + } + + /** + * Release a sequence of resource addresses, these addresses must have been assigned. Resource + * addresses are released when a task has finished. + * Throw an Exception if an address is not assigned or doesn't exist. + */ + def release(addrs: Seq[String]): Unit = { + addrs.foreach { address => + if (!addressAvailabilityMap.contains(address)) { + throw new SparkException(s"Try to release an address that doesn't exist. $name address " + + s"$address doesn't exist.") + } + val isAvailable = addressAvailabilityMap(address) + if (!isAvailable) { + addressAvailabilityMap(address) = true + } else { + throw new SparkException(s"Try to release an address that is not assigned. $name " + + s"address $address is not assigned.") + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 69265861a931..150ba09f77dd 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -27,7 +27,6 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ import org.apache.spark.util.Utils.executeAndGetOutput /** @@ -48,7 +47,7 @@ private[spark] case class ResourceRequest( discoveryScript: Option[String], vendor: Option[String]) -private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int) +private[spark] case class ResourceRequirement(resourceName: String, amount: Int) /** * Case class representing allocated resource addresses for a specific resource. 
@@ -62,7 +61,6 @@ private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[Stri } private[spark] object ResourceUtils extends Logging { - // config suffixes val DISCOVERY_SCRIPT = "discoveryScript" val VENDOR = "vendor" @@ -94,23 +92,39 @@ private[spark] object ResourceUtils extends Logging { } } - def parseTaskResourceRequirements(sparkConf: SparkConf): Seq[TaskResourceRequirement] = { - parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request => - TaskResourceRequirement(request.id.resourceName, request.amount) + def parseResourceRequirements(sparkConf: SparkConf, componentName: String) + : Seq[ResourceRequirement] = { + parseAllResourceRequests(sparkConf, componentName).map { request => + ResourceRequirement(request.id.resourceName, request.amount) + } + } + + def resourcesMeetRequirements( + resourcesFree: Map[String, Int], + resourceRequirements: Seq[ResourceRequirement]) + : Boolean = { + resourceRequirements.forall { req => + resourcesFree.getOrElse(req.resourceName, 0) >= req.amount } } - private def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = { - implicit val formats = DefaultFormats + def withResourcesJson[T](resourcesFile: String)(extract: String => Seq[T]): Seq[T] = { val json = new String(Files.readAllBytes(Paths.get(resourcesFile))) try { - parse(json).extract[Seq[ResourceAllocation]] + extract(json) } catch { case NonFatal(e) => throw new SparkException(s"Error parsing resources file $resourcesFile", e) } } + def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = { + withResourcesJson[ResourceAllocation](resourcesFile) { json => + implicit val formats = DefaultFormats + parse(json).extract[Seq[ResourceAllocation]] + } + } + private def parseAllocatedOrDiscoverResources( sparkConf: SparkConf, componentName: String, @@ -154,10 +168,14 @@ private[spark] object ResourceUtils extends Logging { val allocations = parseAllocatedOrDiscoverResources(sparkConf, componentName, resourcesFileOpt) assertAllResourceAllocationsMeetRequests(allocations, requests) val resourceInfoMap = allocations.map(a => (a.id.resourceName, a.toResourceInformation)).toMap + resourceInfoMap + } + + def logResourceInfo(componentName: String, resources: Map[String, ResourceInformation]) + : Unit = { logInfo("==============================================================") - logInfo(s"Resources for $componentName:\n${resourceInfoMap.mkString("\n")}") + logInfo(s"Resources for $componentName:\n${resources.mkString("\n")}") logInfo("==============================================================") - resourceInfoMap } // visible for test @@ -175,7 +193,7 @@ private[spark] object ResourceUtils extends Logging { "doesn't exist!") } } else { - throw new SparkException(s"User is expecting to use resource: $resourceName but " + + throw new SparkException(s"User is expecting to use resource: $resourceName, but " + "didn't specify a discovery script!") } if (!result.name.equals(resourceName)) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala index c75931d53b4b..f05281e50b05 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala @@ -17,85 +17,14 @@ package org.apache.spark.scheduler -import scala.collection.mutable - -import org.apache.spark.SparkException -import org.apache.spark.util.collection.OpenHashMap 
+import org.apache.spark.resource.ResourceAllocator /** * Class to hold information about a type of Resource on an Executor. This information is managed * by SchedulerBackend, and TaskScheduler shall schedule tasks on idle Executors based on the * information. - * Please note that this class is intended to be used in a single thread. * @param name Resource name * @param addresses Resource addresses provided by the executor */ -private[spark] class ExecutorResourceInfo( - val name: String, - addresses: Seq[String]) extends Serializable { - - /** - * Map from an address to its availability, the value `true` means the address is available, - * while value `false` means the address is assigned. - * TODO Use [[OpenHashMap]] instead to gain better performance. - */ - private val addressAvailabilityMap = mutable.HashMap(addresses.map(_ -> true): _*) - - /** - * Sequence of currently available resource addresses. - */ - def availableAddrs: Seq[String] = addressAvailabilityMap.flatMap { case (addr, available) => - if (available) Some(addr) else None - }.toSeq - - /** - * Sequence of currently assigned resource addresses. - * Exposed for testing only. - */ - private[scheduler] def assignedAddrs: Seq[String] = addressAvailabilityMap - .flatMap { case (addr, available) => - if (!available) Some(addr) else None - }.toSeq - - /** - * Acquire a sequence of resource addresses (to a launched task), these addresses must be - * available. When the task finishes, it will return the acquired resource addresses. - * Throw an Exception if an address is not available or doesn't exist. - */ - def acquire(addrs: Seq[String]): Unit = { - addrs.foreach { address => - if (!addressAvailabilityMap.contains(address)) { - throw new SparkException(s"Try to acquire an address that doesn't exist. $name address " + - s"$address doesn't exist.") - } - val isAvailable = addressAvailabilityMap(address) - if (isAvailable) { - addressAvailabilityMap(address) = false - } else { - throw new SparkException(s"Try to acquire an address that is not available. $name " + - s"address $address is not available.") - } - } - } - - /** - * Release a sequence of resource addresses, these addresses must have been assigned. Resource - * addresses are released when a task has finished. - * Throw an Exception if an address is not assigned or doesn't exist. - */ - def release(addrs: Seq[String]): Unit = { - addrs.foreach { address => - if (!addressAvailabilityMap.contains(address)) { - throw new SparkException(s"Try to release an address that doesn't exist. $name address " + - s"$address doesn't exist.") - } - val isAvailable = addressAvailabilityMap(address) - if (!isAvailable) { - addressAvailabilityMap(address) = true - } else { - throw new SparkException(s"Try to release an address that is not assigned. 
$name " + - s"address $address is not assigned.") - } - } - } -} +private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String]) + extends ResourceAllocator(name, addresses) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2e3e0a2f3407..1496dff31a4d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -94,7 +94,7 @@ private[spark] class TaskSchedulerImpl( val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK) // Resources to request per task - val resourcesReqsPerTask = ResourceUtils.parseTaskResourceRequirements(sc.conf) + val resourcesReqsPerTask = ResourceUtils.parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX) // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. Protected by `this` @@ -383,9 +383,8 @@ private[spark] class TaskSchedulerImpl( * Check whether the resources from the WorkerOffer are enough to run at least one task. */ private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = { - resourcesReqsPerTask.forall { req => - resources.contains(req.resourceName) && resources(req.resourceName).size >= req.amount - } + val resourcesFree = resources.map(r => r._1 -> r._2.length) + ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index e0605fee9cbf..2025a7dc2482 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientL import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} +import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ import org.apache.spark.util.Utils @@ -112,8 +113,11 @@ private[spark] class StandaloneSchedulerBackend( } else { None } + val executorResourceReqs = ResourceUtils.parseResourceRequirements(conf, + config.SPARK_EXECUTOR_PREFIX) val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) + webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit, + resourceReqsPerExecutor = executorResourceReqs) client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3ad67f44afa6..9c1f21fa236b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -269,6 +269,26 @@ private[spark] object Utils extends Logging { file.setExecutable(true, true) } + /** + * Create a directory given the abstract pathname + * @return true, if the directory is successfully created; otherwise, return false. 
+ */ + def createDirectory(dir: File): Boolean = { + try { + // This sporadically fails - not sure why ... !dir.exists() && !dir.mkdirs() + // So attempting to create and then check if directory was created or not. + dir.mkdirs() + if ( !dir.exists() || !dir.isDirectory) { + logError(s"Failed to create directory " + dir) + } + dir.isDirectory + } catch { + case e: Exception => + logError(s"Failed to create directory " + dir, e) + false + } + } + /** * Create a directory inside the given parent directory. The directory is guaranteed to be * newly created, and is not marked for automatic deletion. @@ -2554,6 +2574,28 @@ private[spark] object Utils extends Logging { new File(path.getAbsolutePath + "." + UUID.randomUUID()) } + /** + * Given a process id, return true if the process is still running. + */ + def isProcessRunning(pid: Int): Boolean = { + val process = executeCommand(Seq("kill", "-0", pid.toString)) + process.waitFor(10, TimeUnit.SECONDS) + process.exitValue() == 0 + } + + /** + * Returns the pid of this JVM process. + */ + def getProcessId: Int = { + val PROCESS = "(\\d+)@(.*)".r + val name = getProcessName() + name match { + case PROCESS(pid, _) => pid.toInt + case _ => + throw new SparkException(s"Unexpected process name: $name, expected to be PID@hostname.") + } + } + /** * Returns the name of this JVM process. This is OS dependent but typically (OSX, Linux, Windows), * this is formatted as PID@hostname. diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 202b85dcf569..9f00131c8dc2 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -440,7 +440,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.set(TASK_GPU_ID.amountConf, "2") conf.set(TASK_FPGA_ID.amountConf, "1") var taskResourceRequirement = - parseTaskResourceRequirements(conf).map(req => (req.resourceName, req.amount)).toMap + parseResourceRequirements(conf, SPARK_TASK_PREFIX) + .map(req => (req.resourceName, req.amount)).toMap assert(taskResourceRequirement.size == 2) assert(taskResourceRequirement(GPU) == 2) @@ -450,7 +451,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst // Ignore invalid prefix conf.set(ResourceID("spark.invalid.prefix", FPGA).amountConf, "1") taskResourceRequirement = - parseTaskResourceRequirements(conf).map(req => (req.resourceName, req.amount)).toMap + parseResourceRequirements(conf, SPARK_TASK_PREFIX) + .map(req => (req.resourceName, req.amount)).toMap assert(taskResourceRequirement.size == 1) assert(taskResourceRequirement.get(FPGA).isEmpty) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index fed3ae35ee0e..c1402bd2915a 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -756,7 +756,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu val conf = new SparkConf() .setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") - conf.set(DRIVER_GPU_ID.amountConf, "1") + conf.set(DRIVER_GPU_ID.amountConf, "2") conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath) sc = new SparkContext(conf) @@ -783,7 +783,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu .set(DRIVER_RESOURCES_FILE, resourcesFile) 
.setMaster("local-cluster[1, 1, 1024]") .setAppName("test-cluster") - conf.set(DRIVER_GPU_ID.amountConf, "1") + conf.set(DRIVER_GPU_ID.amountConf, "3") conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath) sc = new SparkContext(conf) @@ -850,26 +850,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assume(!(Utils.isWindows)) withTempDir { dir => val discoveryScript = createTempScriptWithExpectedOutput(dir, "resourceDiscoveryScript", - """{"name": "gpu","addresses":["0", "1", "2"]}""") + """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", "8"]}""") val conf = new SparkConf() .setMaster("local-cluster[3, 3, 1024]") .setAppName("test-cluster") - conf.set(TASK_GPU_ID.amountConf, "1") + conf.set(WORKER_GPU_ID.amountConf, "3") + conf.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript) + conf.set(TASK_GPU_ID.amountConf, "3") conf.set(EXECUTOR_GPU_ID.amountConf, "3") - conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, discoveryScript) sc = new SparkContext(conf) // Ensure all executors has started TestUtils.waitUntilExecutorsUp(sc, 3, 60000) - val rdd = sc.makeRDD(1 to 10, 9).mapPartitions { it => + val rdd = sc.makeRDD(1 to 10, 3).mapPartitions { it => val context = TaskContext.get() context.resources().get(GPU).get.addresses.iterator } val gpus = rdd.collect() - assert(gpus.sorted === Seq("0", "0", "0", "1", "1", "1", "2", "2", "2")) + assert(gpus.sorted === Seq("0", "1", "2", "3", "4", "5", "6", "7", "8")) eventually(timeout(10.seconds)) { assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 784981ef99cd..a2c466931f0c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy import java.io.File -import java.util.Date import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} @@ -50,7 +49,8 @@ private[deploy] object DeployTestUtils { createDriverDesc(), JsonConstants.submitDate) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, + "http://publicAddress:80", Map.empty) workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index f19e99894644..9ce046a2e2f5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -42,6 +42,8 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Deploy._ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ +import org.apache.spark.resource.{ResourceInformation, ResourceRequirement} +import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.serializer @@ -68,17 +70,23 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend }) } - val appDesc = DeployTestUtils.createAppDesc() + var appDesc = 
DeployTestUtils.createAppDesc() val drivers = mutable.HashSet[String]() + val driverResources = new mutable.HashMap[String, Map[String, Set[String]]] + val execResources = new mutable.HashMap[String, Map[String, Set[String]]] override def receive: PartialFunction[Any, Unit] = { case RegisteredWorker(masterRef, _, _, _) => masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq)) - case LaunchDriver(driverId, desc) => + case LaunchExecutor(_, appId, execId, _, _, _, resources_) => + execResources(appId + "/" + execId) = resources_.map(r => (r._1, r._2.addresses.toSet)) + case LaunchDriver(driverId, desc, resources_) => drivers += driverId + driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet)) master.send(RegisterApplication(appDesc, newDriver(driverId))) case KillDriver(driverId) => master.send(DriverStateChanged(driverId, DriverState.KILLED, None)) drivers -= driverId + driverResources.remove(driverId) driverIdToAppId.get(driverId) match { case Some(appId) => apps.remove(appId) @@ -93,7 +101,7 @@ class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new extends MockWorker(master, conf) { var failedCnt = 0 override def receive: PartialFunction[Any, Unit] = { - case LaunchExecutor(_, appId, execId, _, _, _) => + case LaunchExecutor(_, appId, execId, _, _, _, _) => failedCnt += 1 master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)) case otherMsg => super.receive(otherMsg) @@ -167,7 +175,8 @@ class MasterSuite extends SparkFunSuite cores = 0, memory = 0, endpoint = null, - webUiAddress = "http://localhost:80" + webUiAddress = "http://localhost:80", + Map.empty ) val (rpcEnv, _, _) = @@ -248,9 +257,12 @@ class MasterSuite extends SparkFunSuite // Application state should be WAITING when "MasterChangeAcknowledged" event executed. 
fakeAppInfo.state should be(ApplicationState.WAITING) } - - master.self.send( - WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id))) + val execResponse = fakeExecutors.map(exec => + WorkerExecutorStateResponse(exec, Map.empty[String, ResourceInformation])) + val driverResponse = WorkerDriverStateResponse( + fakeDriverInfo.id, Map.empty[String, ResourceInformation]) + master.self.send(WorkerSchedulerStateResponse( + fakeWorkerInfo.id, execResponse, Seq(driverResponse))) eventually(timeout(5.seconds), interval(100.milliseconds)) { getState(master) should be(RecoveryState.ALIVE) @@ -545,6 +557,16 @@ class MasterSuite extends SparkFunSuite _master } + def makeAliveMaster(conf: SparkConf = new SparkConf): Master = { + val master = makeMaster(conf) + master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) + eventually(timeout(10.seconds)) { + val masterState = master.self.askSync[MasterStateResponse](RequestMasterState) + assert(masterState.status === RecoveryState.ALIVE, "Master is not alive") + } + master + } + private def makeAppInfo( memoryPerExecutorMb: Int, coresPerExecutor: Option[Int] = None, @@ -563,7 +585,8 @@ class MasterSuite extends SparkFunSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - new WorkerInfo(workerId, "host", 100, cores, memoryMb, endpointRef, "http://localhost:80") + new WorkerInfo(workerId, "host", 100, cores, memoryMb, + endpointRef, "http://localhost:80", Map.empty) } private def scheduleExecutorsOnWorkers( @@ -575,13 +598,7 @@ class MasterSuite extends SparkFunSuite } test("SPARK-13604: Master should ask Worker kill unknown executors and drivers") { - val master = makeMaster() - master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) - eventually(timeout(10.seconds)) { - val masterState = master.self.askSync[MasterStateResponse](RequestMasterState) - assert(masterState.status === RecoveryState.ALIVE, "Master is not alive") - } - + val master = makeAliveMaster() val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]() val killedDrivers = new ConcurrentLinkedQueue[String]() val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint { @@ -614,13 +631,7 @@ class MasterSuite extends SparkFunSuite } test("SPARK-20529: Master should reply the address received from worker") { - val master = makeMaster() - master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) - eventually(timeout(10.seconds)) { - val masterState = master.self.askSync[MasterStateResponse](RequestMasterState) - assert(masterState.status === RecoveryState.ALIVE, "Master is not alive") - } - + val master = makeAliveMaster() @volatile var receivedMasterAddress: RpcAddress = null val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint { override val rpcEnv: RpcEnv = master.rpcEnv @@ -647,13 +658,7 @@ class MasterSuite extends SparkFunSuite } test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") { - val master = makeMaster() - master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) - eventually(timeout(10.seconds)) { - val masterState = master.self.askSync[MasterStateResponse](RequestMasterState) - assert(masterState.status === RecoveryState.ALIVE, "Master is not alive") - } - + val master = makeAliveMaster() var worker: MockExecutorLaunchFailWorker = null try { worker = new MockExecutorLaunchFailWorker(master.self) @@ -697,12 +702,7 @@ class MasterSuite extends SparkFunSuite test("SPARK-19900: 
there should be a corresponding driver for the app after relaunching driver") { val conf = new SparkConf().set(WORKER_TIMEOUT, 1L) - val master = makeMaster(conf) - master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master) - eventually(timeout(10.seconds)) { - val masterState = master.self.askSync[MasterStateResponse](RequestMasterState) - assert(masterState.status === RecoveryState.ALIVE, "Master is not alive") - } + val master = makeAliveMaster(conf) var worker1: MockWorker = null var worker2: MockWorker = null try { @@ -770,6 +770,95 @@ class MasterSuite extends SparkFunSuite } } + test("assign/recycle resources to/from driver") { + val master = makeAliveMaster() + val masterRef = master.self + val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3)) + val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs) + val driverId = masterRef.askSync[SubmitDriverResponse]( + RequestSubmitDriver(driver)).driverId.get + var status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) + assert(status.state === Some(DriverState.SUBMITTED)) + val worker = new MockWorker(masterRef) + worker.rpcEnv.setupEndpoint(s"worker", worker) + val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "2")), + FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3"))) + val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 1024, + "http://localhost:8080", RpcAddress("localhost", 10000), resources) + masterRef.send(regMsg) + eventually(timeout(10.seconds)) { + status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) + assert(status.state === Some(DriverState.RUNNING)) + assert(worker.drivers.head === driverId) + assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", "2"), + FPGA -> Set("f1", "f2", "f3"))) + val workerResources = master.workers.head.resources + assert(workerResources(GPU).availableAddrs.length === 0) + assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2")) + assert(workerResources(FPGA).availableAddrs.length === 0) + assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", "f3")) + } + val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, None) + masterRef.send(driverFinished) + eventually(timeout(10.seconds)) { + val workerResources = master.workers.head.resources + assert(workerResources(GPU).availableAddrs.length === 3) + assert(workerResources(GPU).assignedAddrs.toSet === Set()) + assert(workerResources(FPGA).availableAddrs.length === 3) + assert(workerResources(FPGA).assignedAddrs.toSet === Set()) + } + } + + test("assign/recycle resources to/from executor") { + + def makeWorkerAndRegister( + master: RpcEndpointRef, + workerResourceReqs: Map[String, Int] = Map.empty) + : MockWorker = { + val worker = new MockWorker(master) + worker.rpcEnv.setupEndpoint(s"worker", worker) + val resources = workerResourceReqs.map { case (rName, amount) => + val shortName = rName.charAt(0) + val addresses = (0 until amount).map(i => s"$shortName$i").toArray + rName -> new ResourceInformation(rName, addresses) + } + val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 2048, + "http://localhost:8080", RpcAddress("localhost", 10000), resources) + master.send(reg) + worker + } + + val master = makeAliveMaster() + val masterRef = master.self + val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3)) + val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6)) + 
worker.appDesc = worker.appDesc.copy(resourceReqsPerExecutor = resourceReqs) + val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs) + val driverId = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId + val status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get)) + assert(status.state === Some(DriverState.RUNNING)) + val workerResources = master.workers.head.resources + eventually(timeout(10.seconds)) { + assert(workerResources(GPU).availableAddrs.length === 0) + assert(workerResources(FPGA).availableAddrs.length === 0) + assert(worker.driverResources.size === 1) + assert(worker.execResources.size === 1) + val driverResources = worker.driverResources.head._2 + val execResources = worker.execResources.head._2 + val gpuAddrs = driverResources(GPU).union(execResources(GPU)) + val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA)) + assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5")) + assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5")) + } + val appId = worker.apps.head._1 + masterRef.send(UnregisterApplication(appId)) + masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, None)) + eventually(timeout(10.seconds)) { + assert(workerResources(GPU).availableAddrs.length === 6) + assert(workerResources(FPGA).availableAddrs.length === 6) + } + } + private def getDrivers(master: Master): HashSet[DriverInfo] = { master.invokePrivate(_drivers()) } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 3d8a46bd02e1..39607621b4c4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -86,7 +86,8 @@ class PersistenceEngineSuite extends SparkFunSuite { cores = 0, memory = 0, endpoint = workerEndpoint, - webUiAddress = "http://localhost:80") + webUiAddress = "http://localhost:80", + Map.empty) persistenceEngine.addWorker(workerToPersist) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index 37e5fbcca46d..bb541b4cad8b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -23,6 +23,7 @@ import java.util.function.Supplier import scala.concurrent.duration._ +import org.json4s.{DefaultFormats, Extraction} import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Answers.RETURNS_SMART_NULLS import org.mockito.ArgumentMatchers.any @@ -32,11 +33,16 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.TestUtils.{createTempJsonFile, createTempScriptWithExpectedOutput} import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService} import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup} +import org.apache.spark.deploy.StandaloneResourceUtils.{ALLOCATED_RESOURCES_FILE, SPARK_RESOURCES_COORDINATE_DIR} import org.apache.spark.deploy.master.DriverState import org.apache.spark.internal.config import org.apache.spark.internal.config.Worker._ +import org.apache.spark.resource.{ResourceAllocation, 
ResourceInformation} +import org.apache.spark.resource.ResourceUtils._ +import org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID} import org.apache.spark.rpc.{RpcAddress, RpcEnv} import org.apache.spark.util.Utils @@ -51,17 +57,36 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { } def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts) + implicit val formats = DefaultFormats + private var _worker: Worker = _ private def makeWorker( - conf: SparkConf, - shuffleServiceSupplier: Supplier[ExternalShuffleService] = null): Worker = { + conf: SparkConf = new SparkConf(), + shuffleServiceSupplier: Supplier[ExternalShuffleService] = null, + pid: Int = Utils.getProcessId, + local: Boolean = false): Worker = { assert(_worker === null, "Some Worker's RpcEnv is leaked in tests") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr) - _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)), - "Worker", "/tmp", conf, securityMgr, None, shuffleServiceSupplier) - _worker + val resourcesFile = conf.get(SPARK_WORKER_RESOURCE_FILE) + val localWorker = new Worker(rpcEnv, 50000, 20, 1234 * 5, + Array.fill(1)(RpcAddress("1.2.3.4", 1234)), "Worker", "/tmp", + conf, securityMgr, resourcesFile, shuffleServiceSupplier, pid) + if (local) { + localWorker + } else { + _worker = localWorker + _worker + } + } + + private def assertResourcesFileDeleted(): Unit = { + assert(sys.props.contains("spark.test.home")) + val sparkHome = sys.props.get("spark.test.home") + val resourceFile = new File(sparkHome + "/" + SPARK_RESOURCES_COORDINATE_DIR, + ALLOCATED_RESOURCES_FILE) + assert(!resourceFile.exists()) } before { @@ -218,6 +243,141 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { } } + test("worker could be launched without any resources") { + val worker = makeWorker() + worker.rpcEnv.setupEndpoint("worker", worker) + eventually(timeout(10.seconds)) { + assert(worker.resources === Map.empty) + worker.rpcEnv.shutdown() + worker.rpcEnv.awaitTermination() + } + assertResourcesFileDeleted() + } + + test("worker could load resources from resources file while launching") { + val conf = new SparkConf() + withTempDir { dir => + val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1")) + val fpgaArgs = + ResourceAllocation(WORKER_FPGA_ID, Seq("f1", "f2", "f3")) + val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs)) + val f1 = createTempJsonFile(dir, "resources", ja) + conf.set(SPARK_WORKER_RESOURCE_FILE.key, f1) + conf.set(WORKER_GPU_ID.amountConf, "2") + conf.set(WORKER_FPGA_ID.amountConf, "3") + val worker = makeWorker(conf) + worker.rpcEnv.setupEndpoint("worker", worker) + eventually(timeout(10.seconds)) { + assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation, + FPGA -> fpgaArgs.toResourceInformation)) + worker.rpcEnv.shutdown() + worker.rpcEnv.awaitTermination() + } + assertResourcesFileDeleted() + } + } + + test("worker could load resources from discovery script while launching") { + val conf = new SparkConf() + withTempDir { dir => + val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3"]}""") + conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath) + conf.set(WORKER_FPGA_ID.amountConf, "3") + val worker = makeWorker(conf) + worker.rpcEnv.setupEndpoint("worker", worker) + eventually(timeout(10.seconds)) { + 
assert(worker.resources === Map(FPGA -> + new ResourceInformation(FPGA, Array("f1", "f2", "f3")))) + worker.rpcEnv.shutdown() + worker.rpcEnv.awaitTermination() + } + assertResourcesFileDeleted() + } + } + + test("worker could load resources from resources file and discovery script while launching") { + val conf = new SparkConf() + withTempDir { dir => + val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1")) + val ja = Extraction.decompose(Seq(gpuArgs)) + val resourcesPath = createTempJsonFile(dir, "resources", ja) + val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3"]}""") + conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath) + conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath) + conf.set(WORKER_FPGA_ID.amountConf, "3") + conf.set(WORKER_GPU_ID.amountConf, "2") + val worker = makeWorker(conf) + worker.rpcEnv.setupEndpoint("worker", worker) + eventually(timeout(10.seconds)) { + assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation, + FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))) + worker.rpcEnv.shutdown() + worker.rpcEnv.awaitTermination() + } + assertResourcesFileDeleted() + } + } + + test("Workers run on the same host should avoid resources conflict when coordinate is on") { + val conf = new SparkConf() + withTempDir { dir => + val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""") + conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath) + conf.set(WORKER_FPGA_ID.amountConf, "2") + val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = true)) + workers.zipWithIndex.foreach{case (w, i) => w.rpcEnv.setupEndpoint(s"worker$i", w)} + eventually(timeout(20.seconds)) { + val (empty, nonEmpty) = workers.partition(_.resources.isEmpty) + assert(empty.length === 1) + assert(nonEmpty.length === 2) + val totalResources = nonEmpty.flatMap(_.resources(FPGA).addresses).toSet.toSeq.sorted + assert(totalResources === Seq("f1", "f2", "f3", "f4")) + workers.foreach(_.rpcEnv.shutdown()) + workers.foreach(_.rpcEnv.awaitTermination()) + } + assertResourcesFileDeleted() + } + } + + test("Workers run on the same host should load resources naively when coordinate is off") { + val conf = new SparkConf() + // disable coordination + conf.set(config.SPARK_RESOURCES_COORDINATE, false) + withTempDir { dir => + val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("g0", "g1")) + val ja = Extraction.decompose(Seq(gpuArgs)) + val resourcesPath = createTempJsonFile(dir, "resources", ja) + val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""") + conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath) + conf.set(WORKER_GPU_ID.amountConf, "2") + conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath) + conf.set(WORKER_FPGA_ID.amountConf, "2") + val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = true)) + workers.zipWithIndex.foreach{case (w, i) => w.rpcEnv.setupEndpoint(s"worker$i", w)} + eventually(timeout(20.seconds)) { + val (empty, nonEmpty) = workers.partition(_.resources.isEmpty) + assert(empty.length === 0) + assert(nonEmpty.length === 3) + // Each Worker should get the same resources from resources file and discovery script + // without coordination. Note that, normally, we must config different resources + // for workers run on the same host when coordinate config is off. 
Test here is used + // to validate the different behaviour comparing to the above test when coordinate config + // is on, so we admit the resources collision here. + nonEmpty.foreach { worker => + assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation, + FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3", "f4", "f5")))) + } + workers.foreach(_.rpcEnv.shutdown()) + workers.foreach(_.rpcEnv.awaitTermination()) + } + assertResourcesFileDeleted() + } + } + test("cleanup non-shuffle files after executor exits when config " + "spark.storage.cleanupFilesAfterExecutorExit=true") { testCleanupFilesWithConfig(true) diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 693b0ee1778b..64d99a59b919 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -157,7 +157,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() - assert(error.contains("User is expecting to use resource: gpu but didn't specify a " + + assert(error.contains("User is expecting to use resource: gpu, but didn't specify a " + "discovery script!")) } } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 51a92e0a50f2..c2ecc96db906 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -253,7 +253,7 @@ class ResourceUtilsSuite extends SparkFunSuite discoverResource(request) }.getMessage() - assert(error.contains("User is expecting to use resource: gpu but " + + assert(error.contains("User is expecting to use resource: gpu, but " + "didn't specify a discovery script!")) } } diff --git a/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala index 6d2c07d89f5b..c4509e93104d 100644 --- a/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala +++ b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala @@ -18,14 +18,18 @@ package org.apache.spark.resource import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX, SPARK_TASK_PREFIX} +import org.apache.spark.internal.config.Worker.SPARK_WORKER_PREFIX import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} object TestResourceIDs { val DRIVER_GPU_ID = ResourceID(SPARK_DRIVER_PREFIX, GPU) val EXECUTOR_GPU_ID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU) val TASK_GPU_ID = ResourceID(SPARK_TASK_PREFIX, GPU) + val WORKER_GPU_ID = ResourceID(SPARK_WORKER_PREFIX, GPU) val DRIVER_FPGA_ID = ResourceID(SPARK_DRIVER_PREFIX, FPGA) val EXECUTOR_FPGA_ID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA) val TASK_FPGA_ID = ResourceID(SPARK_TASK_PREFIX, FPGA) + val WORKER_FPGA_ID = ResourceID(SPARK_WORKER_PREFIX, FPGA) + } diff --git a/docs/configuration.md b/docs/configuration.md index 57a53218f36e..84545475ae33 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -193,6 +193,25 @@ of the most common options to set are: and spark.driver.memory. 
+ + spark.resources.coordinate.enable + true + + Whether to coordinate resources automatically among workers/drivers (client only) + in Standalone. If false, the user is responsible for configuring different resources + for workers/drivers that run on the same host. + + + + spark.resources.dir + SPARK_HOME + + Directory used to coordinate resources among workers/drivers (client only) in Standalone. + Default is SPARK_HOME. Make sure to use the same directory for workers and drivers in + client mode that run on the same host. Don't clean up this directory while workers/drivers + are still alive, to avoid resource conflicts. + + spark.driver.resource.{resourceName}.amount 0 @@ -209,7 +228,9 @@ of the most common options to set are: A script for the driver to run to discover a particular resource type. This should write to STDOUT a JSON string in the format of the ResourceInformation class. This has a - name and an array of addresses. + name and an array of addresses. For a client-submitted driver in Standalone, the discovery + script must assign different resource addresses to this driver compared to workers' and + other drivers' when spark.resources.coordinate.enable is off. diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 2ca3ee6aa721..bc77469b6664 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -243,6 +243,37 @@ SPARK_MASTER_OPTS supports the following system properties: receives no heartbeats. + + spark.worker.resource.{resourceName}.amount + (none) + + Amount of a particular resource to use on the worker. + + + + spark.worker.resource.{resourceName}.discoveryScript + (none) + + Path to the resource discovery script, which is used to find a particular resource while the worker is starting up. + The output of the script should be formatted like the ResourceInformation class. + When spark.resources.coordinate.enable is off, the discovery script must assign different + resources to workers and drivers in client mode that run on the same host to avoid resource conflicts. + + + + spark.worker.resourcesFile + (none) + + Path to the resources file which is used to find various resources while the worker is starting up. + The content of the resources file should be formatted like + [[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]]. + When spark.resources.coordinate.enable is off, the resources file must assign different + resources to workers and drivers in client mode that run on the same host to avoid resource conflicts. + If a particular resource is not found in the resources file, the discovery script will be used to + find that resource. If the discovery script also fails to find the resources, the worker will fail + to start up.
+ + SPARK_WORKER_OPTS supports the following system properties: diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py index bcd5d06c1b67..3f3150b0bd4e 100644 --- a/python/pyspark/tests/test_context.py +++ b/python/pyspark/tests/test_context.py @@ -273,7 +273,8 @@ def setUp(self): self.tempFile.close() os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH) - conf = SparkConf().set("spark.driver.resource.gpu.amount", "1") + conf = SparkConf().set("spark.test.home", SPARK_HOME) + conf = conf.set("spark.driver.resource.gpu.amount", "1") conf = conf.set("spark.driver.resource.gpu.discoveryScript", self.tempFile.name) self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf) diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py index 66357b61c79e..66c5f9f3c2fd 100644 --- a/python/pyspark/tests/test_taskcontext.py +++ b/python/pyspark/tests/test_taskcontext.py @@ -23,7 +23,7 @@ import unittest from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext -from pyspark.testing.utils import PySparkTestCase +from pyspark.testing.utils import PySparkTestCase, SPARK_HOME class TaskContextTests(PySparkTestCase): @@ -194,9 +194,11 @@ def setUp(self): self.tempFile.close() os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH) - conf = SparkConf().set("spark.task.resource.gpu.amount", "1") + conf = SparkConf().set("spark.test.home", SPARK_HOME) + conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.tempFile.name) + conf = conf.set("spark.worker.resource.gpu.amount", 1) + conf = conf.set("spark.task.resource.gpu.amount", "1") conf = conf.set("spark.executor.resource.gpu.amount", "1") - conf = conf.set("spark.executor.resource.gpu.discoveryScript", self.tempFile.name) self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf) def test_resources(self):
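
The worker/executor/task resource configs touched by this patch come together in application code roughly as follows. This is a minimal, illustrative sketch only: it mirrors the `local-cluster` setup used by the suites above (where the worker is launched from the application's own conf, so the `spark.worker.*` entries take effect), and `StandaloneGpuSketch` and the discovery-script path are hypothetical; the script must print JSON such as {"name": "gpu", "addresses": ["0", "1", "2"]}.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object StandaloneGpuSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // 1 worker, 3 cores, 1024 MB; the worker-side configs below apply because the
      // worker is spawned from this same conf, as in the tests in this patch.
      .setMaster("local-cluster[1, 3, 1024]")
      .setAppName("standalone-gpu-sketch")
      // Worker discovers 3 GPUs via the (hypothetical) script; coordination is on by default.
      .set("spark.worker.resource.gpu.amount", "3")
      .set("spark.worker.resource.gpu.discoveryScript", "/tmp/getGpus.sh")
      // The executor gets 3 GPU addresses and each task claims 1 of them.
      .set("spark.executor.resource.gpu.amount", "3")
      .set("spark.task.resource.gpu.amount", "1")

    val sc = new SparkContext(conf)
    try {
      // Tasks read their assigned addresses from TaskContext, as the suites above do.
      val addrs = sc.parallelize(1 to 3, 3).mapPartitions { _ =>
        TaskContext.get().resources()("gpu").addresses.iterator
      }.collect()
      println(addrs.sorted.mkString(","))
    } finally {
      sc.stop()
    }
  }
}
```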
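The `ResourceAllocator` introduced above, which `ExecutorResourceInfo` now extends and which appears to back the per-worker address tracking exercised in MasterSuite, is easiest to read through its acquire/release contract. A small sketch, assuming it runs somewhere with visibility into `org.apache.spark.resource` (for example a spark-core test), since the class is an internal helper; `ResourceAllocatorSketch` is a made-up name:

```scala
import org.apache.spark.resource.ResourceAllocator

object ResourceAllocatorSketch {
  def main(args: Array[String]): Unit = {
    // Toy walk-through of the allocator's bookkeeping; addresses are plain strings.
    val gpus = new ResourceAllocator("gpu", Seq("0", "1", "2"))

    // All addresses start out available.
    assert(gpus.availableAddrs.sorted == Seq("0", "1", "2"))

    // Acquiring marks addresses as assigned; acquiring an already-assigned or
    // unknown address throws SparkException.
    gpus.acquire(Seq("0", "1"))
    assert(gpus.availableAddrs == Seq("2"))

    // Releasing puts addresses back; releasing an address that is not assigned throws.
    gpus.release(Seq("0"))
    assert(gpus.availableAddrs.sorted == Seq("0", "2"))
  }
}
```

Only the available view is used here because `assignedAddrs` is `private[spark]` and exposed for testing.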