diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index d6962e0da2f30..d70d9cd50b941 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -57,39 +57,39 @@ # added to Java properties using -Dspark.metrics.conf=xxx if you want to # customize metrics system. You can also put the file in ${SPARK_HOME}/conf # and it will be loaded automatically. -# 5. MetricsServlet is added by default as a sink in master, worker and client -# driver, you can send http request "/metrics/json" to get a snapshot of all the -# registered metrics in json format. For master, requests "/metrics/master/json" and -# "/metrics/applications/json" can be sent seperately to get metrics snapshot of -# instance master and applications. MetricsServlet may not be configured by self. -# +# 5. The MetricsServlet sink is added by default as a sink in master, worker and +# driver, you can send HTTP requests to the "/metrics/json" endpoint to get a +# snapshot of all the registered metrics in json format. For master, requests to +# the "/metrics/master/json" and "/metrics/applications/json" endpoints can be +# sent separately to get metrics snapshots of the master instance and applications. +# This MetricsServlet does not have to be configured. ## List of available common sources and their properties. # org.apache.spark.metrics.source.JvmSource -# Note: Currently, JvmSource is the only available common source -# to add additionaly to an instance, to enable this, -# set the "class" option to its fully qulified class name (see examples below) +# Note: Currently, JvmSource is the only available common source. +# It can be added to an instance by setting the "class" option to its fully +# qualified class name (see examples below). ## List of available sinks and their properties. 
# org.apache.spark.metrics.sink.ConsoleSink # Name: Default: Description: # period 10 Poll period -# unit seconds Units of poll period +# unit seconds Unit of the poll period # org.apache.spark.metrics.sink.CSVSink # Name: Default: Description: # period 10 Poll period -# unit seconds Units of poll period +# unit seconds Unit of the poll period # directory /tmp Where to store CSV files # org.apache.spark.metrics.sink.GangliaSink # Name: Default: Description: -# host NONE Hostname or multicast group of Ganglia server -# port NONE Port of Ganglia server(s) +# host NONE Hostname or multicast group of the Ganglia server, must be set +# port NONE Port of the Ganglia server(s), must be set # period 10 Poll period -# unit seconds Units of poll period +# unit seconds Unit of the poll period # ttl 1 TTL of messages sent by Ganglia # mode multicast Ganglia network mode ('unicast' or 'multicast') @@ -106,11 +106,11 @@ # org.apache.spark.metrics.sink.GraphiteSink # Name: Default: Description: -# host NONE Hostname of Graphite server -# port NONE Port of Graphite server +# host NONE Hostname of the Graphite server, must be set +# port NONE Port of the Graphite server, must be set # period 10 Poll period -# unit seconds Units of poll period -# prefix EMPTY STRING Prefix to prepend to metric name +# unit seconds Unit of the poll period +# prefix EMPTY STRING Prefix to prepend to every metric's name # protocol tcp Protocol ("tcp" or "udp") to use ## Examples @@ -120,42 +120,41 @@ # Enable ConsoleSink for all instances by class name #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink -# Polling period for ConsoleSink +# Polling period for the ConsoleSink #*.sink.console.period=10 - +# Unit of the polling period for the ConsoleSink #*.sink.console.unit=seconds -# Master instance overlap polling period +# Polling period for the ConsoleSink specific for the master instance #master.sink.console.period=15 - +# Unit of the polling period for the ConsoleSink specific for the 
master instance #master.sink.console.unit=seconds -# Enable CsvSink for all instances +# Enable CsvSink for all instances by class name #*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink -# Polling period for CsvSink +# Polling period for the CsvSink #*.sink.csv.period=1 - +# Unit of the polling period for the CsvSink #*.sink.csv.unit=minutes # Polling directory for CsvSink #*.sink.csv.directory=/tmp/ -# Worker instance overlap polling period +# Polling period for the CsvSink specific for the worker instance #worker.sink.csv.period=10 - +# Unit of the polling period for the CsvSink specific for the worker instance #worker.sink.csv.unit=minutes # Enable Slf4jSink for all instances by class name #*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink -# Polling period for Slf4JSink +# Polling period for the Slf4JSink #*.sink.slf4j.period=1 - +# Unit of the polling period for the Slf4jSink #*.sink.slf4j.unit=minutes - -# Enable jvm source for instance master, worker, driver and executor +# Enable JvmSource for instance master, worker, driver and executor #master.source.jvm.class=org.apache.spark.metrics.source.JvmSource #worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource @@ -163,4 +162,3 @@ #driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource #executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource - diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index e34cfc698dcef..5f0cfee85c711 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -18,7 +18,6 @@ package org.apache.spark.metrics import java.util.Properties -import java.util.concurrent.TimeUnit import scala.collection.mutable @@ -66,10 +65,10 @@ import org.apache.spark.util.Utils * [options] is the specific property of this source or sink. 
*/ private[spark] class MetricsSystem private ( - val instance: String, - conf: SparkConf, - securityMgr: SecurityManager) - extends Logging { + val instance: String, + conf: SparkConf, + securityMgr: SecurityManager +) extends Logging { private[this] val metricsConfig = new MetricsConfig(conf) @@ -97,12 +96,12 @@ private[spark] class MetricsSystem private ( running = true registerSources() registerSinks() - sinks.foreach(_.start) + sinks.foreach(_.start()) } def stop() { if (running) { - sinks.foreach(_.stop) + sinks.foreach(_.stop()) } else { logWarning("Stopping a MetricsSystem that is not running") } @@ -195,10 +194,9 @@ private[spark] class MetricsSystem private ( sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => { + case e: Exception => logError("Sink class " + classPath + " cannot be instantiated") throw e - } } } } @@ -209,17 +207,6 @@ private[spark] object MetricsSystem { val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r - private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS - private[this] val MINIMAL_POLL_PERIOD = 1 - - def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { - val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) - if (period < MINIMAL_POLL_PERIOD) { - throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + - " below than minimal polling period ") - } - } - def createMetricsSystem( instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = { new MetricsSystem(instance, conf, securityMgr) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 81b9056b40fb8..4e7602abcb4ea 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -23,32 +23,17 @@ import java.util.concurrent.TimeUnit import 
com.codahale.metrics.{ConsoleReporter, MetricRegistry} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val CONSOLE_DEFAULT_PERIOD = 10 - val CONSOLE_DEFAULT_UNIT = "SECONDS" +private[spark] class ConsoleSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { - val CONSOLE_KEY_PERIOD = "period" - val CONSOLE_KEY_UNIT = "unit" - - val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => CONSOLE_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - - val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build() + val reporter = ConsoleReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build() override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 9d5f2ae9328ad..5dfc610dc4c25 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -24,40 +24,26 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.{CsvReporter, MetricRegistry} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry, - 
securityMgr: SecurityManager) extends Sink { - val CSV_KEY_PERIOD = "period" - val CSV_KEY_UNIT = "unit" - val CSV_KEY_DIR = "directory" +private[spark] class CsvSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { - val CSV_DEFAULT_PERIOD = 10 - val CSV_DEFAULT_UNIT = "SECONDS" - val CSV_DEFAULT_DIR = "/tmp/" + val DIR_KEY = "directory" + val DEFAULT_DIR = "/tmp/" - val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => CSV_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - - val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { + val pollDir = Option(properties.getProperty(DIR_KEY)) match { case Some(s) => s - case None => CSV_DEFAULT_DIR + case None => DEFAULT_DIR } - val reporter: CsvReporter = CsvReporter.forRegistry(registry) - .formatFor(Locale.US) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build(new File(pollDir)) + val reporter = CsvReporter.forRegistry(registry) + .formatFor(Locale.US) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(new File(pollDir)) override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 22454e50b14b4..47354644568dc 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -25,59 +25,40 @@ import com.codahale.metrics.MetricRegistry import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP} import 
org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val GRAPHITE_DEFAULT_PERIOD = 10 - val GRAPHITE_DEFAULT_UNIT = "SECONDS" - val GRAPHITE_DEFAULT_PREFIX = "" +private[spark] class GraphiteSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { + val PREFIX_KEY = "prefix" + val DEFAULT_PREFIX = "" - val GRAPHITE_KEY_HOST = "host" - val GRAPHITE_KEY_PORT = "port" - val GRAPHITE_KEY_PERIOD = "period" - val GRAPHITE_KEY_UNIT = "unit" - val GRAPHITE_KEY_PREFIX = "prefix" - val GRAPHITE_KEY_PROTOCOL = "protocol" + val HOST_KEY = "host" + val PORT_KEY = "port" + val PROTOCOL_KEY = "protocol" - def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) + def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop)) - if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) { - throw new Exception("Graphite sink requires 'host' property.") - } - - if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) { - throw new Exception("Graphite sink requires 'port' property.") - } - - val host = propertyToOption(GRAPHITE_KEY_HOST).get - val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt - - val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { - case Some(s) => s.toInt - case None => GRAPHITE_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) - } + require(propertyToOption(HOST_KEY).isDefined, "Graphite sink requires 'host' property.") + require(propertyToOption(PORT_KEY).isDefined, "Graphite sink requires 'port' property.") - val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) + val 
host = propertyToOption(HOST_KEY).get + val port = propertyToOption(PORT_KEY).get.toInt - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + val prefix = propertyToOption(PREFIX_KEY).getOrElse(DEFAULT_PREFIX) - val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match { + val graphite = propertyToOption(PROTOCOL_KEY).map(_.toLowerCase) match { case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port)) case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port)) case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p") } - val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .prefixedWith(prefix) - .build(graphite) + val reporter = GraphiteReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .prefixedWith(prefix) + .build(graphite) override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala new file mode 100644 index 0000000000000..fed56c499877f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +private[spark] trait HasPollingPeriod { + def properties: Properties + + protected val (pollPeriod, pollUnit) = { + val PERIOD_KEY = "period" + val DEFAULT_PERIOD = 10 + + val UNIT_KEY = "unit" + val DEFAULT_UNIT = TimeUnit.SECONDS + + val pp = Option(properties.getProperty(PERIOD_KEY)) match { + case Some(s) => s.toInt + case None => DEFAULT_PERIOD + } + val pu = Option(properties.getProperty(UNIT_KEY)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase) + case None => DEFAULT_UNIT + } + + // perform validation against the minimal 1 second period + // (>= keeps a period of exactly 1 second legal, matching the semantics of + // the removed MetricsSystem.checkMinimalPollingPeriod, which only rejected + // periods strictly below the minimum) + val MINIMAL_PERIOD = 1 + val period = DEFAULT_UNIT.convert(pp, pu) + require(period >= MINIMAL_PERIOD, s"Given polling period $pp $pu below the " + + s"minimal polling period ($MINIMAL_PERIOD $DEFAULT_UNIT)") + (pp, pu) + } +} diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 1992b42ac7f6b..20fbf2c8d065e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -23,10 +23,13 @@ import com.codahale.metrics.{JmxReporter, MetricRegistry} import org.apache.spark.SecurityManager -private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +private[spark] class JmxSink( + val properties: Properties, + val registry: MetricRegistry, + 
securityMgr: SecurityManager +) extends Sink { - val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() + val reporter = JmxReporter.forRegistry(registry).build() override def start() { reporter.start() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 68b58b8490641..8af0400e6d8d0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -30,23 +30,22 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.ui.JettyUtils._ private[spark] class MetricsServlet( - val property: Properties, - val registry: MetricRegistry, - securityMgr: SecurityManager) - extends Sink { + val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink { - val SERVLET_KEY_PATH = "path" - val SERVLET_KEY_SAMPLE = "sample" + val PATH_KEY = "path" + val SAMPLE_KEY = "sample" + val DEFAULT_SAMPLE = false - val SERVLET_DEFAULT_SAMPLE = false + val servletPath = properties.getProperty(PATH_KEY) - val servletPath = property.getProperty(SERVLET_KEY_PATH) + val servletShowSample = Option(properties.getProperty(SAMPLE_KEY)).map(_.toBoolean) + .getOrElse(DEFAULT_SAMPLE) - val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) - .getOrElse(SERVLET_DEFAULT_SAMPLE) - - val mapper = new ObjectMapper().registerModule( - new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) + val mapper = new ObjectMapper() + .registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) def getHandlers(conf: SparkConf): Array[ServletContextHandler] = { Array[ServletContextHandler]( diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index 
773e074336cb0..16404a2a061e3 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -26,29 +26,12 @@ import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem private[spark] class Slf4jSink( - val property: Properties, - val registry: MetricRegistry, - securityMgr: SecurityManager) - extends Sink { - val SLF4J_DEFAULT_PERIOD = 10 - val SLF4J_DEFAULT_UNIT = "SECONDS" + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { - val SLF4J_KEY_PERIOD = "period" - val SLF4J_KEY_UNIT = "unit" - - val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => SLF4J_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - - val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) + val reporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala new file mode 100644 index 0000000000000..b2da5619a40c6 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import org.scalatest.PrivateMethodTester + +import org.apache.spark.SparkFunSuite + +class HasPollingPeriodSuite extends SparkFunSuite with PrivateMethodTester { + test("HasPollingPeriod should take default values if not given") { + val hasPollingPeriodImpl = new HasPollingPeriod { + override def properties = new Properties() + } + val pollPeriod = PrivateMethod[Int]('pollPeriod) + val pollUnit = PrivateMethod[TimeUnit]('pollUnit) + assert(hasPollingPeriodImpl.invokePrivate(pollPeriod()) === 10) + assert(hasPollingPeriodImpl.invokePrivate(pollUnit()) === TimeUnit.SECONDS) + } + + test("HasPollingPeriod should pick up values if given") { + val hasPollingPeriodImpl = new HasPollingPeriod { + override def properties = { + val p = new Properties() + p.put("period", "5") + p.put("unit", "minutes") + p + } + } + val pollPeriod = PrivateMethod[Int]('pollPeriod) + val pollUnit = PrivateMethod[TimeUnit]('pollUnit) + assert(hasPollingPeriodImpl.invokePrivate(pollPeriod()) === 5) + assert(hasPollingPeriodImpl.invokePrivate(pollUnit()) === TimeUnit.MINUTES) + } + + test("HasPollingPeriod should throw an exception if the polling period is < 1s") { + val thrown = intercept[IllegalArgumentException] { + new HasPollingPeriod { + override def properties = { + val p = new Properties() + 
p.put("period", "500") + p.put("unit", "milliseconds") + p + } + } + } + assert(thrown.getMessage contains + "Given polling period 500 MILLISECONDS below the minimal polling period (1 SECONDS)") + } +} diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 3b1880e143513..308892c18cd1a 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -26,51 +26,37 @@ import info.ganglia.gmetric4j.gmetric.GMetric import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -class GangliaSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val GANGLIA_KEY_PERIOD = "period" - val GANGLIA_DEFAULT_PERIOD = 10 +class GangliaSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { - val GANGLIA_KEY_UNIT = "unit" - val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS - - val GANGLIA_KEY_MODE = "mode" - val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST + val MODE_KEY = "mode" + val DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST // TTL for multicast messages. If listeners are X hops away in network, must be at least X. 
- val GANGLIA_KEY_TTL = "ttl" - val GANGLIA_DEFAULT_TTL = 1 - - val GANGLIA_KEY_HOST = "host" - val GANGLIA_KEY_PORT = "port" - - def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) + val TTL_KEY = "ttl" + val DEFAULT_TTL = 1 - if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { - throw new Exception("Ganglia sink requires 'host' property.") - } + val HOST_KEY = "host" + val PORT_KEY = "port" - if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { - throw new Exception("Ganglia sink requires 'port' property.") - } + def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop)) - val host = propertyToOption(GANGLIA_KEY_HOST).get - val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt - val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) - .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) - val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) - .getOrElse(GANGLIA_DEFAULT_PERIOD) - val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase)) - .getOrElse(GANGLIA_DEFAULT_UNIT) + require(propertyToOption(HOST_KEY).isDefined, "Ganglia sink requires 'host' property.") + require(propertyToOption(PORT_KEY).isDefined, "Ganglia sink requires 'port' property.") - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + val host = propertyToOption(HOST_KEY).get + val port = propertyToOption(PORT_KEY).get.toInt + val ttl = propertyToOption(TTL_KEY).map(_.toInt).getOrElse(DEFAULT_TTL) + val mode: UDPAddressingMode = propertyToOption(MODE_KEY) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(DEFAULT_MODE) val ganglia = new GMetric(host, port, mode, ttl) - val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + + val reporter = GangliaReporter.forRegistry(registry) 
.convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build(ganglia)