From 83d03d58aa5198af47a3bd716b8b2e23889461cd Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 19:31:11 +0100 Subject: [PATCH 01/14] HasPollingPeriod trait --- .../spark/metrics/sink/HasPollingPeriod.scala | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala new file mode 100644 index 0000000000000..cf475a0e62c94 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +private[spark] trait HasPollingPeriod { + def properties: Properties + + private val POLL_UNIT = TimeUnit.SECONDS + private val MINIMAL_POLL_PERIOD = 1 + + private def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { + val period = POLL_UNIT.convert(pollPeriod, pollUnit) + if (period < MINIMAL_POLL_PERIOD) { + throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + + " below than minimal polling period ") + } + } + + private val DEFAULT_PERIOD = 10 + private val DEFAULT_UNIT = "SECONDS" + + private val PERIOD_KEY = "period" + private val UNIT_KEY = "unit" + + protected val pollPeriod = Option(properties.getProperty(PERIOD_KEY)) match { + case Some(s) => s.toInt + case None => DEFAULT_PERIOD + } + + protected val pollUnit = Option(properties.getProperty(UNIT_KEY)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase) + case None => TimeUnit.valueOf(DEFAULT_UNIT) + } + + checkMinimalPollingPeriod(pollUnit, pollPeriod) +} From e2fd74016706a36326d50bd62b5dc0ee5c35a28b Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 22:40:57 +0100 Subject: [PATCH 02/14] made the different sinks mix-in HasPollingPeriod --- .../spark/metrics/sink/ConsoleSink.scala | 31 +++++----------- .../apache/spark/metrics/sink/CsvSink.scala | 27 ++++---------- .../spark/metrics/sink/GraphiteSink.scala | 35 +++++-------------- .../apache/spark/metrics/sink/JmxSink.scala | 7 ++-- .../spark/metrics/sink/MetricsServlet.scala | 16 ++++----- .../apache/spark/metrics/sink/Slf4jSink.scala | 25 +++---------- .../spark/metrics/sink/GangliaSink.scala | 31 +++++----------- 7 files changed, 47 insertions(+), 125 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 81b9056b40fb8..05ab6cb5b9a03 100644 --- 
a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -23,32 +23,17 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.{ConsoleReporter, MetricRegistry} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val CONSOLE_DEFAULT_PERIOD = 10 - val CONSOLE_DEFAULT_UNIT = "SECONDS" - - val CONSOLE_KEY_PERIOD = "period" - val CONSOLE_KEY_UNIT = "unit" - - val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => CONSOLE_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) +private[spark] class ConsoleSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build() + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build() override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 9d5f2ae9328ad..57f694bc8d2ac 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -24,31 +24,16 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.{CsvReporter, MetricRegistry} import 
org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val CSV_KEY_PERIOD = "period" - val CSV_KEY_UNIT = "unit" +private[spark] class CsvSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { val CSV_KEY_DIR = "directory" - - val CSV_DEFAULT_PERIOD = 10 - val CSV_DEFAULT_UNIT = "SECONDS" val CSV_DEFAULT_DIR = "/tmp/" - val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => CSV_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - - val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { + val pollDir = Option(properties.getProperty(CSV_KEY_DIR)) match { case Some(s) => s case None => CSV_DEFAULT_DIR } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 22454e50b14b4..ec6a85a76f6a4 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -25,48 +25,29 @@ import com.codahale.metrics.MetricRegistry import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val GRAPHITE_DEFAULT_PERIOD = 10 - val GRAPHITE_DEFAULT_UNIT = "SECONDS" +private[spark] class GraphiteSink( + override val properties: 
Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { val GRAPHITE_DEFAULT_PREFIX = "" val GRAPHITE_KEY_HOST = "host" val GRAPHITE_KEY_PORT = "port" - val GRAPHITE_KEY_PERIOD = "period" - val GRAPHITE_KEY_UNIT = "unit" val GRAPHITE_KEY_PREFIX = "prefix" val GRAPHITE_KEY_PROTOCOL = "protocol" - def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) + def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop)) - if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) { - throw new Exception("Graphite sink requires 'host' property.") - } - - if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) { - throw new Exception("Graphite sink requires 'port' property.") - } + require(propertyToOption(GRAPHITE_KEY_HOST).isDefined, "Graphite sink requires 'host' property.") + require(propertyToOption(GRAPHITE_KEY_PORT).isDefined, "Graphite sink requires 'port' property.") val host = propertyToOption(GRAPHITE_KEY_HOST).get val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt - val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { - case Some(s) => s.toInt - case None => GRAPHITE_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) - } - val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match { case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port)) case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port)) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 1992b42ac7f6b..728df67652c60 100644 --- 
a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -23,8 +23,11 @@ import com.codahale.metrics.{JmxReporter, MetricRegistry} import org.apache.spark.SecurityManager -private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +private[spark] class JmxSink( + val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink { val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 68b58b8490641..4ebd05f1a9303 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -30,23 +30,23 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.ui.JettyUtils._ private[spark] class MetricsServlet( - val property: Properties, - val registry: MetricRegistry, - securityMgr: SecurityManager) - extends Sink { + val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink { val SERVLET_KEY_PATH = "path" val SERVLET_KEY_SAMPLE = "sample" val SERVLET_DEFAULT_SAMPLE = false - val servletPath = property.getProperty(SERVLET_KEY_PATH) + val servletPath = properties.getProperty(SERVLET_KEY_PATH) - val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) + val servletShowSample = Option(properties.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) .getOrElse(SERVLET_DEFAULT_SAMPLE) - val mapper = new ObjectMapper().registerModule( - new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) + val mapper = new ObjectMapper() + .registerModule(new MetricsModule(TimeUnit.SECONDS, 
TimeUnit.MILLISECONDS, servletShowSample)) def getHandlers(conf: SparkConf): Array[ServletContextHandler] = { Array[ServletContextHandler]( diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index 773e074336cb0..3918b658ee7ca 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -26,27 +26,10 @@ import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem private[spark] class Slf4jSink( - val property: Properties, - val registry: MetricRegistry, - securityMgr: SecurityManager) - extends Sink { - val SLF4J_DEFAULT_PERIOD = 10 - val SLF4J_DEFAULT_UNIT = "SECONDS" - - val SLF4J_KEY_PERIOD = "period" - val SLF4J_KEY_UNIT = "unit" - - val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => SLF4J_DEFAULT_PERIOD - } - - val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) - case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 3b1880e143513..e44a2ca46561a 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -26,15 +26,12 @@ import info.ganglia.gmetric4j.gmetric.GMetric 
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem -class GangliaSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val GANGLIA_KEY_PERIOD = "period" - val GANGLIA_DEFAULT_PERIOD = 10 - - val GANGLIA_KEY_UNIT = "unit" - val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS +class GangliaSink( + override val properties: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink with HasPollingPeriod { val GANGLIA_KEY_MODE = "mode" val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST @@ -46,28 +43,16 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, val GANGLIA_KEY_HOST = "host" val GANGLIA_KEY_PORT = "port" - def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) + def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop)) - if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { - throw new Exception("Ganglia sink requires 'host' property.") - } - - if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { - throw new Exception("Ganglia sink requires 'port' property.") - } + require(propertyToOption(GANGLIA_KEY_HOST).isDefined, "Ganglia sink requires 'host' property.") + require(propertyToOption(GANGLIA_KEY_PORT).isDefined, "Ganglia sink requires 'port' property.") val host = propertyToOption(GANGLIA_KEY_HOST).get val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) - val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) - .getOrElse(GANGLIA_DEFAULT_PERIOD) - val pollUnit: TimeUnit = 
propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase)) - .getOrElse(GANGLIA_DEFAULT_UNIT) - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) val ganglia = new GMetric(host, port, mode, ttl) val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) From 1ebb657f6857ea7d5064677b378bd4eb223112bf Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 22:41:34 +0100 Subject: [PATCH 03/14] rmd checkMinimalPollingPeriod from MetricsSystem --- .../apache/spark/metrics/MetricsSystem.scala | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index e34cfc698dcef..fa6617ab05690 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -18,7 +18,6 @@ package org.apache.spark.metrics import java.util.Properties -import java.util.concurrent.TimeUnit import scala.collection.mutable @@ -68,8 +67,8 @@ import org.apache.spark.util.Utils private[spark] class MetricsSystem private ( val instance: String, conf: SparkConf, - securityMgr: SecurityManager) - extends Logging { + securityMgr: SecurityManager +) extends Logging { private[this] val metricsConfig = new MetricsConfig(conf) @@ -97,12 +96,12 @@ private[spark] class MetricsSystem private ( running = true registerSources() registerSinks() - sinks.foreach(_.start) + sinks.foreach(_.start()) } def stop() { if (running) { - sinks.foreach(_.stop) + sinks.foreach(_.stop()) } else { logWarning("Stopping a MetricsSystem that is not running") } @@ -195,10 +194,9 @@ private[spark] class MetricsSystem private ( sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => { + case e: Exception => logError("Sink class " + classPath + " cannot be instantiated") throw e - } } } } @@ -209,17 +207,6 @@ private[spark] object 
MetricsSystem { val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r - private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS - private[this] val MINIMAL_POLL_PERIOD = 1 - - def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { - val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) - if (period < MINIMAL_POLL_PERIOD) { - throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + - " below than minimal polling period ") - } - } - def createMetricsSystem( instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = { new MetricsSystem(instance, conf, securityMgr) From ca0cefd8549e10670ec0643d054bb8fd704d9bbb Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 22:43:40 +0100 Subject: [PATCH 04/14] mvd the "checkMinimalPollingPeriod" method to the companion object of trait HasPollingPeriod --- .../spark/metrics/sink/HasPollingPeriod.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala index cf475a0e62c94..1aba33458e2f0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala @@ -19,19 +19,21 @@ package org.apache.spark.metrics.sink import java.util.Properties import java.util.concurrent.TimeUnit -private[spark] trait HasPollingPeriod { - def properties: Properties +private object HasPollingPeriod { + val POLL_UNIT = TimeUnit.SECONDS + val MINIMAL_POLL_PERIOD = 1 - private val POLL_UNIT = TimeUnit.SECONDS - private val MINIMAL_POLL_PERIOD = 1 - - private def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { + def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { val period = POLL_UNIT.convert(pollPeriod, pollUnit) if (period < 
MINIMAL_POLL_PERIOD) { throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + " below than minimal polling period ") } } +} + +private[spark] trait HasPollingPeriod { + def properties: Properties private val DEFAULT_PERIOD = 10 private val DEFAULT_UNIT = "SECONDS" @@ -49,5 +51,6 @@ private[spark] trait HasPollingPeriod { case None => TimeUnit.valueOf(DEFAULT_UNIT) } + import HasPollingPeriod._ checkMinimalPollingPeriod(pollUnit, pollPeriod) } From 47ed641b25e30e9e00823b27ae55b5aa174244ae Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 23:30:31 +0100 Subject: [PATCH 05/14] renamed a few variables --- .../apache/spark/metrics/sink/CsvSink.scala | 9 ++++--- .../spark/metrics/sink/GraphiteSink.scala | 22 ++++++++-------- .../spark/metrics/sink/MetricsServlet.scala | 13 +++++----- .../spark/metrics/sink/GangliaSink.scala | 26 +++++++++---------- 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 57f694bc8d2ac..2879b287c3ae1 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -30,12 +30,13 @@ private[spark] class CsvSink( val registry: MetricRegistry, securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - val CSV_KEY_DIR = "directory" - val CSV_DEFAULT_DIR = "/tmp/" - val pollDir = Option(properties.getProperty(CSV_KEY_DIR)) match { + val DIR_KEY = "directory" + val DEFAULT_DIR = "/tmp/" + + val pollDir = Option(properties.getProperty(DIR_KEY)) match { case Some(s) => s - case None => CSV_DEFAULT_DIR + case None => DEFAULT_DIR } val reporter: CsvReporter = CsvReporter.forRegistry(registry) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 
ec6a85a76f6a4..84bca75dafac6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -31,24 +31,24 @@ private[spark] class GraphiteSink( val registry: MetricRegistry, securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - val GRAPHITE_DEFAULT_PREFIX = "" + val PREFIX_KEY = "prefix" + val DEFAULT_PREFIX = "" - val GRAPHITE_KEY_HOST = "host" - val GRAPHITE_KEY_PORT = "port" - val GRAPHITE_KEY_PREFIX = "prefix" - val GRAPHITE_KEY_PROTOCOL = "protocol" + val HOST_KEY = "host" + val PORT_KEY = "port" + val PROTOCOL_KEY = "protocol" def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop)) - require(propertyToOption(GRAPHITE_KEY_HOST).isDefined, "Graphite sink requires 'host' property.") - require(propertyToOption(GRAPHITE_KEY_PORT).isDefined, "Graphite sink requires 'port' property.") + require(propertyToOption(HOST_KEY).isDefined, "Graphite sink requires 'host' property.") + require(propertyToOption(PORT_KEY).isDefined, "Graphite sink requires 'port' property.") - val host = propertyToOption(GRAPHITE_KEY_HOST).get - val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt + val host = propertyToOption(HOST_KEY).get + val port = propertyToOption(PORT_KEY).get.toInt - val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) + val prefix = propertyToOption(PREFIX_KEY).getOrElse(DEFAULT_PREFIX) - val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match { + val graphite = propertyToOption(PROTOCOL_KEY).map(_.toLowerCase) match { case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port)) case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port)) case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p") diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 4ebd05f1a9303..8af0400e6d8d0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -35,15 +35,14 @@ private[spark] class MetricsServlet( securityMgr: SecurityManager ) extends Sink { - val SERVLET_KEY_PATH = "path" - val SERVLET_KEY_SAMPLE = "sample" + val PATH_KEY = "path" + val SAMPLE_KEY = "sample" + val DEFAULT_SAMPLE = false - val SERVLET_DEFAULT_SAMPLE = false + val servletPath = properties.getProperty(PATH_KEY) - val servletPath = properties.getProperty(SERVLET_KEY_PATH) - - val servletShowSample = Option(properties.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) - .getOrElse(SERVLET_DEFAULT_SAMPLE) + val servletShowSample = Option(properties.getProperty(SAMPLE_KEY)).map(_.toBoolean) + .getOrElse(DEFAULT_SAMPLE) val mapper = new ObjectMapper() .registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index e44a2ca46561a..835d4798a95cf 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -33,26 +33,26 @@ class GangliaSink( securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - val GANGLIA_KEY_MODE = "mode" - val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST + val MODE_KEY = "mode" + val DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST // TTL for multicast messages. If listeners are X hops away in network, must be at least X. 
- val GANGLIA_KEY_TTL = "ttl" - val GANGLIA_DEFAULT_TTL = 1 + val TTL_KEY = "ttl" + val DEFAULT_TTL = 1 - val GANGLIA_KEY_HOST = "host" - val GANGLIA_KEY_PORT = "port" + val HOST_KEY = "host" + val PORT_KEY = "port" def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop)) - require(propertyToOption(GANGLIA_KEY_HOST).isDefined, "Ganglia sink requires 'host' property.") - require(propertyToOption(GANGLIA_KEY_PORT).isDefined, "Ganglia sink requires 'port' property.") + require(propertyToOption(HOST_KEY).isDefined, "Ganglia sink requires 'host' property.") + require(propertyToOption(PORT_KEY).isDefined, "Ganglia sink requires 'port' property.") - val host = propertyToOption(GANGLIA_KEY_HOST).get - val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt - val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) - .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + val host = propertyToOption(HOST_KEY).get + val port = propertyToOption(PORT_KEY).get.toInt + val ttl = propertyToOption(TTL_KEY).map(_.toInt).getOrElse(DEFAULT_TTL) + val mode: UDPAddressingMode = propertyToOption(MODE_KEY) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(DEFAULT_MODE) val ganglia = new GMetric(host, port, mode, ttl) val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) From 02cf327d68db3b05b7fff516f78aad05e399f6fb Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 23:41:55 +0100 Subject: [PATCH 06/14] added "reporter" to the Sink trait --- core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 9fad4e7deacb6..d17c1cc4c759a 100644 --- 
a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -17,7 +17,10 @@ package org.apache.spark.metrics.sink +import com.codahale.metrics.ScheduledReporter + private[spark] trait Sink { + def reporter: ScheduledReporter def start(): Unit def stop(): Unit def report(): Unit From a5f33e480bedb3eecbfb6e8403dae4cb433b890e Mon Sep 17 00:00:00 2001 From: BenFradet Date: Fri, 15 Jan 2016 23:42:28 +0100 Subject: [PATCH 07/14] overrode "reporter" for the sinks --- .../main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala | 2 +- .../src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala | 2 +- .../scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 2 +- .../src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala | 2 +- .../scala/org/apache/spark/metrics/sink/MetricsServlet.scala | 2 ++ .../main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala | 2 +- .../main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 3 ++- 7 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 05ab6cb5b9a03..8d4ce677587b7 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -30,7 +30,7 @@ private[spark] class ConsoleSink( securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) + override val reporter = ConsoleReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 2879b287c3ae1..ed9b8615c3d50 100644 --- 
a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -39,7 +39,7 @@ private[spark] class CsvSink( case None => DEFAULT_DIR } - val reporter: CsvReporter = CsvReporter.forRegistry(registry) + override val reporter = CsvReporter.forRegistry(registry) .formatFor(Locale.US) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 84bca75dafac6..bc47eefb93da2 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -54,7 +54,7 @@ private[spark] class GraphiteSink( case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p") } - val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) + override val reporter = GraphiteReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .prefixedWith(prefix) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 728df67652c60..1ebe978783a6e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -29,7 +29,7 @@ private[spark] class JmxSink( securityMgr: SecurityManager ) extends Sink { - val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() + override val reporter = JmxReporter.forRegistry(registry).build() override def start() { reporter.start() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 8af0400e6d8d0..afea36bf5dedc 100644 --- 
a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -58,6 +58,8 @@ private[spark] class MetricsServlet( mapper.writeValueAsString(registry) } + override val reporter = _ + override def start() { } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index 3918b658ee7ca..a6b89d5edbd8d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -31,7 +31,7 @@ private[spark] class Slf4jSink( securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) + override val reporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 835d4798a95cf..f4bcaa6a6c6cc 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -55,7 +55,8 @@ class GangliaSink( .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(DEFAULT_MODE) val ganglia = new GMetric(host, port, mode, ttl) - val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + + override val reporter = GangliaReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build(ganglia) From 6215291741628c51c90c2c370a4847953e0350df Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sat, 16 Jan 2016 15:47:39 +0100 Subject: [PATCH 08/14] rmd "reporter" from the Sink trait 
--- .../main/scala/org/apache/spark/metrics/MetricsSystem.scala | 6 +++--- .../scala/org/apache/spark/metrics/sink/ConsoleSink.scala | 2 +- .../main/scala/org/apache/spark/metrics/sink/CsvSink.scala | 2 +- .../scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 2 +- .../main/scala/org/apache/spark/metrics/sink/JmxSink.scala | 2 +- .../org/apache/spark/metrics/sink/MetricsServlet.scala | 2 -- .../src/main/scala/org/apache/spark/metrics/sink/Sink.scala | 3 --- .../scala/org/apache/spark/metrics/sink/Slf4jSink.scala | 2 +- .../scala/org/apache/spark/metrics/sink/GangliaSink.scala | 2 +- 9 files changed, 9 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index fa6617ab05690..5f0cfee85c711 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -65,9 +65,9 @@ import org.apache.spark.util.Utils * [options] is the specific property of this source or sink. 
*/ private[spark] class MetricsSystem private ( - val instance: String, - conf: SparkConf, - securityMgr: SecurityManager + val instance: String, + conf: SparkConf, + securityMgr: SecurityManager ) extends Logging { private[this] val metricsConfig = new MetricsConfig(conf) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 8d4ce677587b7..4e7602abcb4ea 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -30,7 +30,7 @@ private[spark] class ConsoleSink( securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - override val reporter = ConsoleReporter.forRegistry(registry) + val reporter = ConsoleReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index ed9b8615c3d50..d299c005f206f 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -39,7 +39,7 @@ private[spark] class CsvSink( case None => DEFAULT_DIR } - override val reporter = CsvReporter.forRegistry(registry) + val reporter = CsvReporter.forRegistry(registry) .formatFor(Locale.US) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index bc47eefb93da2..9e99ca8685590 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -54,7 +54,7 @@ private[spark] class GraphiteSink( case Some(p) => throw new Exception(s"Invalid 
Graphite protocol: $p") } - override val reporter = GraphiteReporter.forRegistry(registry) + val reporter = GraphiteReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .prefixedWith(prefix) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 1ebe978783a6e..20fbf2c8d065e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -29,7 +29,7 @@ private[spark] class JmxSink( securityMgr: SecurityManager ) extends Sink { - override val reporter = JmxReporter.forRegistry(registry).build() + val reporter = JmxReporter.forRegistry(registry).build() override def start() { reporter.start() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index afea36bf5dedc..8af0400e6d8d0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -58,8 +58,6 @@ private[spark] class MetricsServlet( mapper.writeValueAsString(registry) } - override val reporter = _ - override def start() { } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index d17c1cc4c759a..9fad4e7deacb6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -17,10 +17,7 @@ package org.apache.spark.metrics.sink -import com.codahale.metrics.ScheduledReporter - private[spark] trait Sink { - def reporter: ScheduledReporter def start(): Unit def stop(): Unit def report(): Unit diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index a6b89d5edbd8d..16404a2a061e3 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -31,7 +31,7 @@ private[spark] class Slf4jSink( securityMgr: SecurityManager ) extends Sink with HasPollingPeriod { - override val reporter = Slf4jReporter.forRegistry(registry) + val reporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index f4bcaa6a6c6cc..308892c18cd1a 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -56,7 +56,7 @@ class GangliaSink( val ganglia = new GMetric(host, port, mode, ttl) - override val reporter = GangliaReporter.forRegistry(registry) + val reporter = GangliaReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build(ganglia) From 8f6bb1d38873099a14fb9b81823f968c2be3d225 Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sat, 16 Jan 2016 16:10:40 +0100 Subject: [PATCH 09/14] corrected a few things in the metrics.properties.template --- conf/metrics.properties.template | 62 ++++++++++++++++---------------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index d6962e0da2f30..c07509a0f718c 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -57,39 +57,39 @@ # added to Java properties using -Dspark.metrics.conf=xxx if you want to # customize metrics system. 
You can also put the file in ${SPARK_HOME}/conf # and it will be loaded automatically. -# 5. MetricsServlet is added by default as a sink in master, worker and client -# driver, you can send http request "/metrics/json" to get a snapshot of all the -# registered metrics in json format. For master, requests "/metrics/master/json" and -# "/metrics/applications/json" can be sent seperately to get metrics snapshot of -# instance master and applications. MetricsServlet may not be configured by self. -# +# 5. The MetricsServlet sink is added by default as a sink in master, worker and +# driver, you can send HTTP requests to the "/metrics/json" endpoint to get a +# snapshot of all the registered metrics in json format. For master, requests to +# the "/metrics/master/json" and "/metrics/applications/json" endpoints can be +# sent separately to get metrics snapshots of the master instance and applications. +# This MetricsServlet does not have to be configured. ## List of available common sources and their properties. # org.apache.spark.metrics.source.JvmSource -# Note: Currently, JvmSource is the only available common source -# to add additionaly to an instance, to enable this, -# set the "class" option to its fully qulified class name (see examples below) +# Note: Currently, JvmSource is the only available common source. +# Additionally, you can add your own source by setting the +# "class" option to its fully qualified class name (see examples below). ## List of available sinks and their properties. 
# org.apache.spark.metrics.sink.ConsoleSink # Name: Default: Description: # period 10 Poll period -# unit seconds Units of poll period +# unit seconds Unit of the poll period # org.apache.spark.metrics.sink.CSVSink # Name: Default: Description: # period 10 Poll period -# unit seconds Units of poll period +# unit seconds Unit of the poll period # directory /tmp Where to store CSV files # org.apache.spark.metrics.sink.GangliaSink # Name: Default: Description: -# host NONE Hostname or multicast group of Ganglia server -# port NONE Port of Ganglia server(s) +# host NONE Hostname or multicast group of the Ganglia server, must be set +# port NONE Port of the Ganglia server(s), must be set # period 10 Poll period -# unit seconds Units of poll period +# unit seconds Unit of the poll period # ttl 1 TTL of messages sent by Ganglia # mode multicast Ganglia network mode ('unicast' or 'multicast') @@ -106,11 +106,11 @@ # org.apache.spark.metrics.sink.GraphiteSink # Name: Default: Description: -# host NONE Hostname of Graphite server -# port NONE Port of Graphite server +# host NONE Hostname of the Graphite server, must be set +# port NONE Port of the Graphite server, must be set # period 10 Poll period -# unit seconds Units of poll period -# prefix EMPTY STRING Prefix to prepend to metric name +# unit seconds Unit of the poll period +# prefix EMPTY STRING Prefix to prepend to every metric's name # protocol tcp Protocol ("tcp" or "udp") to use ## Examples @@ -120,42 +120,41 @@ # Enable ConsoleSink for all instances by class name #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink -# Polling period for ConsoleSink +# Polling period for the ConsoleSink #*.sink.console.period=10 - +# Unit of the polling period for the ConsoleSink #*.sink.console.unit=seconds -# Master instance overlap polling period +# Polling period for the ConsoleSink specific for the master instance #master.sink.console.period=15 - +# Unit of the polling period for the ConsoleSink specific for the 
master instance #master.sink.console.unit=seconds -# Enable CsvSink for all instances +# Enable CsvSink for all instances by class name #*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink -# Polling period for CsvSink +# Polling period for the CsvSink #*.sink.csv.period=1 - +# Unit of the polling period for the CsvSink #*.sink.csv.unit=minutes # Polling directory for CsvSink #*.sink.csv.directory=/tmp/ -# Worker instance overlap polling period +# Polling period for the CsvSink specific for the worker instance #worker.sink.csv.period=10 - +# Unit of the polling period for the CsvSink specific for the worker instance #worker.sink.csv.unit=minutes # Enable Slf4jSink for all instances by class name #*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink -# Polling period for Slf4JSink +# Polling period for the Slf4JSink #*.sink.slf4j.period=1 - +# Unit of the polling period for the Slf4jSink #*.sink.slf4j.unit=minutes - -# Enable jvm source for instance master, worker, driver and executor +# Enable JvmSource for instance master, worker, driver and executor #master.source.jvm.class=org.apache.spark.metrics.source.JvmSource #worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource @@ -163,4 +162,3 @@ #driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource #executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource - From 5af6e7da62b75103ce3d8626ec46258468a37923 Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sat, 16 Jan 2016 17:23:03 +0100 Subject: [PATCH 10/14] better exception message in case of polling period < 1s --- .../org/apache/spark/metrics/sink/HasPollingPeriod.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala index 1aba33458e2f0..fc670282af583 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala +++ 
b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala @@ -26,8 +26,8 @@ private object HasPollingPeriod { def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { val period = POLL_UNIT.convert(pollPeriod, pollUnit) if (period < MINIMAL_POLL_PERIOD) { - throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + - " below than minimal polling period ") + throw new IllegalArgumentException(s"Given polling period $pollPeriod $pollUnit below the " + + s"minimal polling period ($MINIMAL_POLL_PERIOD $POLL_UNIT)") } } } From c6d62d3ded1aabff50013fab2f63a84763b469c5 Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sat, 16 Jan 2016 17:26:17 +0100 Subject: [PATCH 11/14] test suite for the HasPollingPeriod trait --- .../metrics/sink/HasPollingPeriodSuite.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala new file mode 100644 index 0000000000000..aa1a668425781 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import org.scalatest.PrivateMethodTester + +import org.apache.spark.SparkFunSuite + +class HasPollingPeriodSuite extends SparkFunSuite with PrivateMethodTester { + test("HasPollingPeriod should take default values if not given") { + val hasPollingPeriodImpl = new HasPollingPeriod { + override def properties = new Properties() + } + val pollPeriod = PrivateMethod[Int]('pollPeriod) + val pollUnit = PrivateMethod[TimeUnit]('pollUnit) + assert(hasPollingPeriodImpl.invokePrivate(pollPeriod()) === 10) + assert(hasPollingPeriodImpl.invokePrivate(pollUnit()) === TimeUnit.SECONDS) + } + + test("HasPollingPeriod should pick up values if given") { + val hasPollingPeriodImpl = new HasPollingPeriod { + override def properties = { + val p = new Properties() + p.put("period", "5") + p.put("unit", "minutes") + p + } + } + val pollPeriod = PrivateMethod[Int]('pollPeriod) + val pollUnit = PrivateMethod[TimeUnit]('pollUnit) + assert(hasPollingPeriodImpl.invokePrivate(pollPeriod()) === 5) + assert(hasPollingPeriodImpl.invokePrivate(pollUnit()) === TimeUnit.MINUTES) + } + + test("HasPollingPeriod should throw an exception if the polling period is < 1s") { + val thrown = intercept[IllegalArgumentException] { + new HasPollingPeriod { + override def properties = { + val p = new Properties() + p.put("period", "500") + p.put("unit", "milliseconds") + p + } + } + } + assert(thrown.getMessage === + "Given polling period 500 MILLISECONDS below the minimal polling 
period (1 SECONDS)") + } +} From 1b9bda59faa25008dc47848e16d34c6afb3ba160 Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sun, 17 Jan 2016 10:44:40 +0100 Subject: [PATCH 12/14] clarified metrics doc --- conf/metrics.properties.template | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index c07509a0f718c..d70d9cd50b941 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -68,8 +68,8 @@ # org.apache.spark.metrics.source.JvmSource # Note: Currently, JvmSource is the only available common source. -# Additionally, you can add your own source by setting the -# "class" option to its fully qualified class name (see examples below). +# It can be added to an instance by setting the "class" option to its fully +# qualified class name (see examples below). ## List of available sinks and their properties. From bd73bcba80b5f1af6490ad208075e1808c1d6420 Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sun, 17 Jan 2016 11:12:58 +0100 Subject: [PATCH 13/14] refactored the HasPollingPeriod trait to make it more readable --- .../spark/metrics/sink/HasPollingPeriod.scala | 46 ++++++++----------- .../metrics/sink/HasPollingPeriodSuite.scala | 2 +- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala index fc670282af583..fed56c499877f 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/HasPollingPeriod.scala @@ -19,38 +19,30 @@ package org.apache.spark.metrics.sink import java.util.Properties import java.util.concurrent.TimeUnit -private object HasPollingPeriod { - val POLL_UNIT = TimeUnit.SECONDS - val MINIMAL_POLL_PERIOD = 1 - - def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { - val period = 
POLL_UNIT.convert(pollPeriod, pollUnit) - if (period < MINIMAL_POLL_PERIOD) { - throw new IllegalArgumentException(s"Given polling period $pollPeriod $pollUnit below the " + - s"minimal polling period ($MINIMAL_POLL_PERIOD $POLL_UNIT)") - } - } -} - private[spark] trait HasPollingPeriod { def properties: Properties - private val DEFAULT_PERIOD = 10 - private val DEFAULT_UNIT = "SECONDS" + protected val (pollPeriod, pollUnit) = { + val PERIOD_KEY = "period" + val DEFAULT_PERIOD = 10 - private val PERIOD_KEY = "period" - private val UNIT_KEY = "unit" + val UNIT_KEY = "unit" + val DEFAULT_UNIT = TimeUnit.SECONDS - protected val pollPeriod = Option(properties.getProperty(PERIOD_KEY)) match { - case Some(s) => s.toInt - case None => DEFAULT_PERIOD - } + val pp = Option(properties.getProperty(PERIOD_KEY)) match { + case Some(s) => s.toInt + case None => DEFAULT_PERIOD + } + val pu = Option(properties.getProperty(UNIT_KEY)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase) + case None => DEFAULT_UNIT + } - protected val pollUnit = Option(properties.getProperty(UNIT_KEY)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase) - case None => TimeUnit.valueOf(DEFAULT_UNIT) + // perform validation against the minimal 1 second period + val MINIMAL_PERIOD = 1 + val period = DEFAULT_UNIT.convert(pp, pu) + require(period >= MINIMAL_PERIOD, s"Given polling period $pp $pu below the " + + s"minimal polling period ($MINIMAL_PERIOD $DEFAULT_UNIT)") + (pp, pu) } - - import HasPollingPeriod._ - checkMinimalPollingPeriod(pollUnit, pollPeriod) } diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala index aa1a668425781..b2da5619a40c6 100644 --- a/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/sink/HasPollingPeriodSuite.scala @@ -61,7 +61,7 @@ class HasPollingPeriodSuite extends 
SparkFunSuite with PrivateMethodTester { } } } - assert(thrown.getMessage === + assert(thrown.getMessage contains "Given polling period 500 MILLISECONDS below the minimal polling period (1 SECONDS)") } } From b43353f18f9252a20319d13701ab73f0e7f64673 Mon Sep 17 00:00:00 2001 From: BenFradet Date: Sun, 17 Jan 2016 22:37:52 +0100 Subject: [PATCH 14/14] coherent formatting --- .../scala/org/apache/spark/metrics/sink/CsvSink.scala | 8 ++++---- .../org/apache/spark/metrics/sink/GraphiteSink.scala | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index d299c005f206f..5dfc610dc4c25 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -40,10 +40,10 @@ private[spark] class CsvSink( } val reporter = CsvReporter.forRegistry(registry) - .formatFor(Locale.US) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build(new File(pollDir)) + .formatFor(Locale.US) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(new File(pollDir)) override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 9e99ca8685590..47354644568dc 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -55,10 +55,10 @@ private[spark] class GraphiteSink( } val reporter = GraphiteReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .prefixedWith(prefix) - .build(graphite) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .prefixedWith(prefix) + 
.build(graphite) override def start() { reporter.start(pollPeriod, pollUnit)