62 changes: 30 additions & 32 deletions conf/metrics.properties.template
@@ -57,39 +57,39 @@
# added to Java properties using -Dspark.metrics.conf=xxx if you want to
# customize metrics system. You can also put the file in ${SPARK_HOME}/conf
# and it will be loaded automatically.
# 5. MetricsServlet is added by default as a sink in master, worker and client
# driver, you can send http request "/metrics/json" to get a snapshot of all the
# registered metrics in json format. For master, requests "/metrics/master/json" and
# "/metrics/applications/json" can be sent seperately to get metrics snapshot of
# instance master and applications. MetricsServlet may not be configured by self.
#
# 5. The MetricsServlet sink is added by default as a sink in master, worker and
# driver, you can send HTTP requests to the "/metrics/json" endpoint to get a
# snapshot of all the registered metrics in json format. For master, requests to
# the "/metrics/master/json" and "/metrics/applications/json" endpoints can be
# sent separately to get metrics snapshots of the master instance and applications.
# This MetricsServlet does not have to be configured.
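As an aside, not part of the template: a minimal Scala sketch of querying the servlet described above. It assumes a standalone master whose web UI listens on the default port 8080, and "master-host" is a placeholder; the driver serves its own snapshot under "/metrics/json" on its UI port (4040 by default).

import scala.io.Source

// Fetch a JSON snapshot of all metrics registered by the master instance.
// "master-host" is a placeholder host name; 8080 is the default standalone master UI port.
val masterMetrics = Source.fromURL("http://master-host:8080/metrics/master/json").mkString
println(masterMetrics)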

## List of available common sources and their properties.

# org.apache.spark.metrics.source.JvmSource
# Note: Currently, JvmSource is the only available common source
# to add additionaly to an instance, to enable this,
# set the "class" option to its fully qulified class name (see examples below)
# Note: Currently, JvmSource is the only available common source.
# It can be added to an instance by setting the "class" option to its fully
# qualified class name (see examples below).

## List of available sinks and their properties.

# org.apache.spark.metrics.sink.ConsoleSink
# Name: Default: Description:
# period 10 Poll period
# unit seconds Units of poll period
# unit seconds Unit of the poll period

# org.apache.spark.metrics.sink.CSVSink
# Name: Default: Description:
# period 10 Poll period
# unit seconds Units of poll period
# unit seconds Unit of the poll period
# directory /tmp Where to store CSV files

# org.apache.spark.metrics.sink.GangliaSink
# Name: Default: Description:
# host NONE Hostname or multicast group of Ganglia server
# port NONE Port of Ganglia server(s)
# host NONE Hostname or multicast group of the Ganglia server, must be set
# port NONE Port of the Ganglia server(s), must be set
# period 10 Poll period
# unit seconds Units of poll period
# unit seconds Unit of the poll period
# ttl 1 TTL of messages sent by Ganglia
# mode multicast Ganglia network mode ('unicast' or 'multicast')

@@ -106,11 +106,11 @@

# org.apache.spark.metrics.sink.GraphiteSink
# Name: Default: Description:
# host NONE Hostname of Graphite server
# port NONE Port of Graphite server
# host NONE Hostname of the Graphite server, must be set
# port NONE Port of the Graphite server, must be set
# period 10 Poll period
# unit seconds Units of poll period
# prefix EMPTY STRING Prefix to prepend to metric name
# unit seconds Unit of the poll period
# prefix EMPTY STRING Prefix to prepend to every metric's name
# protocol tcp Protocol ("tcp" or "udp") to use

## Examples
@@ -120,47 +120,45 @@
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

# Polling period for ConsoleSink
# Polling period for the ConsoleSink
#*.sink.console.period=10

# Unit of the polling period for the ConsoleSink
#*.sink.console.unit=seconds

# Master instance overlap polling period
# Polling period for the ConsoleSink specific for the master instance
#master.sink.console.period=15

# Unit of the polling period for the ConsoleSink specific for the master instance
#master.sink.console.unit=seconds

# Enable CsvSink for all instances
# Enable CsvSink for all instances by class name
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

# Polling period for CsvSink
# Polling period for the CsvSink
#*.sink.csv.period=1

# Unit of the polling period for the CsvSink
#*.sink.csv.unit=minutes

# Polling directory for CsvSink
#*.sink.csv.directory=/tmp/

# Worker instance overlap polling period
# Polling period for the CsvSink specific for the worker instance
#worker.sink.csv.period=10

# Unit of the polling period for the CsvSink specific for the worker instance
#worker.sink.csv.unit=minutes

# Enable Slf4jSink for all instances by class name
#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink

# Polling period for Slf4JSink
# Polling period for the Slf4JSink
#*.sink.slf4j.period=1

# Unit of the polling period for the Slf4jSink
#*.sink.slf4j.unit=minutes


# Enable jvm source for instance master, worker, driver and executor
# Enable JvmSource for instance master, worker, driver and executor
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

27 changes: 7 additions & 20 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -18,7 +18,6 @@
package org.apache.spark.metrics

import java.util.Properties
import java.util.concurrent.TimeUnit

import scala.collection.mutable

@@ -66,10 +65,10 @@ import org.apache.spark.util.Utils
* [options] is the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager
) extends Logging {

private[this] val metricsConfig = new MetricsConfig(conf)

@@ -97,12 +96,12 @@ private[spark] class MetricsSystem private (
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
sinks.foreach(_.start())
}

def stop() {
if (running) {
sinks.foreach(_.stop)
sinks.foreach(_.stop())
} else {
logWarning("Stopping a MetricsSystem that is not running")
}
@@ -195,10 +194,9 @@ private[spark] class MetricsSystem private (
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception => {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
}
}
}
}
@@ -209,17 +207,6 @@ private[spark] object MetricsSystem {
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r

private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
private[this] val MINIMAL_POLL_PERIOD = 1

def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
if (period < MINIMAL_POLL_PERIOD) {
throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
" below than minimal polling period ")
}
}

def createMetricsSystem(
instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
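For context on the constructor shape shared by the sink diffs below, (Properties, MetricRegistry, SecurityManager): MetricsSystem instantiates each configured sink reflectively, which is why every sink must expose exactly that constructor. The following is a simplified, illustrative sketch of that instantiation path; the helper name is made up, error handling is omitted, and the real code also registers the resulting Sink.

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.sink.Sink

// Illustrative helper: load the configured sink class and call the
// (Properties, MetricRegistry, SecurityManager) constructor that all sinks share.
def instantiateSink(
    classPath: String,
    kv: Properties,
    registry: MetricRegistry,
    securityMgr: SecurityManager): Sink = {
  Class.forName(classPath)
    .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
    .newInstance(kv, registry, securityMgr)
    .asInstanceOf[Sink]
}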
core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -23,32 +23,17 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
private[spark] class ConsoleSink(
override val properties: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager
) extends Sink with HasPollingPeriod {

val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"

val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CONSOLE_DEFAULT_PERIOD
}

val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}

MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
val reporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()

override def start() {
reporter.start(pollPeriod, pollUnit)
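The HasPollingPeriod trait that ConsoleSink and the other sinks below now mix in is not shown in this diff. Judging from the parsing removed from each sink and from MetricsSystem.checkMinimalPollingPeriod, it presumably looks roughly like the sketch below; only properties, pollPeriod and pollUnit are names confirmed by the diff, everything else is a guess.

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

// Sketch only: the real trait is introduced elsewhere in this PR.
private[spark] trait HasPollingPeriod {
  // Each sink overrides this with the Properties passed to its constructor.
  val properties: Properties

  private val DEFAULT_POLL_PERIOD = 10
  private val DEFAULT_POLL_UNIT = TimeUnit.SECONDS

  // lazy so that `properties` is initialized before first use
  lazy val pollUnit: TimeUnit = Option(properties.getProperty("unit"))
    .map(s => TimeUnit.valueOf(s.toUpperCase))
    .getOrElse(DEFAULT_POLL_UNIT)

  lazy val pollPeriod: Int = {
    val period = Option(properties.getProperty("period"))
      .map(_.toInt)
      .getOrElse(DEFAULT_POLL_PERIOD)
    // The minimal-period check removed from MetricsSystem presumably lives here now.
    require(TimeUnit.SECONDS.convert(period, pollUnit) >= 1,
      s"Polling period $period $pollUnit is below the minimal polling period (1 second)")
    period
  }
}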
42 changes: 14 additions & 28 deletions core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -24,40 +24,26 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{CsvReporter, MetricRegistry}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CSV_KEY_PERIOD = "period"
val CSV_KEY_UNIT = "unit"
val CSV_KEY_DIR = "directory"
private[spark] class CsvSink(
override val properties: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager
) extends Sink with HasPollingPeriod {

val CSV_DEFAULT_PERIOD = 10
val CSV_DEFAULT_UNIT = "SECONDS"
val CSV_DEFAULT_DIR = "/tmp/"
val DIR_KEY = "directory"
val DEFAULT_DIR = "/tmp/"

val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CSV_DEFAULT_PERIOD
}

val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}

MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
val pollDir = Option(properties.getProperty(DIR_KEY)) match {
case Some(s) => s
case None => CSV_DEFAULT_DIR
case None => DEFAULT_DIR
}

val reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))
val reporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))

override def start() {
reporter.start(pollPeriod, pollUnit)
core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -25,59 +25,40 @@ import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
val GRAPHITE_DEFAULT_UNIT = "SECONDS"
val GRAPHITE_DEFAULT_PREFIX = ""
private[spark] class GraphiteSink(
override val properties: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager
) extends Sink with HasPollingPeriod {
val PREFIX_KEY = "prefix"
val DEFAULT_PREFIX = ""

val GRAPHITE_KEY_HOST = "host"
val GRAPHITE_KEY_PORT = "port"
val GRAPHITE_KEY_PERIOD = "period"
val GRAPHITE_KEY_UNIT = "unit"
val GRAPHITE_KEY_PREFIX = "prefix"
val GRAPHITE_KEY_PROTOCOL = "protocol"
val HOST_KEY = "host"
val PORT_KEY = "port"
val PROTOCOL_KEY = "protocol"

def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
def propertyToOption(prop: String): Option[String] = Option(properties.getProperty(prop))

if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
throw new Exception("Graphite sink requires 'host' property.")
}

if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) {
throw new Exception("Graphite sink requires 'port' property.")
}

val host = propertyToOption(GRAPHITE_KEY_HOST).get
val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt

val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
case Some(s) => s.toInt
case None => GRAPHITE_DEFAULT_PERIOD
}

val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
}
require(propertyToOption(HOST_KEY).isDefined, "Graphite sink requires 'host' property.")
require(propertyToOption(PORT_KEY).isDefined, "Graphite sink requires 'port' property.")

val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
val host = propertyToOption(HOST_KEY).get
val port = propertyToOption(PORT_KEY).get.toInt

MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val prefix = propertyToOption(PREFIX_KEY).getOrElse(DEFAULT_PREFIX)

val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match {
val graphite = propertyToOption(PROTOCOL_KEY).map(_.toLowerCase) match {
case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port))
case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port))
case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p")
}

val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.prefixedWith(prefix)
.build(graphite)
val reporter = GraphiteReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.prefixedWith(prefix)
.build(graphite)

override def start() {
reporter.start(pollPeriod, pollUnit)
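Purely to illustrate the new require-based validation in GraphiteSink: in practice the sink is private[spark] and is created reflectively by MetricsSystem from the metrics configuration, so the direct construction below is a sketch rather than supported usage, and the host name and port are placeholders.

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.metrics.sink.GraphiteSink

val props = new Properties()
props.setProperty("host", "graphite.example.com") // required, checked by require(...)
props.setProperty("port", "2003")                 // required, checked by require(...)
// "period" and "unit" are optional and fall back to the HasPollingPeriod defaults.

val sink = new GraphiteSink(props, new MetricRegistry(), new SecurityManager(new SparkConf()))
sink.start()
// Omitting "host" or "port" now fails fast with an IllegalArgumentException
// from require(...) instead of a plain Exception.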