diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index ac1a6b7c3ec0..04595aca939c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.resource.ResourceAmountUtils import org.apache.spark.resource.ResourceUtils.GPU -class ExecutorResourceInfoSuite extends SparkFunSuite { +class ExecutorResourceInfoSuite extends SparkFunSuite with ExecutorResourceUtils { implicit def convertMapLongToDouble(resources: Map[String, Long]): Map[String, Double] = { resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) } @@ -126,14 +126,6 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { } } - def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], - eps: Double = 0.00000001): Boolean = { - lhs.size == rhs.size && - lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) => - lName == rName && (lAmount - rAmount).abs < eps - } - } - test("assign/release resource for different task requirements") { val execInfo = new ExecutorResourceInfo("gpu", Seq("0", "1", "2", "3")) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceUtils.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceUtils.scala new file mode 100644 index 000000000000..7d96272968a4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceUtils.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +trait ExecutorResourceUtils { + + def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], + eps: Double = 0.00000001): Boolean = { + lhs.size == rhs.size && lhs.forall { case (lName, lAmount) => + rhs.get(lName).exists(rAmount => (lAmount - rAmount).abs < eps) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala index e512327fefa7..75a772dcdec8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfileBuilder, TaskResourceRequests} import org.apache.spark.resource.ResourceUtils.GPU -class ExecutorResourcesAmountsSuite extends SparkFunSuite { +class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceUtils { implicit def toFractionalResource(resources: Map[String, Long]): Map[String, Double] = resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) } @@ -39,14 +39,6 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite { k -> ResourceAmountUtils.toInternalResource(v) } } - def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], - eps: Double = 0.00000001): Boolean = { - lhs.size == rhs.size && - lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) => - lName == rName && (lAmount - rAmount).abs < eps - } - } - test("assign to rp without task resources requirement") { val executorsInfo = Map( "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 7d354f27ff73..3e43442583ec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -2285,9 +2285,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext config.EXECUTOR_CORES.key -> executorCpus.toString) val taskSet = if (barrierMode) { - FakeTask.createTaskSet(100) - } else { FakeTask.createBarrierTaskSet(4 * taskNum) + } else { + FakeTask.createTaskSet(100) } val resources = new ExecutorResourcesAmounts( @@ -2298,7 +2298,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.submitTasks(taskSet) // Launch tasks on executor that satisfies resource requirements. - val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + taskDescriptions = taskDescriptions.sortBy(t => t.index) assert(4 * taskNum === taskDescriptions.length) assert(!failedTaskSet) var gpuAddress = -1 @@ -2331,9 +2332,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext config.EXECUTOR_CORES.key -> executorCpus.toString) val taskSet = if (barrierMode) { - FakeTask.createTaskSet(100) - } else { FakeTask.createBarrierTaskSet(4 * taskNum) + } else { + FakeTask.createTaskSet(100) } val workerOffers = @@ -2390,9 +2391,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) val taskSet = if (barrierMode) { - FakeTask.createTaskSet(100, 0, 1, 1, rp.id) - } else { FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id) + } else { + FakeTask.createTaskSet(100, 0, 1, 1, rp.id) } val resources = new ExecutorResourcesAmounts( Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))) @@ -2403,7 +2404,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.submitTasks(taskSet) // Launch tasks on executor that satisfies resource requirements. - val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + taskDescriptions = taskDescriptions.sortBy(t => t.index) assert(4 * taskNum === taskDescriptions.length) assert(!failedTaskSet) var gpuAddress = -1 @@ -2438,9 +2440,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) val taskSet = if (barrierMode) { - FakeTask.createTaskSet(100, 0, 1, 1, rp.id) - } else { FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id) + } else { + FakeTask.createTaskSet(100, 0, 1, 1, rp.id) } val workerOffers =