From 9d44d7d4d86e8549cc4e524a7ea3d818b41084f2 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 16 Jun 2018 20:14:42 +0200 Subject: [PATCH 1/7] Add methods returning the total number of cores and executors in the cluster --- .../src/main/scala/org/apache/spark/SparkContext.scala | 10 ++++++++++ .../org/apache/spark/scheduler/SchedulerBackend.scala | 2 ++ .../org/apache/spark/scheduler/TaskScheduler.scala | 4 ++++ .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4 ++++ .../cluster/CoarseGrainedSchedulerBackend.scala | 3 +++ .../spark/scheduler/local/LocalSchedulerBackend.scala | 3 +++ .../apache/spark/ExecutorAllocationManagerSuite.scala | 3 +++ .../spark/scheduler/BlacklistIntegrationSuite.scala | 6 +++++- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +++++++-- .../spark/scheduler/ExternalClusterManagerSuite.scala | 4 ++++ .../spark/scheduler/SchedulerIntegrationSuite.scala | 2 ++ .../spark/scheduler/TaskSchedulerImplSuite.scala | 2 ++ 12 files changed, 49 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5e8595603cc9..0378fb8e3c00 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2336,6 +2336,16 @@ class SparkContext(config: SparkConf) extends Logging { */ def defaultMinPartitions: Int = math.min(defaultParallelism, 2) + /** + * Total number of CPU cores of all executors in the cluster + */ + def coresCount: Int = taskScheduler.coresCount + + /** + * Total number of executors in the cluster + */ + def executorsCount: Int = taskScheduler.executorsCount + private val nextShuffleId = new AtomicInteger(0) private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 22db3350abfa..ccd3ef17a845 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -29,6 +29,8 @@ private[spark] trait SchedulerBackend { def stop(): Unit def reviveOffers(): Unit def defaultParallelism(): Int + def coresCount(): Int + def executorsCount(): Int /** * Requests that an executor kills a running task. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 90644fea23ab..01cc9168b4d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -67,6 +67,10 @@ private[spark] trait TaskScheduler { // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. def defaultParallelism(): Int + def coresCount(): Int + + def executorsCount(): Int + /** * Update metrics for in-progress tasks and let the master know that the BlockManager is still * alive. Return true if the driver knows about the given block manager.
Otherwise, return false, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 598b62f85a1f..ab48b5ad05b5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -515,6 +515,10 @@ private[spark] class TaskSchedulerImpl( override def defaultParallelism(): Int = backend.defaultParallelism() + override def coresCount(): Int = backend.coresCount() + + override def executorsCount(): Int = backend.executorsCount() + // Check for speculatable tasks in all our active jobs. def checkSpeculatableTasks() { var shouldRevive = false diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d8794e8e551a..3d51a7dc1ed1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -457,6 +457,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) } + override def coresCount(): Int = totalCoreCount.get() + override def executorsCount(): Int = totalRegisteredExecutors.get() + /** * Called by subclasses when notified of a lost worker. It just fires the message and returns * at once. diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 4c614c5c0f60..7fa3daffbfbe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -144,6 +144,9 @@ private[spark] class LocalSchedulerBackend( override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) + override def coresCount(): Int = totalCores + override def executorsCount(): Int = 1 + override def killTask( taskId: Long, executorId: String, interruptThread: Boolean, reason: String) { localEndpoint.send(KillTask(taskId, interruptThread, reason)) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 3cfb0a9feb32..1f11bd187e82 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1376,6 +1376,9 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def defaultParallelism(): Int = sb.defaultParallelism() + override def coresCount(): Int = sb.coresCount() + override def executorsCount(): Int = sb.executorsCount() + override def killExecutorsOnHost(host: String): Boolean = { false } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index d3bbfd11d406..67a95cb930ee 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -144,7 +144,11 @@ class MultiExecutorMockBackend( }.toMap 
} - override def defaultParallelism(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor + override def defaultParallelism(): Int = coresCount + + override def coresCount(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor + override def executorsCount(): Int = nHosts * nExecutorsPerHost + } class MockRDDWithLocalityPrefs( diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2987170bf502..06f1c7f06997 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -132,7 +132,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def killTaskAttempt( taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def setDAGScheduler(dagScheduler: DAGScheduler) = {} - override def defaultParallelism() = 2 + override def defaultParallelism() = coresCount + override def coresCount(): Int = 2 + override def executorsCount(): Int = 1 + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -630,7 +633,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi throw new UnsupportedOperationException } override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} - override def defaultParallelism(): Int = 2 + override def defaultParallelism(): Int = coresCount + override def coresCount(): Int = 2 + override def executorsCount(): Int = 1 override def executorHeartbeatReceived( execId: String, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index a4e4ea7cd289..c1d79e8ad0c0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -69,6 +69,8 @@ private class DummySchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism(): Int = 1 + def coresCount(): Int = 1 + def executorsCount(): Int = 1 } private class DummyTaskScheduler extends TaskScheduler { @@ -83,6 +85,8 @@ private class DummyTaskScheduler extends TaskScheduler { taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 + override def coresCount(): Int = 2 + override def executorsCount(): Int = 1 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 75ea409e16b4..fccdcf5acd64 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -430,6 +430,8 @@ private[spark] class SingleCoreMockBackend( val cores = 1 override def 
defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores) + override def coresCount(): Int = cores + override def executorsCount(): Int = 1 freeCores = cores val localExecutorId = SparkContext.DRIVER_IDENTIFIER diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 33f2ea1c94e7..27a8d35749f6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -36,6 +36,8 @@ class FakeSchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism(): Int = 1 + def coresCount(): Int = 1 + def executorsCount(): Int = 1 } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach From c6b354c466677c1101b30fc1b25ddc5750c8eaf6 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 16 Jun 2018 20:19:09 +0200 Subject: [PATCH 2/7] Update Java's Spark Context --- .../scala/org/apache/spark/api/java/JavaSparkContext.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index f1936bf58728..28637edf3d32 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -128,6 +128,12 @@ class JavaSparkContext(val sc: SparkContext) /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions + /** Total number of CPU cores of all executors in the cluster */ + def coresCount: java.lang.Integer = sc.coresCount + + /** Total number of executors in the cluster */ + def executorsCount: java.lang.Integer = sc.executorsCount + /** Distribute a local Scala collection to form an RDD. 
*/ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag From 54f04369c0f3329e8c27ad405a350ee20b788b21 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 16 Jun 2018 20:57:08 +0200 Subject: [PATCH 3/7] Tests for number of cores and executors in the local mode --- .../java/test/org/apache/spark/JavaSparkContextSuite.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java index 7e9cc70d8651..ccd57adbbe4c 100644 --- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java @@ -45,6 +45,11 @@ public void javaSparkContext() { new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop(); new JavaSparkContext("local", "name", "sparkHome", jars).stop(); new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop(); + + JavaSparkContext sc = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("name")); + assert sc.coresCount() == 2; + assert sc.executorsCount() == 1; + sc.stop(); } @Test From 4d645829c8d338451be81c4554cc1257b459f6a6 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 16 Jun 2018 22:09:35 +0200 Subject: [PATCH 4/7] Adding coresCount and executorsCount to PySpark --- python/pyspark/context.py | 14 ++++++++++++++ python/pyspark/tests.py | 8 ++++++++ 2 files changed, 22 insertions(+) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ede3b6af0a8c..50516e140034 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -406,6 +406,20 @@ def defaultMinPartitions(self): """ return self._jsc.sc().defaultMinPartitions() + @property + def coresCount(self): + """ + Total number of CPU cores of all executors in the cluster + """ + return self._jsc.sc().coresCount() + + @property + def executorsCount(self): + """ + Total number of executors in the cluster + """ + return self._jsc.sc().executorsCount() + def stop(self): """ Shut down the SparkContext. 
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index a4c5fb1db8b3..1ceb83744798 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -519,6 +519,14 @@ def setUp(self): # Allow retries even though they are normally disabled in local mode self.sc = SparkContext('local[4, 2]', class_name) + def test_cores_count(self): + """Test for number of cores in the cluster""" + self.assertEqual(self.sc.coresCount, 4) + + def test_executors_count(self): + """Test for number of executors in the cluster""" + self.assertEqual(self.sc.executorsCount, 1) + def test_stage_id(self): """Test the stage ids are available and incrementing as expected.""" rdd = self.sc.parallelize(range(10)) From 79633d9a3e7aebf40ee8940e8fcf00d43dc22ed7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Jun 2018 15:08:54 +0200 Subject: [PATCH 5/7] Improving comments --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++++-- python/pyspark/context.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0378fb8e3c00..fb39475ee7b3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2337,12 +2337,14 @@ class SparkContext(config: SparkConf) extends Logging { def defaultMinPartitions: Int = math.min(defaultParallelism, 2) /** - * Total number of CPU cores of all executors in the cluster + * Total number of CPU cores of all executors registered in the cluster at the moment. + * The number reflects current status of the cluster and can change in the future. */ def coresCount: Int = taskScheduler.coresCount /** - * Total number of executors in the cluster + * Total number of executors registered in the cluster at the moment. + * The number reflects current status of the cluster and can change in the future. */ def executorsCount: Int = taskScheduler.executorsCount diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 50516e140034..a98cb8b40781 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -409,14 +409,16 @@ def defaultMinPartitions(self): @property def coresCount(self): """ - Total number of CPU cores of all executors in the cluster + Total number of CPU cores of all executors registered in the cluster at the moment. + The number reflects current status of the cluster and can change in the future. """ return self._jsc.sc().coresCount() @property def executorsCount(self): """ - Total number of executors in the cluster + Total number of executors registered in the cluster at the moment. + The number reflects current status of the cluster and can change in the future. 
""" return self._jsc.sc().executorsCount() From d7e94e10794964022d3dc98671b86f02af80d2e8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Jun 2018 15:31:48 +0200 Subject: [PATCH 6/7] Renaming of the methods --- .../main/scala/org/apache/spark/SparkContext.scala | 4 ++-- .../apache/spark/api/java/JavaSparkContext.scala | 14 ++++++++++---- .../apache/spark/scheduler/SchedulerBackend.scala | 4 ++-- .../org/apache/spark/scheduler/TaskScheduler.scala | 4 ++-- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 4 ++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 4 ++-- .../scheduler/local/LocalSchedulerBackend.scala | 4 ++-- .../org/apache/spark/JavaSparkContextSuite.java | 4 ++-- .../spark/ExecutorAllocationManagerSuite.scala | 4 ++-- .../scheduler/BlacklistIntegrationSuite.scala | 6 +++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 ++++++------ .../scheduler/ExternalClusterManagerSuite.scala | 8 ++++---- .../scheduler/SchedulerIntegrationSuite.scala | 4 ++-- .../spark/scheduler/TaskSchedulerImplSuite.scala | 4 ++-- python/pyspark/context.py | 8 ++++---- python/pyspark/tests.py | 8 ++++---- 16 files changed, 51 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fb39475ee7b3..da4621ab8a74 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2340,13 +2340,13 @@ class SparkContext(config: SparkConf) extends Logging { * Total number of CPU cores of all executors registered in the cluster at the moment. * The number reflects current status of the cluster and can change in the future. */ - def coresCount: Int = taskScheduler.coresCount + def numCores: Int = taskScheduler.numCores /** * Total number of executors registered in the cluster at the moment. * The number reflects current status of the cluster and can change in the future. */ - def executorsCount: Int = taskScheduler.executorsCount + def numExecutors: Int = taskScheduler.numExecutors private val nextShuffleId = new AtomicInteger(0) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 28637edf3d32..cab185553525 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -128,11 +128,17 @@ class JavaSparkContext(val sc: SparkContext) /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions - /** Total number of CPU cores of all executors in the cluster */ - def coresCount: java.lang.Integer = sc.coresCount + /** + * Total number of CPU cores of all executors registered in the cluster at the moment. + * The number reflects current status of the cluster and can change in the future. + */ + def numCores: java.lang.Integer = sc.numCores - /** Total number of executors in the cluster */ - def executorsCount: java.lang.Integer = sc.executorsCount + /** + * Total number of executors registered in the cluster at the moment. + * The number reflects current status of the cluster and can change in the future. + */ + def numExecutors: java.lang.Integer = sc.numExecutors /** Distribute a local Scala collection to form an RDD. 
*/ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index ccd3ef17a845..549d1870a1db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -29,8 +29,8 @@ private[spark] trait SchedulerBackend { def stop(): Unit def reviveOffers(): Unit def defaultParallelism(): Int - def coresCount(): Int - def executorsCount(): Int + def numCores(): Int + def numExecutors(): Int /** * Requests that an executor kills a running task. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 01cc9168b4d0..1ea19b99fff6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -67,9 +67,9 @@ private[spark] trait TaskScheduler { // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. def defaultParallelism(): Int - def coresCount(): Int + def numCores(): Int - def executorsCount(): Int + def numExecutors(): Int /** * Update metrics for in-progress tasks and let the master know that the BlockManager is still diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ab48b5ad05b5..c571033f6fca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -515,9 +515,9 @@ private[spark] class TaskSchedulerImpl( override def defaultParallelism(): Int = backend.defaultParallelism() - override def coresCount(): Int = backend.coresCount() + override def numCores(): Int = backend.numCores() - override def executorsCount(): Int = backend.executorsCount() + override def numExecutors(): Int = backend.numExecutors() // Check for speculatable tasks in all our active jobs. def checkSpeculatableTasks() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3d51a7dc1ed1..44bd9a7efc35 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -457,8 +457,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) } - override def coresCount(): Int = totalCoreCount.get() - override def executorsCount(): Int = totalRegisteredExecutors.get() + override def numCores(): Int = totalCoreCount.get() + override def numExecutors(): Int = totalRegisteredExecutors.get() /** * Called by subclasses when notified of a lost worker. 
It just fires the message and returns diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 7fa3daffbfbe..286aae7c1457 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -144,8 +144,8 @@ private[spark] class LocalSchedulerBackend( override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) - override def coresCount(): Int = totalCores - override def executorsCount(): Int = 1 + override def numCores(): Int = totalCores + override def numExecutors(): Int = 1 override def killTask( taskId: Long, executorId: String, interruptThread: Boolean, reason: String) { diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java index ccd57adbbe4c..23c0c2f7522d 100644 --- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java @@ -47,8 +47,8 @@ public void javaSparkContext() { new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop(); JavaSparkContext sc = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("name")); - assert sc.coresCount() == 2; - assert sc.executorsCount() == 1; + assert sc.numCores() == 2; + assert sc.numExecutors() == 1; sc.stop(); } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 1f11bd187e82..a7eb672b1a06 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1376,8 +1376,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def defaultParallelism(): Int = sb.defaultParallelism() - override def coresCount(): Int = sb.coresCount() - override def executorsCount(): Int = sb.executorsCount() + override def numCores(): Int = sb.numCores() + override def numExecutors(): Int = sb.numExecutors() override def killExecutorsOnHost(host: String): Boolean = { false diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 67a95cb930ee..4b2141cc8e4e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -144,10 +144,10 @@ class MultiExecutorMockBackend( }.toMap } - override def defaultParallelism(): Int = coresCount + override def defaultParallelism(): Int = numCores - override def coresCount(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor - override def executorsCount(): Int = nHosts * nExecutorsPerHost + override def numCores(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor + override def numExecutors(): Int = nHosts * nExecutorsPerHost } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 06f1c7f06997..03cc063063ce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -132,9 +132,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi override def killTaskAttempt( taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def setDAGScheduler(dagScheduler: DAGScheduler) = {} - override def defaultParallelism() = coresCount - override def coresCount(): Int = 2 - override def executorsCount(): Int = 1 + override def defaultParallelism() = numCores + override def numCores(): Int = 2 + override def numExecutors(): Int = 1 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} @@ -633,9 +633,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi throw new UnsupportedOperationException } override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} - override def defaultParallelism(): Int = coresCount - override def coresCount(): Int = 2 - override def executorsCount(): Int = 1 + override def defaultParallelism(): Int = numCores + override def numCores(): Int = 2 + override def numExecutors(): Int = 1 override def executorHeartbeatReceived( execId: String, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index c1d79e8ad0c0..e3821cec3fed 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -69,8 +69,8 @@ private class DummySchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism(): Int = 1 - def coresCount(): Int = 1 - def executorsCount(): Int = 1 + def numCores(): Int = 1 + def numExecutors(): Int = 1 } private class DummyTaskScheduler extends TaskScheduler { @@ -85,8 +85,8 @@ private class DummyTaskScheduler extends TaskScheduler { taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 - override def coresCount(): Int = 2 - override def executorsCount(): Int = 1 + override def numCores(): Int = 2 + override def numExecutors(): Int = 1 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index fccdcf5acd64..e0ce5ce3d492 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -430,8 +430,8 @@ private[spark] class SingleCoreMockBackend( val cores = 1 override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores) - override def coresCount(): Int = cores - override def executorsCount(): Int = 1 + override def numCores(): Int = cores + override def numExecutors(): Int = 1 freeCores = cores val localExecutorId = SparkContext.DRIVER_IDENTIFIER diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 27a8d35749f6..a4611bac0009 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -36,8 +36,8 @@ class FakeSchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism(): Int = 1 - def coresCount(): Int = 1 - def executorsCount(): Int = 1 + def numCores(): Int = 1 + def numExecutors(): Int = 1 } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a98cb8b40781..4f76a7bd1432 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -407,20 +407,20 @@ def defaultMinPartitions(self): return self._jsc.sc().defaultMinPartitions() @property - def coresCount(self): + def numCores(self): """ Total number of CPU cores of all executors registered in the cluster at the moment. The number reflects current status of the cluster and can change in the future. """ - return self._jsc.sc().coresCount() + return self._jsc.sc().numCores() @property - def executorsCount(self): + def numExecutors(self): """ Total number of executors registered in the cluster at the moment. The number reflects current status of the cluster and can change in the future. """ - return self._jsc.sc().executorsCount() + return self._jsc.sc().numExecutors() def stop(self): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1ceb83744798..b618cb4ce29d 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -519,13 +519,13 @@ def setUp(self): # Allow retries even though they are normally disabled in local mode self.sc = SparkContext('local[4, 2]', class_name) - def test_cores_count(self): + def test_num_cores(self): """Test for number of cores in the cluster""" - self.assertEqual(self.sc.coresCount, 4) + self.assertEqual(self.sc.numCores, 4) - def test_executors_count(self): + def test_num_executors(self): """Test for number of executors in the cluster""" - self.assertEqual(self.sc.executorsCount, 1) + self.assertEqual(self.sc.numExecutors, 1) def test_stage_id(self): """Test the stage ids are available and incrementing as expected.""" From 9be566f1ed3e066de7e3d3ad557756d22fc22a73 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Jun 2018 20:41:06 +0200 Subject: [PATCH 7/7] New methods for SparkR --- R/pkg/R/context.R | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 8ec727dd042b..08bbf52670f5 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -25,6 +25,22 @@ getMinPartitions <- function(sc, minPartitions) { as.integer(minPartitions) } +#' Total number of CPU cores of all executors registered in the cluster at the moment. +#' +#' @param sc SparkContext to use +#' @return current number of cores in the cluster. +numCores <- function(sc) { + callJMethod(sc, "numCores") +} + +#' Total number of executors registered in the cluster at the moment. +#' +#' @param sc SparkContext to use +#' @return current number of executors in the cluster. +numExecutors <- function(sc) { + callJMethod(sc, "numExecutors") +} + #' Create an RDD from a text file. #' #' This function reads a text file from HDFS, a local file system (available on all
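
For reference, below is a minimal usage sketch of the API introduced by this series; it is not part of the patch. It assumes a local[2] master (matching the Java and Python tests above), so numCores should report 2 and numExecutors should report 1; the object and variable names are invented for the example. Both methods report whatever is currently registered with the scheduler backend, so on a real cluster the values can change between calls as executors come and go.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch (not part of the patch): sizing work from the new cluster-status methods.
object NumCoresSketch {
  def main(args: Array[String]): Unit = {
    // local[2] registers one executor backend with 2 cores, so the tests above
    // expect numCores == 2 and numExecutors == 1 in this configuration.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("num-cores-sketch"))
    try {
      // Use the current core count as a hint for the number of partitions.
      val numPartitions = math.max(sc.numCores, 1)
      val sum = sc.parallelize(1 to 1000, numPartitions).sum()
      println(s"cores=${sc.numCores} executors=${sc.numExecutors} partitions=$numPartitions sum=$sum")
    } finally {
      sc.stop()
    }
  }
}

In PySpark the same values are exposed as the sc.numCores and sc.numExecutors properties added in patches 4 and 6, and SparkR exposes them through the numCores(sc) and numExecutors(sc) helpers added in patch 7.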