From 9b033ccfa572c93d7c2dc7bca06f9be1e363f88a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Tue, 19 Jun 2018 21:40:20 +0200
Subject: [PATCH 01/11] Initial commit (yarn metrics)

---
 .../spark/deploy/yarn/ApplicationMaster.scala | 18 ++++++++--
 .../spark/deploy/yarn/YarnAllocator.scala     |  4 +--
 .../yarn/YarnAllocatorBlacklistTracker.scala  |  3 +-
 .../spark/deploy/yarn/YarnRMClient.scala      |  5 +--
 .../cluster/YarnClusterSchedulerSource.scala  | 35 +++++++++++++++++++
 .../deploy/yarn/YarnAllocatorSuite.scala      |  2 +-
 6 files changed, 58 insertions(+), 9 deletions(-)
 create mode 100644 resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ecc576910db9..c72962c99334 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,8 +43,9 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnClusterSchedulerSource, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util._

@@ -67,6 +68,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

   private val securityMgr = new SecurityManager(sparkConf)

+  private[spark] val failureTracker = new FailureTracker(sparkConf, new SystemClock)
+
+  private val metricsSystem: MetricsSystem =
+    MetricsSystem.createMetricsSystem("yarn", sparkConf, securityMgr)
+
+  metricsSystem.registerSource(new YarnClusterSchedulerSource(failureTracker))
+  metricsSystem.start()
+
   // Set system properties for each config entry. This covers two use cases:
   // - The default configuration stored by the SparkHadoopUtil class
   // - The user application creating a new SparkConf in cluster mode
@@ -309,6 +318,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
           "Uncaught exception: " + StringUtils.stringifyException(e))
+    } finally {
+      metricsSystem.report()
+      metricsSystem.stop()
     }
   }

@@ -424,7 +436,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       driverUrl,
       driverRef,
       securityMgr,
-      localResources)
+      localResources,
+      failureTracker)

     credentialRenewer.foreach(_.setDriverRef(driverRef))

@@ -771,6 +784,7 @@ object ApplicationMaster extends Logging {

   private var master: ApplicationMaster = _

+
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fae054e0eea0..ca08993634b8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -67,7 +67,7 @@ private[yarn] class YarnAllocator(
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
     resolver: SparkRackResolver,
-    clock: Clock = new SystemClock)
+    failureTracker: FailureTracker)
   extends Logging {

   import YarnAllocator._
@@ -103,8 +103,6 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

-  private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
-
   private val allocatorBlacklistTracker =
     new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index 1b48a0ee7ad3..4b8438bfd4cb 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.BlacklistTracker
+import org.apache.spark.scheduler.cluster.YarnClusterSchedulerSource
 import org.apache.spark.util.{Clock, SystemClock, Utils}

 /**
@@ -177,7 +178,7 @@ private[spark] class FailureTracker(
     failedExecutorsTimeStamps.enqueue(timeMillis)
   }

-  def numFailuresOnHost(hostname: String): Int = {
+  def numFailuresOnHost(hostname: String): Int = synchronized {
     failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
       updateAndCountFailures(failedExecutorsOnHost)
     }.getOrElse(0)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b59dcf158d87..495c0cb362fe 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -78,10 +78,11 @@ private[spark] class YarnRMClient extends Logging {
       driverUrl: String,
       driverRef: RpcEndpointRef,
       securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource]): YarnAllocator = {
+      localResources: Map[String, LocalResource],
+      failureTracker: FailureTracker): YarnAllocator = {
     require(registered, "Must register AM before creating allocator.")
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
-      localResources, new SparkRackResolver())
+      localResources, new SparkRackResolver(), failureTracker)
   }

   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala
new file mode 100644
index 000000000000..355da0c65bc1
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.deploy.yarn.FailureTracker
+import org.apache.spark.metrics.source.Source
+
+private[spark] class YarnClusterSchedulerSource(failureTracker: FailureTracker) extends Source {
+
+  override val sourceName: String = "yarn_cluster"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  metricRegistry.register(
+    MetricRegistry.name("numFailedExecutors"),
+    new Gauge[Int] {
+      override def getValue: Int = failureTracker.numFailedExecutors
+    })
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 3f783baed110..efe2959614cd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -105,7 +105,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       new SecurityManager(sparkConf),
       Map(),
       new MockResolver(),
-      clock)
+      new FailureTracker(sparkConf, clock))
   }

   def createContainer(host: String): Container = {

From 4968ad6ab35aa3777cb54ce26b4fce495c1d2013 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Tue, 26 Jun 2018 13:26:53 +0200
Subject: [PATCH 02/11] adding more YARN metrics

---
 .../spark/deploy/yarn/ApplicationMaster.scala | 17 +++----
 .../spark/deploy/yarn/YarnAllocator.scala     |  8 ++-
 .../yarn/YarnAllocatorBlacklistTracker.scala  |  5 +-
 .../yarn/YarnClusterSchedulerSource.scala     | 49 +++++++++++++++++++
 .../spark/deploy/yarn/YarnRMClient.scala      |  5 +-
 .../cluster/YarnClusterSchedulerSource.scala  | 35 -------------
 .../deploy/yarn/YarnAllocatorSuite.scala      |  2 +-
 7 files changed, 66 insertions(+), 55 deletions(-)
 create mode 100644 resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
 delete mode 100644 resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c72962c99334..9e955e7aab3c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,7 +45,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnClusterSchedulerSource, YarnSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util._

@@ -68,13 +68,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

   private val securityMgr = new SecurityManager(sparkConf)

-  private[spark] val failureTracker = new FailureTracker(sparkConf, new SystemClock)
-
-  private val metricsSystem: MetricsSystem =
-    MetricsSystem.createMetricsSystem("yarn", sparkConf, securityMgr)
-
-  metricsSystem.registerSource(new YarnClusterSchedulerSource(failureTracker))
-  metricsSystem.start()
+  private var metricsSystem: MetricsSystem = _

   // Set system properties for each config entry. This covers two use cases:
   // - The default configuration stored by the SparkHadoopUtil class
@@ -436,8 +430,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       driverUrl,
       driverRef,
       securityMgr,
-      localResources,
-      failureTracker)
+      localResources)

     credentialRenewer.foreach(_.setDriverRef(driverRef))

@@ -447,6 +440,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

     allocator.allocateResources()
+    metricsSystem = MetricsSystem.createMetricsSystem("yarn", sparkConf, securityMgr)
+    metricsSystem.registerSource(new YarnClusterSchedulerSource(allocator))
+    metricsSystem.start()
     reporterThread = launchReporterThread()
   }

@@ -784,7 +780,6 @@ object ApplicationMaster extends Logging {

   private var master: ApplicationMaster = _

-
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ca08993634b8..e2b53b0e426f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -67,7 +67,7 @@ private[yarn] class YarnAllocator(
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
     resolver: SparkRackResolver,
-    failureTracker: FailureTracker)
+    clock: Clock = new SystemClock)
   extends Logging {

   import YarnAllocator._
@@ -103,6 +103,8 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

+  private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
+
   private val allocatorBlacklistTracker =
     new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)

@@ -148,7 +150,7 @@ private[yarn] class YarnAllocator(
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty

   // Number of tasks that have locality preferences in active stages
-  private var numLocalityAwareTasks: Int = 0
+  private[yarn] var numLocalityAwareTasks: Int = 0

   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
@@ -156,6 +158,8 @@ private[yarn] class YarnAllocator(

   def getNumExecutorsRunning: Int = runningExecutors.size()

+  def getNumReleasedContainers: Int = releasedContainers.size()
+
   def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

   def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index 4b8438bfd4cb..ceac7cda5f8b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -28,8 +28,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.BlacklistTracker
-import org.apache.spark.scheduler.cluster.YarnClusterSchedulerSource
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{Clock, SystemClock}

 /**
  * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes
@@ -178,7 +177,7 @@ private[spark] class FailureTracker(
     failedExecutorsTimeStamps.enqueue(timeMillis)
   }

-  def numFailuresOnHost(hostname: String): Int = synchronized {
+  def numFailuresOnHost(hostname: String): Int = {
     failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
       updateAndCountFailures(failedExecutorsOnHost)
     }.getOrElse(0)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
new file mode 100644
index 000000000000..6cf5c5b84c77
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class YarnClusterSchedulerSource(yarnAllocator: YarnAllocator) extends Source {
+
+  override val sourceName: String = "yarn"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumExecutorsFailed
+  })
+
+  metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumExecutorsRunning
+  })
+
+  metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumReleasedContainers
+  })
+
+  metricRegistry.register(MetricRegistry.name("numPendingLossReasonRequests"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumPendingLossReasonRequests
+  })
+
+  metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.numLocalityAwareTasks
+  })
+
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 495c0cb362fe..b59dcf158d87 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -78,11 +78,10 @@ private[spark] class YarnRMClient extends Logging {
       driverUrl: String,
       driverRef: RpcEndpointRef,
       securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource],
-      failureTracker: FailureTracker): YarnAllocator = {
+      localResources: Map[String, LocalResource]): YarnAllocator = {
     require(registered, "Must register AM before creating allocator.")
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
-      localResources, new SparkRackResolver(), failureTracker)
+      localResources, new SparkRackResolver())
   }

   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala
deleted file mode 100644
index 355da0c65bc1..000000000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerSource.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import org.apache.spark.deploy.yarn.FailureTracker
-import org.apache.spark.metrics.source.Source
-
-private[spark] class YarnClusterSchedulerSource(failureTracker: FailureTracker) extends Source {
-
-  override val sourceName: String = "yarn_cluster"
-  override val metricRegistry: MetricRegistry = new MetricRegistry()
-
-  metricRegistry.register(
-    MetricRegistry.name("numFailedExecutors"),
-    new Gauge[Int] {
-      override def getValue: Int = failureTracker.numFailedExecutors
-    })
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index efe2959614cd..3f783baed110 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -105,7 +105,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       new SecurityManager(sparkConf),
       Map(),
       new MockResolver(),
-      new FailureTracker(sparkConf, clock))
+      clock)
   }

   def createContainer(host: String): Container = {

From 9735525157b63248cba699abbd1ecba20c5d1904 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Wed, 27 Jun 2018 17:19:47 +0200
Subject: [PATCH 03/11] adding doc

---
 docs/monitoring.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6eaf33135744..e78667dda168 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl
 * `executor`: A Spark executor.
 * `driver`: The Spark driver process (the process in which your SparkContext is created).
 * `shuffleService`: The Spark shuffle service.
+* `yarn`: Spark resource allocations on YARN.

 Each instance can report to zero or more _sinks_. Sinks are contained in the
 `org.apache.spark.metrics.sink` package:

From a8f314694834933c1e47cdf523ab87c57f3a821b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Wed, 27 Jun 2018 22:54:02 +0200
Subject: [PATCH 04/11] store metric system in Option

---
 .../spark/deploy/yarn/ApplicationMaster.scala | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9e955e7aab3c..d77e1092bf1d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -68,7 +68,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

   private val securityMgr = new SecurityManager(sparkConf)

-  private var metricsSystem: MetricsSystem = _
+  private var metricsSystem: Option[MetricsSystem] = None

   // Set system properties for each config entry. This covers two use cases:
   // - The default configuration stored by the SparkHadoopUtil class
@@ -313,8 +313,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
         "Uncaught exception: " + StringUtils.stringifyException(e))
     } finally {
-      metricsSystem.report()
-      metricsSystem.stop()
+      metricsSystem.foreach { ms =>
+        ms.report()
+        ms.stop()
+      }
     }
   }

@@ -440,9 +442,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

     allocator.allocateResources()
-    metricsSystem = MetricsSystem.createMetricsSystem("yarn", sparkConf, securityMgr)
-    metricsSystem.registerSource(new YarnClusterSchedulerSource(allocator))
-    metricsSystem.start()
+    val ms = MetricsSystem.createMetricsSystem("yarn", sparkConf, securityMgr)
+    ms.registerSource(new YarnClusterSchedulerSource(allocator))
+    ms.start()
+    metricsSystem = Some(ms)
     reporterThread = launchReporterThread()
   }

From b0ee4ec158e96deba98c0e1e3b9df5039f616ddb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Wed, 27 Jun 2018 23:14:57 +0200
Subject: [PATCH 05/11] add numPendingAllocate as a metric

---
 .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala    | 4 ++++
 .../apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e2b53b0e426f..26c3e7287272 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -169,6 +169,10 @@ private[yarn] class YarnAllocator(
    */
   def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)

+  def getNumPendingAllocate: Int = synchronized {
+    getPendingAllocate.size
+  }
+
   /**
    * A sequence of pending container requests at the given location that have not yet been
    * fulfilled.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
index 6cf5c5b84c77..7e6ff667381c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
@@ -46,4 +46,8 @@ private[spark] class YarnClusterSchedulerSource(yarnAllocator: YarnAllocator) ex
     override def getValue: Int = yarnAllocator.numLocalityAwareTasks
   })

+  metricRegistry.register(MetricRegistry.name("numPendingAllocate"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.getNumPendingAllocate
+  })
+
 }

From 68ba47e0211748b449684a9fdc50b7ba5c919388 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Mon, 2 Jul 2018 18:25:36 +0200
Subject: [PATCH 06/11] Remove numPendingLossReasonRequests metric.

---
 .../apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
index 7e6ff667381c..90c0c0910887 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
@@ -38,10 +38,6 @@ private[spark] class YarnClusterSchedulerSource(yarnAllocator: YarnAllocator) ex
     override def getValue: Int = yarnAllocator.getNumReleasedContainers
   })

-  metricRegistry.register(MetricRegistry.name("numPendingLossReasonRequests"), new Gauge[Int] {
-    override def getValue: Int = yarnAllocator.getNumPendingLossReasonRequests
-  })
-
   metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
     override def getValue: Int = yarnAllocator.numLocalityAwareTasks
   })

From f3781bdcadb84f90cbb3402f38df9c6493fb3ad2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Tue, 10 Jul 2018 14:58:01 +0200
Subject: [PATCH 07/11] rename numPendingAllocate to numContainersPendingAllocate

---
 .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala    | 2 +-
 .../apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 26c3e7287272..40f1222fcd83 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -169,7 +169,7 @@ private[yarn] class YarnAllocator(
    */
   def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)

-  def getNumPendingAllocate: Int = synchronized {
+  def numContainersPendingAllocate: Int = synchronized {
     getPendingAllocate.size
   }

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
index 90c0c0910887..4283ece0cc63 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
@@ -42,8 +42,8 @@ private[spark] class YarnClusterSchedulerSource(yarnAllocator: YarnAllocator) ex
     override def getValue: Int = yarnAllocator.numLocalityAwareTasks
   })

-  metricRegistry.register(MetricRegistry.name("numPendingAllocate"), new Gauge[Int] {
-    override def getValue: Int = yarnAllocator.getNumPendingAllocate
+  metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
+    override def getValue: Int = yarnAllocator.numContainersPendingAllocate
   })

 }

From 6751ec5221be590da74010e8d3781585f1045a19 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Thu, 12 Jul 2018 12:23:57 +0200
Subject: [PATCH 08/11] Ignore exceptions during the stopping of the metric system.

---
 .../apache/spark/deploy/yarn/ApplicationMaster.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d77e1092bf1d..e25774622ef7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -313,9 +313,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
         "Uncaught exception: " + StringUtils.stringifyException(e))
     } finally {
-      metricsSystem.foreach { ms =>
-        ms.report()
-        ms.stop()
+      try {
+        metricsSystem.foreach { ms =>
+          ms.report()
+          ms.stop()
+        }
+      } catch {
+        case e: Exception =>
+          logInfo("Exception during stopping of the metric system: ", e)
       }
     }
   }

From c0c4748aeb0f93a881835567af02e98643ad894a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Thu, 12 Jul 2018 14:53:42 +0200
Subject: [PATCH 09/11] Rename metric source/system from yarn to applicationMaster.

---
 docs/monitoring.md                                            | 2 +-
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala      | 4 ++--
 ...terSchedulerSource.scala => ApplicationMasterSource.scala} | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
 rename resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/{YarnClusterSchedulerSource.scala => ApplicationMasterSource.scala} (92%)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index e78667dda168..8dcea704b4b8 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -435,7 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl
 * `executor`: A Spark executor.
 * `driver`: The Spark driver process (the process in which your SparkContext is created).
 * `shuffleService`: The Spark shuffle service.
-* `yarn`: Spark resource allocations on YARN.
+* `applicationMaster`: The Spark application master on YARN.

 Each instance can report to zero or more _sinks_. Sinks are contained in the
 `org.apache.spark.metrics.sink` package:
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e25774622ef7..c93753ccd767 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -447,8 +447,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

     allocator.allocateResources()
-    val ms = MetricsSystem.createMetricsSystem("yarn", sparkConf, securityMgr)
-    ms.registerSource(new YarnClusterSchedulerSource(allocator))
+    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
+    ms.registerSource(new ApplicationMasterSource(allocator))
     ms.start()
     metricsSystem = Some(ms)
     reporterThread = launchReporterThread()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
similarity index 92%
rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
rename to resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
index 4283ece0cc63..8ea8308d7b23 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerSource.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -21,9 +21,9 @@ import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.metrics.source.Source

-private[spark] class YarnClusterSchedulerSource(yarnAllocator: YarnAllocator) extends Source {
+private[spark] class ApplicationMasterSource(yarnAllocator: YarnAllocator) extends Source {

-  override val sourceName: String = "yarn"
+  override val sourceName: String = "applicationMaster"
   override val metricRegistry: MetricRegistry = new MetricRegistry()

From 79585251d46b6d94e3a2d95e1e635e2801108cee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cattilapiros=E2=80=9D?=
Date: Thu, 19 Jul 2018 19:58:49 +0200
Subject: [PATCH 10/11] adding namespace prefix

---
 docs/monitoring.md                                            | 2 +-
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala      | 5 +++--
 .../apache/spark/deploy/yarn/ApplicationMasterSource.scala    | 5 +++--
 .../src/main/scala/org/apache/spark/deploy/yarn/config.scala  | 5 +++++
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 8dcea704b4b8..2717dd091c75 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -435,7 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl
 * `executor`: A Spark executor.
 * `driver`: The Spark driver process (the process in which your SparkContext is created).
 * `shuffleService`: The Spark shuffle service.
-* `applicationMaster`: The Spark application master on YARN.
+* `applicationMaster`: The Spark ApplicationMaster when running on YARN.

 Each instance can report to zero or more _sinks_. Sinks are contained in the
 `org.apache.spark.metrics.sink` package:
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c93753ccd767..55ed114f8500 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -320,7 +320,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         }
       } catch {
         case e: Exception =>
-          logInfo("Exception during stopping of the metric system: ", e)
+          logWarning("Exception during stopping of the metric system: ", e)
       }
     }
   }
@@ -448,7 +448,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     allocator.allocateResources()
     val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
-    ms.registerSource(new ApplicationMasterSource(allocator))
+    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
+    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
     ms.start()
     metricsSystem = Some(ms)
     reporterThread = launchReporterThread()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
index 8ea8308d7b23..0fec91658260 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -21,9 +21,10 @@ import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.metrics.source.Source

-private[spark] class ApplicationMasterSource(yarnAllocator: YarnAllocator) extends Source {
+private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator)
+  extends Source {

-  override val sourceName: String = "applicationMaster"
+  override val sourceName: String = prefix + ".applicationMaster"
   override val metricRegistry: MetricRegistry = new MetricRegistry()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 129084a86597..1013fd2cc4a8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -152,6 +152,11 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("100s")

+  private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace")
+    .doc("The root namespace for AM metrics reporting.")
+    .stringConf
+    .createOptional
+
   private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
     .doc("Node label expression for the AM.")
     .stringConf

From 0b86788e7ec7b367c779cb5517f9dd294f99dd4b Mon Sep 17 00:00:00 2001
From: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Date: Mon, 23 Jul 2018 11:10:04 +0200
Subject: [PATCH 11/11] Update running-on-yarn.md

---
 docs/running-on-yarn.md | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0b265b0cb1b3..1c1f40c028a9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -421,7 +421,14 @@ To use a custom metrics.properties for the application master and executors, upd
   spark.blacklist.application.maxFailedExecutorsPerNode.
   </td>
 </tr>
-
+<tr>
+  <td><code>spark.yarn.metrics.namespace</code></td>
+  <td>(none)</td>
+  <td>
+  The root namespace for AM metrics reporting.
+  If it is not set then the YARN application ID is used.
+  </td>
+</tr>
 </table>

 # Important notes