diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 709ce0060e15..523194d8a487 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -69,13 +69,15 @@ import org.apache.spark.util.Utils * [options] represent the specific property of this source or sink. */ private[spark] class MetricsSystem private ( - val instance: String, conf: SparkConf) extends Logging { + val instance: String, + conf: SparkConf, + registry: MetricRegistry) + extends Logging { private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] private val sources = new mutable.ArrayBuffer[Source] - private val registry = new MetricRegistry() private var running: Boolean = false @@ -257,8 +259,11 @@ private[spark] object MetricsSystem { } } - def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem = { - new MetricsSystem(instance, conf) + def createMetricsSystem( + instance: String, + conf: SparkConf, + registry: MetricRegistry = new MetricRegistry): MetricsSystem = { + new MetricsSystem(instance, conf, registry) } } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 80dc4ff75866..25c2280b7ae8 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -21,7 +21,7 @@ import java.util.Properties import scala.collection.mutable.ArrayBuffer -import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.{Gauge, MetricRegistry} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -281,6 +281,28 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricsSystem.invokePrivate(sinks()).length === 1) } + + test("MetricsSystem registers dynamically added metrics") { + val registry = new MetricRegistry() + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val instanceName = "testInstance" + val metricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, registry) + metricsSystem.registerSource(source) + assert(!registry.getNames.contains("dummySource.newMetric"), "Metric shouldn't be registered") + + source.metricRegistry.register( + "newMetric", + new Gauge[Integer] { + override def getValue: Integer = 1 + }) + assert( + registry.getNames.contains("dummySource.newMetric"), + "Metric should have been registered") + } } class ThreeParameterConstructorSink(