diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
new file mode 100644
index 000000000000..f7343869744a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils.executeAndGetOutput
+
+/**
+ * Discovers resources (GPUs/FPGAs/etc).
+ * This class finds resources by running and parsing the output of the user-specified script
+ * from the config spark.{driver/executor}.{resourceType}.discoveryScript.
+ * The output of the script is expected to be a String in the format of
+ * count:unit:comma-separated list of addresses, where the list of addresses is
+ * specific to that resource type. The user is responsible for interpreting the addresses.
+ */
+private[spark] object ResourceDiscoverer extends Logging {
+
+  def findResources(sparkconf: SparkConf, isDriver: Boolean): Map[String, ResourceInformation] = {
+    val prefix = if (isDriver) {
+      SPARK_DRIVER_RESOURCE_PREFIX
+    } else {
+      SPARK_EXECUTOR_RESOURCE_PREFIX
+    }
+    // Get the unique resource types.
+    val resourceTypes = sparkconf.getAllWithPrefix(prefix).map(x => x._1.split('.')(0)).toSet
+    resourceTypes.map { rtype =>
+      val rInfo = getResourceAddrsForType(sparkconf, prefix, rtype)
+      (rtype -> rInfo)
+    }.toMap
+  }
+
+  private def getResourceAddrsForType(
+      sparkconf: SparkConf,
+      prefix: String,
+      resourceType: String): ResourceInformation = {
+    val discoveryConf = prefix + resourceType + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX
+    val script = sparkconf.getOption(discoveryConf)
+    val result = if (script.nonEmpty) {
+      val scriptFile = new File(script.get)
+      // Check that the script exists and try to execute it.
+      if (scriptFile.exists()) {
+        try {
+          val output = executeAndGetOutput(Seq(script.get), new File("."))
+          parseResourceTypeString(resourceType, output)
+        } catch {
+          case e @ (_: SparkException | _: NumberFormatException) =>
+            throw new SparkException(s"Error running the resource discovery script: $scriptFile" +
+              s" for $resourceType", e)
+        }
+      } else {
+        throw new SparkException(s"Resource script: $scriptFile to discover $resourceType" +
+          s" doesn't exist!")
+      }
+    } else {
+      throw new SparkException(s"User is expecting to use $resourceType resources but " +
+        s"didn't specify a script via conf: $discoveryConf, to find them!")
+    }
+    result
+  }
+
+  // Parses a resource information string in the format:
+  //   count:unit:comma-separated list of addresses
+  // The unit and addresses are optional; a resource like memory, for example, has no
+  // addresses to assign out.
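+  // For example (illustrative values, matching the test suites): "2::0,1" for a resource type
+  // of "gpu" parses to a count of 2, an empty unit and addresses "0" and "1"; "3:mb:f1,f2,f3"
+  // for "fpga" parses to a count of 3, unit "mb" and addresses "f1", "f2" and "f3".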
+  def parseResourceTypeString(rtype: String, rInfoStr: String): ResourceInformation = {
+    // format should be: count:unit:addr1,addr2,addr3
+    val singleResourceType = rInfoStr.split(':')
+    if (singleResourceType.size < 3) {
+      throw new SparkException("Format of the resourceAddrs parameter is invalid," +
+        " please specify all of count, unit, and addresses in the format:" +
+        " count:unit:addr1,addr2,addr3")
+    }
+    // format should be: addr1,addr2,addr3
+    val splitAddrs = singleResourceType(2).split(',').map(_.trim())
+    val retAddrs = if (splitAddrs.size == 1 && splitAddrs(0).isEmpty()) {
+      Array.empty[String]
+    } else {
+      splitAddrs
+    }
+    new ResourceInformation(rtype, singleResourceType(1), singleResourceType(0).toLong, retAddrs)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
new file mode 100644
index 000000000000..3a749695a313
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
+ * The array of addresses is resource specific and it is up to the user to interpret the
+ * addresses. The units and addresses can be empty if they don't apply to that resource.
+ *
+ * One example is GPUs, where the addresses would be the indices of the GPUs, the count would
+ * be the number of GPUs, and the units would be an empty string.
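+ * For example, two discovered GPUs with indices 0 and 1 would be represented as
+ * ResourceInformation("gpu", "", 2, Array("0", "1")).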
+ * + * @param name the name of the resource + * @param units the units of the resources, can be an empty string if units don't apply + * @param count the number of resources available + * @param addresses an optional array of strings describing the addresses of the resource + */ +@Evolving +case class ResourceInformation( + private val name: String, + private val units: String, + private val count: Long, + private val addresses: Array[String] = Array.empty) { + + def getName(): String = name + def getUnits(): String = units + def getCount(): Long = count + def getAddresses(): Array[String] = addresses +} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 645f58716de6..860409588ba0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -31,6 +31,7 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -44,7 +45,8 @@ private[spark] class CoarseGrainedExecutorBackend( hostname: String, cores: Int, userClassPath: Seq[URL], - env: SparkEnv) + env: SparkEnv, + resourceAddrs: Option[String]) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { private[this] val stopping = new AtomicBoolean(false) @@ -61,7 +63,7 @@ private[spark] class CoarseGrainedExecutorBackend( // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls, - extractAttributes)) + extractAttributes, parseResources(resourceAddrs))) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => @@ -71,6 +73,46 @@ private[spark] class CoarseGrainedExecutorBackend( }(ThreadUtils.sameThread) } + // visible for testing + def parseResources(resourceAddrsArg: Option[String]): Map[String, ResourceInformation] = { + // only parse the resources if a task requires them + val taskConfPrefix = SPARK_TASK_RESOURCE_PREFIX + val resourceInfo = if (env.conf.getAllWithPrefix(taskConfPrefix).size > 0) { + val resources = resourceAddrsArg.map(resourceStr => { + // format here would be: + // resourceType=count:unit:addr1,addr2,addr3;resourceType2=count:unit:r2addr1,r2addr2, + // first separate out resource types + val allResourceTypes = resourceStr.split(';').map(_.trim()).map( eachResource => { + // format here should be: resourceType=count:unit:addr1,addr2,addr3 + val typeAndValue = eachResource.split('=').map(_.trim) + if (typeAndValue.size < 2) { + throw new SparkException("Format of the resourceAddrs parameter is invalid," + + " please specify both resource type and the count:unit:addresses: " + + "--resourceAddrs ") + } + val resType = typeAndValue(0) + // format should be: count:unit:addr1,addr2,addr3 + val singleResourceInfo = + ResourceDiscoverer.parseResourceTypeString(resType, typeAndValue(1)) + (resType, singleResourceInfo) + }).toMap + allResourceTypes + }).getOrElse(ResourceDiscoverer.findResources(env.conf, false)) + + if (resources.size == 0) { 
+ throw new SparkException(s"User specified resources per task via: $taskConfPrefix," + + s" but can't find any resources available on the executor.") + } + logInfo(s"Executor ${executorId} using resources: ${resources.values}") + // todo - add logDebug with full output? + resources + } else { + Map.empty[String, ResourceInformation] + } + resourceInfo + } + def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) @@ -188,13 +230,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { cores: Int, appId: String, workerUrl: Option[String], - userClassPath: mutable.ListBuffer[URL]) + userClassPath: mutable.ListBuffer[URL], + resourceAddrs: Option[String]) def main(args: Array[String]): Unit = { val createFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) => new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, - arguments.hostname, arguments.cores, arguments.userClassPath, env) + arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourceAddrs) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) @@ -255,6 +298,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var executorId: String = null var hostname: String = null var cores: Int = 0 + var resourceAddrs: Option[String] = None var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() @@ -274,6 +318,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { case ("--cores") :: value :: tail => cores = value.toInt argv = tail + case ("--resourceAddrs") :: value :: tail => + resourceAddrs = Some(value) + argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail @@ -299,7 +346,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl, - userClassPath) + userClassPath, resourceAddrs) } private def printUsageAndExit(classNameForEntry: String): Unit = { @@ -313,6 +360,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { | --executor-id | --hostname | --cores + | --resourceAddrs | --app-id | --worker-url | --user-class-path diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0bd46bef35d2..93582eace4f2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -30,6 +30,13 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_ package object config { + private[spark] val SPARK_DRIVER_RESOURCE_PREFIX = "spark.driver.resource." + private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource." + private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource." 
+ + private[spark] val SPARK_RESOURCE_COUNT_POSTFIX = ".count" + private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX = ".discoveryScript" + private[spark] val DRIVER_CLASS_PATH = ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index afb48a31754f..89425e702677 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer +import org.apache.spark.ResourceInformation import org.apache.spark.TaskState.TaskState import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.ExecutorLossReason @@ -64,7 +65,8 @@ private[spark] object CoarseGrainedClusterMessages { hostname: String, cores: Int, logUrls: Map[String, String], - attributes: Map[String, String]) + attributes: Map[String, String], + resources: Map[String, ResourceInformation]) extends CoarseGrainedClusterMessage case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4830d0e6f800..f7cf212d0bfe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -185,7 +185,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) => + case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, + attributes, resources) => if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 061aeb366cf5..dc4f4b4c66d9 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -174,9 +174,11 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( - RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty)) + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty, + Map.empty)) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( - RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty)) + RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty, + Map.empty)) heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) diff --git 
a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala new file mode 100644 index 000000000000..569c415bf24f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.{Files => JavaFiles} +import java.nio.file.attribute.PosixFilePermission._ +import java.util.EnumSet + +import com.google.common.io.Files + +import org.apache.spark._ +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + + +class ResourceDiscovererSuite extends SparkFunSuite + with LocalSparkContext { + + test("Resource discoverer no resources") { + val sparkconf = new SparkConf + val resources = ResourceDiscoverer.findResources(sparkconf, false) + assert(resources.size === 0) + assert(resources.get("gpu").isEmpty, + "Should have a gpus entry that is empty") + } + + test("Resource discoverer multiple gpus") { + val sparkconf = new SparkConf + + assume(!(Utils.isWindows)) + + withTempDir { dir => + val file1 = new File(dir, "resourceDiscoverScript1") + Files.write("echo 2::0,1", file1, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file1.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + val resources = ResourceDiscoverer.findResources(sparkconf, false) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.getCount() == 2, "Should have 2") + assert(gpuValue.get.getName() == "gpu", "name should be gpu") + assert(gpuValue.get.getUnits() == "", "units should be empty") + assert(gpuValue.get.getAddresses().size == 2, "Should have 2 indexes") + assert(gpuValue.get.getAddresses().deep == Array("0", "1").deep, "should have 0,1 entries") + } + } + + test("Resource discoverer no addresses") { + val sparkconf = new SparkConf + + assume(!(Utils.isWindows)) + + withTempDir { dir => + val file1 = new File(dir, "resourceDiscoverScript1") + Files.write("echo 2::", file1, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file1.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + val resources = ResourceDiscoverer.findResources(sparkconf, false) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.getCount() == 2, "Should have 2") + assert(gpuValue.get.getName() == "gpu", "name 
should be gpu") + assert(gpuValue.get.getUnits() == "", "units should be empty") + assert(gpuValue.get.getAddresses().size == 0, "Should have 0 indexes") + } + } + + test("Resource discoverer no count") { + val sparkconf = new SparkConf + + assume(!(Utils.isWindows)) + + withTempDir { dir => + val file1 = new File(dir, "resourceDiscoverScript1") + Files.write("echo ::0,1", file1, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file1.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, false) + }.getMessage() + + assert(error.contains("Error running the resource discovery")) + } + } + + + test("Resource discoverer multiple resource types") { + val sparkconf = new SparkConf + + assume(!(Utils.isWindows)) + + withTempDir { dir => + val gpuDiscovery = new File(dir, "resourceDiscoverScriptgpu") + Files.write("echo 2::0,1", gpuDiscovery, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(gpuDiscovery.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + + val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga") + Files.write("echo 3:mb:f1,f2,f3", fpgaDiscovery, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery.getPath()) + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery.getPath()) + val resources = ResourceDiscoverer.findResources(sparkconf, false) + assert(resources.size === 2) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have a gpu entry") + assert(gpuValue.get.getCount() == 2, "Should have 2") + assert(gpuValue.get.getName() == "gpu", "name should be gpu") + assert(gpuValue.get.getUnits() == "", "units should be empty") + assert(gpuValue.get.getAddresses().size == 2, "Should have 2 indexes") + assert(gpuValue.get.getAddresses().deep == Array("0", "1").deep, "should have 0,1 entries") + + val fpgaValue = resources.get("fpga") + assert(fpgaValue.nonEmpty, "Should have a gpu entry") + assert(fpgaValue.get.getCount() == 3, "Should have 3") + assert(fpgaValue.get.getName() == "fpga", "name should be fpga") + assert(fpgaValue.get.getUnits() == "mb", "units should be mb") + assert(fpgaValue.get.getAddresses().size == 3, "Should have 3 indexes") + assert(fpgaValue.get.getAddresses().deep == Array("f1", "f2", "f3").deep, + "should have f1,f2,f3 entries") + } + } + + test("Resource discoverer multiple gpus on driver") { + val sparkconf = new SparkConf + + assume(!(Utils.isWindows)) + + withTempDir { dir => + val file1 = new File(dir, "resourceDiscoverScript2") + Files.write("echo 2::0,1", file1, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file1.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, "boguspath") + // make sure it reads from correct config, here it should use driver + val resources = ResourceDiscoverer.findResources(sparkconf, true) + val gpuValue = resources.get("gpu") + assert(gpuValue.nonEmpty, "Should have 
a gpu entry") + assert(gpuValue.get.getCount() == 2, "Should have 2") + assert(gpuValue.get.getName() == "gpu", "name should be gpu") + assert(gpuValue.get.getUnits() == "", "units should be empty") + assert(gpuValue.get.getAddresses().size == 2, "Should have 2 indexes") + assert(gpuValue.get.getAddresses().deep == Array("0", "1").deep, "should have 0,1 entries") + + } + } + + test("Resource discoverer script returns invalid format") { + val sparkconf = new SparkConf + assume(!(Utils.isWindows)) + + withTempDir { dir => + val file1 = new File(dir, "resourceDiscoverScript3") + Files.write("echo foo1", file1, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(file1.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, false) + }.getMessage() + + assert(error.contains("Error running the resource discovery")) + } + } + + test("Resource discoverer script doesn't exist") { + val sparkconf = new SparkConf + + val file1 = new File("/tmp/bogus") + try { + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath()) + + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, false) + }.getMessage() + + assert(error.contains("doesn't exist")) + } finally { + JavaFiles.deleteIfExists(file1.toPath()) + } + } + + test("gpu's specified but not discovery script") { + val sparkconf = new SparkConf + + val file1 = new File("/tmp/bogus") + try { + sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + + SPARK_RESOURCE_COUNT_POSTFIX, "2") + + val error = intercept[SparkException] { + ResourceDiscoverer.findResources(sparkconf, false) + }.getMessage() + + assert(error.contains("User is expecting to use")) + } finally { + JavaFiles.deleteIfExists(file1.toPath()) + } + } + +} diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 1ed2a1a2aeb4..39daf51ee613 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -497,7 +497,8 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty) + val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty, + Map.empty) // Get "localhost" on a blacklist. 
val taskScheduler = mock(classOf[TaskSchedulerImpl]) @@ -621,7 +622,8 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty) + val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty, + Map.empty) val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] backend.driverEndpoint.askSync[Boolean](message) } diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala new file mode 100644 index 000000000000..ff9ac1b094a9 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + + +import java.io.File +import java.net.URL +import java.nio.charset.StandardCharsets +import java.nio.file.{Files => JavaFiles} +import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} +import java.util.EnumSet + +import com.google.common.io.Files +import org.mockito.Mockito.when +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEnv +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} +import org.apache.spark.util.Utils + + +class CoarseGrainedExecutorBackendSuite extends SparkFunSuite + with LocalSparkContext with MockitoSugar { + + test("parsing no resources") { + val conf = new SparkConf + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + var testResourceArgs = Some("") + var error = intercept[SparkException] { + val parsedResources = backend.parseResources(testResourceArgs) + }.getMessage() + + assert(error.contains("Format of the resourceAddrs parameter is invalid")) + + testResourceArgs = Some("gpu=::") + error = intercept[SparkException] { + val parsedResources = backend.parseResources(testResourceArgs) + }.getMessage() + + assert(error.contains("Format of the resourceAddrs parameter is invalid")) + } + + + test("parsing one resources") { + val conf = new SparkConf + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + + val 
serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + val testResourceArgs = Some("gpu=2::0,1") + val parsedResources = backend.parseResources(testResourceArgs) + + assert(parsedResources.size === 1) + assert(parsedResources.get("gpu").nonEmpty) + assert(parsedResources.get("gpu").get.getName() === "gpu") + assert(parsedResources.get("gpu").get.getUnits() === "") + assert(parsedResources.get("gpu").get.getCount() === 2) + assert(parsedResources.get("gpu").get.getAddresses().deep === Array("0", "1").deep) + } + + test("parsing multiple resources") { + val conf = new SparkConf + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + val testResourceArgs = Some("gpu=2::0,1;fpga=3:mb:f1,f2,f3") + val parsedResources = backend.parseResources(testResourceArgs) + + assert(parsedResources.size === 2) + assert(parsedResources.get("gpu").nonEmpty) + assert(parsedResources.get("gpu").get.getName() === "gpu") + assert(parsedResources.get("gpu").get.getUnits() === "") + assert(parsedResources.get("gpu").get.getCount() === 2) + assert(parsedResources.get("gpu").get.getAddresses().deep === Array("0", "1").deep) + assert(parsedResources.get("fpga").nonEmpty) + assert(parsedResources.get("fpga").get.getName() === "fpga") + assert(parsedResources.get("fpga").get.getUnits() === "mb") + assert(parsedResources.get("fpga").get.getCount() === 3) + assert(parsedResources.get("fpga").get.getAddresses().deep === Array("f1", "f2", "f3").deep) + } + + test("use discoverer") { + val conf = new SparkConf + conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2") + + assume(!(Utils.isWindows)) + + withTempDir { dir => + val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga") + Files.write("echo 3::f1,f2,f3", fpgaDiscovery, StandardCharsets.UTF_8) + JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)) + conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery.getPath()) + + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", + 4, Seq.empty[URL], env, None) + + val parsedResources = backend.parseResources(None) + + assert(parsedResources.size === 1) + assert(parsedResources.get("fpga").nonEmpty) + assert(parsedResources.get("fpga").get.getName() === "fpga") + assert(parsedResources.get("fpga").get.getUnits() === "") + assert(parsedResources.get("fpga").get.getCount() === 3) + assert(parsedResources.get("fpga").get.getAddresses().deep === Array("f1", "f2", "f3").deep) + } + } + + private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = { + val mockEnv = mock[SparkEnv] + val mockRpcEnv = mock[RpcEnv] + when(mockEnv.conf).thenReturn(conf) + when(mockEnv.serializer).thenReturn(serializer) + 
when(mockEnv.closureSerializer).thenReturn(serializer)
+    when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
+    SparkEnv.set(mockEnv)
+    mockEnv
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 33f48b858a73..4858d38cad40 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -164,11 +164,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     sc.addSparkListener(listener)
 
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
 
     sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
     assert(executorAddedCount === 3)
diff --git a/docs/configuration.md b/docs/configuration.md
index 5325f8a352f7..9278f99244b9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -221,6 +221,25 @@ of the most common options to set are:
     This option is currently supported on YARN and Kubernetes.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.resource.{resourceType}.count</code></td>
+  <td>0</td>
+  <td>
+    The number of a particular resource type to use per executor process.
+    If this is used, you must also specify the
+    <code>spark.executor.resource.{resourceType}.discoveryScript</code>
+    for the executor to find the resource on startup.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.resource.{resourceType}.discoveryScript</code></td>
+  <td>None</td>
+  <td>
+    The script the executor should run to discover a particular resource type. This should
+    return a string in the format of count:unit:comma-separated list of addresses.
+    The unit and addresses can be empty if they don't apply to the resource type.
+  </td>
+</tr>
 <tr>
   <td><code>spark.extraListeners</code></td>
   <td>(none)</td>
@@ -1793,6 +1812,15 @@ Apart from these, the following properties are also available, and may be useful
     Number of cores to allocate for each task.
   </td>
 </tr>
+<tr>
+  <td><code>spark.task.resource.{resourceType}.count</code></td>
+  <td>1</td>
+  <td>
+    Number of a particular resource type to allocate for each task. If this is specified,
+    you must also provide the executor config <code>spark.executor.resource.{resourceType}.count</code>
+    and any corresponding discovery configs so that your executors are created with that resource type.
+  </td>
+</tr>
 <tr>
   <td><code>spark.task.maxFailures</code></td>
   <td>4</td>
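To tie the documented configs together, here is a minimal sketch of how an application might request GPUs with them. This is illustrative only and not part of the patch; the script path is a placeholder for wherever a discovery script such as getGpuResources.sh (added below) is installed on each node.

```scala
import org.apache.spark.SparkConf

// Ask for two GPUs per executor and one GPU per task (illustrative values).
val conf = new SparkConf()
  .set("spark.executor.resource.gpu.count", "2")
  .set("spark.task.resource.gpu.count", "1")
  // Placeholder path; point this at a script like getGpuResources.sh on each node.
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/getGpuResources.sh")
```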
diff --git a/examples/src/main/resources/getGpuResources.sh b/examples/src/main/resources/getGpuResources.sh
new file mode 100755
index 000000000000..8cb8ec673663
--- /dev/null
+++ b/examples/src/main/resources/getGpuResources.sh
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# This is an example script that can be used to discover GPUs. It only works on NVIDIA GPUs
+# since it uses the nvidia-smi command. The script reports every GPU visible on the host, so if
+# GPUs are not isolated per executor and multiple executors can run on a single node, each
+# executor will report the same GPUs and you may not want to use this. See your cluster
+# manager specific configs for other options.
+#
+# It can be passed into Spark via the configs spark.executor.resource.gpu.discoveryScript and/or
+# spark.driver.resource.gpu.discoveryScript.
+# The script will return a string in the format: count:unit:comma-separated list of the resource addresses
+#
+
+ADDRS=`nvidia-smi --query-gpu=index --format=csv,noheader | paste -sd, -`
+COUNT=`echo $ADDRS | tr -cd , | wc -c`
+ALLCOUNT=`expr $COUNT + 1`
+echo $ALLCOUNT::$ADDRS
\ No newline at end of file
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 53e99d992db8..757b7a65813f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -37,7 +37,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
-    env: SparkEnv)
+    env: SparkEnv,
+    resourceAddrs: Option[String])
   extends CoarseGrainedExecutorBackend(
     rpcEnv,
     driverUrl,
@@ -45,7 +46,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     hostname,
     cores,
     userClassPath,
-    env) with Logging {
+    env,
+    resourceAddrs) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
@@ -66,7 +68,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+        arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourceAddrs)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
       this.getClass.getCanonicalName.stripSuffix("$"))
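For reference, a minimal sketch of the parsing these changes add, using the same illustrative inputs as the new test suites. Since ResourceDiscoverer is private[spark], code like this would only live inside Spark's own packages or tests.

```scala
import org.apache.spark.ResourceDiscoverer

// "count:unit:addresses" strings, as a discovery script or --resourceAddrs would supply them.
val gpu = ResourceDiscoverer.parseResourceTypeString("gpu", "2::0,1")
println(s"${gpu.getName()}: count=${gpu.getCount()}, addresses=${gpu.getAddresses().mkString(",")}")
// gpu: count=2, addresses=0,1

val fpga = ResourceDiscoverer.parseResourceTypeString("fpga", "3:mb:f1,f2,f3")
println(s"${fpga.getName()}: unit=${fpga.getUnits()}, count=${fpga.getCount()}")
// fpga: unit=mb, count=3
```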