@@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
7676 private val sources = new mutable.ArrayBuffer [Source ]
7777 private val registry = new MetricRegistry ()
7878
79+ private var running : Boolean = false
80+
7981 // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
8082 private var metricsServlet : Option [MetricsServlet ] = None
8183
82- /** Get any UI handlers used by this metrics system. */
83- def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array ())
84+ /**
85+ * Get any UI handlers used by this metrics system; can only be called after start().
86+ */
87+ def getServletHandlers = {
88+ require(running, " Can only call getServletHandlers on a running MetricsSystem" )
89+ metricsServlet.map(_.getHandlers).getOrElse(Array ())
90+ }
8491
8592 metricsConfig.initialize()
8693
8794 def start () {
95+ require(! running, " Attempting to start a MetricsSystem that is already running" )
96+ running = true
8897 registerSources()
8998 registerSinks()
9099 sinks.foreach(_.start)
91100 }
92101
93102 def stop () {
94- sinks.foreach(_.stop)
103+ if (running) {
104+ sinks.foreach(_.stop)
105+ } else {
106+ logWarning(" Stopping a MetricsSystem that is not running" )
107+ }
108+ running = false
95109 }
96110
97111 def report () {
@@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
107121 * @return An unique metric name for each combination of
108122 * application, executor/driver and metric source.
109123 */
110- def buildRegistryName (source : Source ): String = {
124+ private [spark] def buildRegistryName (source : Source ): String = {
111125 val appId = conf.getOption(" spark.app.id" )
112126 val executorId = conf.getOption(" spark.executor.id" )
113127 val defaultName = MetricRegistry .name(source.sourceName)
@@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
144158 })
145159 }
146160
147- def registerSources () {
161+ private def registerSources () {
148162 val instConfig = metricsConfig.getInstance(instance)
149163 val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem .SOURCE_REGEX )
150164
@@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
160174 }
161175 }
162176
163- def registerSinks () {
177+ private def registerSinks () {
164178 val instConfig = metricsConfig.getInstance(instance)
165179 val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem .SINK_REGEX )
166180
0 commit comments