@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
2121
2222import scala .collection .mutable
2323
24+ import com .codahale .metrics .{Gauge , MetricRegistry }
25+
2426import org .apache .spark .scheduler ._
27+ import org .apache .spark .metrics .source .Source
2528import org .apache .spark .util .{ThreadUtils , Clock , SystemClock , Utils }
2629
2730/**
@@ -144,6 +147,26 @@ private[spark] class ExecutorAllocationManager(
144147 private val executor =
145148 ThreadUtils .newDaemonSingleThreadScheduledExecutor(" spark-dynamic-executor-allocation" )
146149
150+ // Metric source for ExecutorAllocationManager to expose the its internal executor allocation
151+ // status to MetricsSystem.
152+ private [spark] val executorAllocationManagerSource = new Source {
153+ val sourceName = " ExecutorAllocationManager"
154+ val metricRegistry = new MetricRegistry ()
155+
156+ private def registerGauge [T ](name : String , value : => T , defaultValue : T ): Unit = {
157+ metricRegistry.register(MetricRegistry .name(" executors" , name), new Gauge [T ] {
158+ override def getValue : T = synchronized { Option (value).getOrElse(defaultValue) }
159+ })
160+ }
161+
162+ registerGauge(" numberExecutorsToAdd" , numExecutorsToAdd, 0 )
163+ registerGauge(" numberExecutorsPending" , numExecutorsPending, 0 )
164+ registerGauge(" numberExecutorsPendingToRemove" , executorsPendingToRemove.size, 0 )
165+ registerGauge(" numberAllExecutors" , executorIds.size, 0 )
166+ registerGauge(" numberTargetExecutors" , targetNumExecutors(), 0 )
167+ registerGauge(" numberMaxNeededExecutors" , maxNumExecutorsNeeded(), 0 )
168+ }
169+
147170 /**
148171 * Verify that the settings specified through the config are valid.
149172 * If not, throw an appropriate exception.
0 commit comments