docs/monitoring.md (1 change: 1 addition & 0 deletions)
@@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currently
* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
* `shuffleService`: The Spark shuffle service.
* `applicationMaster`: The Spark application master on YARN.

Contributor:
I think it would be better to clarify as "The Spark ApplicationMaster when running on YARN."

Contributor Author:
Thanks, updated accordingly.

Each instance can report to zero or more _sinks_. Sinks are contained in the
`org.apache.spark.metrics.sink` package:
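To show how the new instance plugs into the existing configuration mechanism, here is a hedged sketch of a conf/metrics.properties entry, assuming the usual [instance].sink.[name].[option] syntax and the bundled ConsoleSink; the names and values are illustrative only, not part of this diff:

# Illustrative example: report the applicationMaster instance's gauges to the console every 10 seconds.
applicationMaster.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
applicationMaster.sink.console.period=10
applicationMaster.sink.console.unit=seconds
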
ApplicationMaster.scala
@@ -43,6 +43,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -67,6 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

private val securityMgr = new SecurityManager(sparkConf)

private var metricsSystem: Option[MetricsSystem] = None

// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
@@ -309,6 +312,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
  ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
  "Uncaught exception: " + StringUtils.stringifyException(e))
} finally {
  try {
    metricsSystem.foreach { ms =>
      ms.report()
      ms.stop()
    }
  } catch {
    case e: Exception =>
      logInfo("Exception during stopping of the metric system: ", e)
Contributor:
I would suggest changing this to a warning log if an exception occurs.

  }
}
}

@@ -434,6 +447,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
ms.registerSource(new ApplicationMasterSource(allocator))
ms.start()
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}

ApplicationMasterSource.scala (new file)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.source.Source

private[spark] class ApplicationMasterSource(yarnAllocator: YarnAllocator) extends Source {

override val sourceName: String = "applicationMaster"
Contributor:
If this is what the metrics output looks like:

-- Gauges ----------------------------------------------------------------------
applicationMaster.numContainersPendingAllocate
             value = 0
applicationMaster.numExecutorsFailed
             value = 3
applicationMaster.numExecutorsRunning
             value = 9
applicationMaster.numLocalityAwareTasks
             value = 0
applicationMaster.numReleasedContainers
             value = 0
...

I would suggest adding the application id as a prefix to differentiate between different apps.

Contributor:
Ah, good catch. I was thinking it automatically added the namespace, but it looks like that only happens for the executor and driver instances. Perhaps we should just have the metrics system append the spark.metrics.namespace setting. For YARN I see the applicationMaster metrics the same way as the DAG scheduler source, executor allocation manager, etc. Allowing the user to control this makes sense to me. Thoughts?

Contributor:
@tgravescs Would you please explain more? Are you going to add a new configuration, "spark.metrics.namespace"? And how would this configuration be used?

Contributor:
The config spark.metrics.namespace already exists; see the metrics section in http://spark.apache.org/docs/latest/monitoring.html. But if you look at the code at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L129, it's only applied to executor and driver metrics. I think we should have it apply to the YARN metrics as well.
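
For reference, a simplified sketch of the behavior described above (an approximation, not the actual MetricsSystem source): the namespace, which defaults to spark.app.id, is only prepended for the driver and executor instances, so every other instance, including the new applicationMaster one, keeps the bare source name.

// Approximation of how MetricsSystem builds registry names; the signature and logic
// are simplified for illustration and do not mirror the real method exactly.
def buildRegistryName(
    instance: String,
    sourceName: String,
    namespace: Option[String],   // spark.metrics.namespace, defaulting to spark.app.id
    executorId: Option[String]): String = {
  if (instance == "driver" || instance == "executor") {
    (namespace, executorId) match {
      case (Some(ns), Some(id)) => s"$ns.$id.$sourceName"  // e.g. app_123.driver.DAGScheduler
      case _ => sourceName                                 // missing pieces: fall back
    }
  } else {
    sourceName                                             // e.g. applicationMaster.numExecutorsRunning
  }
}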

Contributor:
I see. But I think we may not be able to get "spark.app.id" on the AM side; instead we can get the YARN application id. So either we can set this configuration to the application id, or directly prepend it to the source name.

Contributor Author:
I like the idea of making the metric names more app-specific, so I will prepend the app ID to the sourceName and rerun my test.

Contributor:
Ah, for client mode, yes, there is an ordering issue with spark.app.id. I'm fine with using the YARN app id since that is essentially what the driver and executors use anyway, but I think we should also make it configurable. I'd like to see these stay consistent: if the user can set the driver/executor metrics with spark.metrics.namespace, we should allow them to set the YARN ones too, so that they all can have a similar prefix. Perhaps we add a spark.yarn.metrics.namespace?

application_1530654167152_24008.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.ExecutorAllocationManager$ExecutorAllocationListener
application_1530654167152_25538.2.executor.recordsRead

override val metricRegistry: MetricRegistry = new MetricRegistry()

metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.getNumExecutorsFailed
})

metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.getNumExecutorsRunning
})

metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.getNumReleasedContainers
})

metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.numLocalityAwareTasks
})

metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
  override def getValue: Int = yarnAllocator.numContainersPendingAllocate
})

}
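
Below is a hypothetical variant of the source above (not part of this diff) sketching the reviewers' suggestion of prefixing the source name with the YARN application id; the appId constructor parameter and the class name are assumptions for illustration, and only one gauge is repeated for brevity.

package org.apache.spark.deploy.yarn

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.source.Source

// Hypothetical: prefix the source name with the YARN application id so metrics from
// different applications do not collide under a shared sink.
private[spark] class AppIdPrefixedApplicationMasterSource(
    appId: String,
    yarnAllocator: YarnAllocator) extends Source {

  // Produces names such as application_1530654167152_24008.applicationMaster.numExecutorsRunning
  override val sourceName: String = s"$appId.applicationMaster"

  override val metricRegistry: MetricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] {
    override def getValue: Int = yarnAllocator.getNumExecutorsRunning
  })
}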
YarnAllocator.scala
@@ -150,14 +150,16 @@ private[yarn] class YarnAllocator(
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty

// Number of tasks that have locality preferences in active stages
private var numLocalityAwareTasks: Int = 0
private[yarn] var numLocalityAwareTasks: Int = 0

// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumReleasedContainers: Int = releasedContainers.size()

def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
Expand All @@ -167,6 +169,10 @@ private[yarn] class YarnAllocator(
*/
def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)

def numContainersPendingAllocate: Int = synchronized {
  getPendingAllocate.size
}

/**
* A sequence of pending container requests at the given location that have not yet been
* fulfilled.
YarnAllocatorBlacklistTracker.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.BlacklistTracker
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.{Clock, SystemClock}

/**
* YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes