diff --git a/LICENSE b/LICENSE index 1ef1f86fd704..6b169b1447f1 100644 --- a/LICENSE +++ b/LICENSE @@ -216,6 +216,7 @@ core/src/main/resources/org/apache/spark/ui/static/bootstrap* core/src/main/resources/org/apache/spark/ui/static/jsonFormatter* core/src/main/resources/org/apache/spark/ui/static/vis* docs/js/vendor/bootstrap.js +external/spark-ganglia-lgpl/src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java Python Software Foundation License diff --git a/LICENSE-binary b/LICENSE-binary index 7865d9df6314..6858193515a8 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -243,10 +243,10 @@ com.vlkan:flatbuffers com.ning:compress-lzf io.airlift:aircompressor io.dropwizard.metrics:metrics-core -io.dropwizard.metrics:metrics-ganglia io.dropwizard.metrics:metrics-graphite io.dropwizard.metrics:metrics-json io.dropwizard.metrics:metrics-jvm +io.dropwizard.metrics:metrics-jmx org.iq80.snappy:snappy com.clearspring.analytics:stream com.jamesmurty.utils:java-xmlbuilder diff --git a/NOTICE b/NOTICE index fefe08b38afc..d5ea8dedb311 100644 --- a/NOTICE +++ b/NOTICE @@ -26,3 +26,16 @@ The following provides more details on the included cryptographic software: This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to support authentication, and encryption and decryption of data sent across the network between services. + + +Metrics +Copyright 2010-2013 Coda Hale and Yammer, Inc. + +This product includes software developed by Coda Hale and Yammer, Inc. + +This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, +LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ \ No newline at end of file diff --git a/NOTICE-binary b/NOTICE-binary index d99c2d1c64c2..4ce8bf2f86b2 100644 --- a/NOTICE-binary +++ b/NOTICE-binary @@ -1515,3 +1515,16 @@ Copyright 2014-2017 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). + + +Metrics +Copyright 2010-2013 Coda Hale and Yammer, Inc. + +This product includes software developed by Coda Hale and Yammer, Inc. + +This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, +LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ \ No newline at end of file diff --git a/core/pom.xml b/core/pom.xml index 38eb8adac500..3eedc69c9593 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -292,6 +292,16 @@ io.dropwizard.metrics metrics-graphite + + + com.rabbitmq + amqp-client + + + + + io.dropwizard.metrics + metrics-jmx com.fasterxml.jackson.core diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 9e94a868ccc3..a7b7b5573cfe 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -19,7 +19,8 @@ package org.apache.spark.metrics.sink import java.util.Properties -import com.codahale.metrics.{JmxReporter, MetricRegistry} +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.jmx.JmxReporter import org.apache.spark.SecurityManager diff --git a/dev/.rat-excludes b/dev/.rat-excludes index e12dc994b084..73f461255de4 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -118,3 +118,4 @@ announce.tmpl vote.tmpl SessionManager.java SessionHandler.java +GangliaReporter.java diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index 945686de4996..804a178a5fe2 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -30,6 +30,8 @@ + spark-core_${scala.binary.version} ${project.version} - - io.dropwizard.metrics - metrics-ganglia + info.ganglia.gmetric4j + gmetric4j + 1.0.10 diff --git a/external/spark-ganglia-lgpl/src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java b/external/spark-ganglia-lgpl/src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java new file mode 100644 index 000000000000..019ee08e0918 --- /dev/null +++ b/external/spark-ganglia-lgpl/src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java @@ -0,0 +1,426 @@ +// Copied from +// https://raw.githubusercontent.com/dropwizard/metrics/v3.2.6/metrics-ganglia/ +// src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java + +package com.codahale.metrics.ganglia; + +import com.codahale.metrics.*; +import com.codahale.metrics.MetricAttribute; +import info.ganglia.gmetric4j.gmetric.GMetric; +import info.ganglia.gmetric4j.gmetric.GMetricSlope; +import info.ganglia.gmetric4j.gmetric.GMetricType; +import info.ganglia.gmetric4j.gmetric.GangliaException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import static com.codahale.metrics.MetricRegistry.name; +import static com.codahale.metrics.MetricAttribute.*; + +/** + * A reporter which announces metric values to a Ganglia cluster. + * + * @see Ganglia Monitoring System + */ +public class GangliaReporter extends ScheduledReporter { + + private static final Pattern SLASHES = Pattern.compile("\\\\"); + + /** + * Returns a new {@link Builder} for {@link GangliaReporter}. + * + * @param registry the registry to report + * @return a {@link Builder} instance for a {@link GangliaReporter} + */ + public static Builder forRegistry(MetricRegistry registry) { + return new Builder(registry); + } + + /** + * A builder for {@link GangliaReporter} instances. Defaults to using a {@code tmax} of {@code 60}, + * a {@code dmax} of {@code 0}, converting rates to events/second, converting durations to + * milliseconds, and not filtering metrics. + */ + public static class Builder { + private final MetricRegistry registry; + private String prefix; + private int tMax; + private int dMax; + private TimeUnit rateUnit; + private TimeUnit durationUnit; + private MetricFilter filter; + private ScheduledExecutorService executor; + private boolean shutdownExecutorOnStop; + private Set disabledMetricAttributes = Collections.emptySet(); + + private Builder(MetricRegistry registry) { + this.registry = registry; + this.tMax = 60; + this.dMax = 0; + this.rateUnit = TimeUnit.SECONDS; + this.durationUnit = TimeUnit.MILLISECONDS; + this.filter = MetricFilter.ALL; + this.executor = null; + this.shutdownExecutorOnStop = true; + } + + /** + * Specifies whether or not, the executor (used for reporting) will be stopped with same time with reporter. + * Default value is true. + * Setting this parameter to false, has the sense in combining with providing external managed executor via {@link #scheduleOn(ScheduledExecutorService)}. + * + * @param shutdownExecutorOnStop if true, then executor will be stopped in same time with this reporter + * @return {@code this} + */ + public Builder shutdownExecutorOnStop(boolean shutdownExecutorOnStop) { + this.shutdownExecutorOnStop = shutdownExecutorOnStop; + return this; + } + + /** + * Specifies the executor to use while scheduling reporting of metrics. + * Default value is null. + * Null value leads to executor will be auto created on start. + * + * @param executor the executor to use while scheduling reporting of metrics. + * @return {@code this} + */ + public Builder scheduleOn(ScheduledExecutorService executor) { + this.executor = executor; + return this; + } + + /** + * Use the given {@code tmax} value when announcing metrics. + * + * @param tMax the desired gmond {@code tmax} value + * @return {@code this} + */ + public Builder withTMax(int tMax) { + this.tMax = tMax; + return this; + } + + /** + * Prefix all metric names with the given string. + * + * @param prefix the prefix for all metric names + * @return {@code this} + */ + public Builder prefixedWith(String prefix) { + this.prefix = prefix; + return this; + } + + /** + * Use the given {@code dmax} value when announcing metrics. + * + * @param dMax the desired gmond {@code dmax} value + * @return {@code this} + */ + public Builder withDMax(int dMax) { + this.dMax = dMax; + return this; + } + + /** + * Convert rates to the given time unit. + * + * @param rateUnit a unit of time + * @return {@code this} + */ + public Builder convertRatesTo(TimeUnit rateUnit) { + this.rateUnit = rateUnit; + return this; + } + + /** + * Convert durations to the given time unit. + * + * @param durationUnit a unit of time + * @return {@code this} + */ + public Builder convertDurationsTo(TimeUnit durationUnit) { + this.durationUnit = durationUnit; + return this; + } + + /** + * Only report metrics which match the given filter. + * + * @param filter a {@link MetricFilter} + * @return {@code this} + */ + public Builder filter(MetricFilter filter) { + this.filter = filter; + return this; + } + + /** + * Don't report the passed metric attributes for all metrics (e.g. "p999", "stddev" or "m15"). + * See {@link MetricAttribute}. + * + * @param disabledMetricAttributes a {@link MetricFilter} + * @return {@code this} + */ + public Builder disabledMetricAttributes(Set disabledMetricAttributes) { + this.disabledMetricAttributes = disabledMetricAttributes; + return this; + } + + /** + * Builds a {@link GangliaReporter} with the given properties, announcing metrics to the + * given {@link GMetric} client. + * + * @param gmetric the client to use for announcing metrics + * @return a {@link GangliaReporter} + */ + public GangliaReporter build(GMetric gmetric) { + return new GangliaReporter(registry, gmetric, null, prefix, tMax, dMax, rateUnit, durationUnit, filter, + executor, shutdownExecutorOnStop, disabledMetricAttributes); + } + + /** + * Builds a {@link GangliaReporter} with the given properties, announcing metrics to the + * given {@link GMetric} client. + * + * @param gmetrics the clients to use for announcing metrics + * @return a {@link GangliaReporter} + */ + public GangliaReporter build(GMetric... gmetrics) { + return new GangliaReporter(registry, null, gmetrics, prefix, tMax, dMax, rateUnit, durationUnit, + filter, executor, shutdownExecutorOnStop , disabledMetricAttributes); + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(GangliaReporter.class); + + private final GMetric gmetric; + private final GMetric[] gmetrics; + private final String prefix; + private final int tMax; + private final int dMax; + + private GangliaReporter(MetricRegistry registry, + GMetric gmetric, + GMetric[] gmetrics, + String prefix, + int tMax, + int dMax, + TimeUnit rateUnit, + TimeUnit durationUnit, + MetricFilter filter, + ScheduledExecutorService executor, + boolean shutdownExecutorOnStop, + Set disabledMetricAttributes) { + super(registry, "ganglia-reporter", filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, + disabledMetricAttributes); + this.gmetric = gmetric; + this.gmetrics = gmetrics; + this.prefix = prefix; + this.tMax = tMax; + this.dMax = dMax; + } + + @Override + public void report(SortedMap gauges, + SortedMap counters, + SortedMap histograms, + SortedMap meters, + SortedMap timers) { + for (Map.Entry entry : gauges.entrySet()) { + reportGauge(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : counters.entrySet()) { + reportCounter(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : histograms.entrySet()) { + reportHistogram(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : meters.entrySet()) { + reportMeter(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : timers.entrySet()) { + reportTimer(entry.getKey(), entry.getValue()); + } + } + + private void reportTimer(String name, Timer timer) { + final String sanitizedName = escapeSlashes(name); + final String group = group(name); + try { + final Snapshot snapshot = timer.getSnapshot(); + + announceIfEnabled(MAX, sanitizedName, group, convertDuration(snapshot.getMax()), getDurationUnit()); + announceIfEnabled(MEAN, sanitizedName, group, convertDuration(snapshot.getMean()), getDurationUnit()); + announceIfEnabled(MIN, sanitizedName, group, convertDuration(snapshot.getMin()), getDurationUnit()); + announceIfEnabled(STDDEV, sanitizedName, group, convertDuration(snapshot.getStdDev()), getDurationUnit()); + + announceIfEnabled(P50, sanitizedName, group, convertDuration(snapshot.getMedian()), getDurationUnit()); + announceIfEnabled(P75, sanitizedName, + group, + convertDuration(snapshot.get75thPercentile()), + getDurationUnit()); + announceIfEnabled(P95, sanitizedName, + group, + convertDuration(snapshot.get95thPercentile()), + getDurationUnit()); + announceIfEnabled(P98, sanitizedName, + group, + convertDuration(snapshot.get98thPercentile()), + getDurationUnit()); + announceIfEnabled(P99, sanitizedName, + group, + convertDuration(snapshot.get99thPercentile()), + getDurationUnit()); + announceIfEnabled(P999, sanitizedName, + group, + convertDuration(snapshot.get999thPercentile()), + getDurationUnit()); + + reportMetered(sanitizedName, timer, group, "calls"); + } catch (GangliaException e) { + LOGGER.warn("Unable to report timer {}", sanitizedName, e); + } + } + + private void reportMeter(String name, Meter meter) { + final String sanitizedName = escapeSlashes(name); + final String group = group(name); + try { + reportMetered(sanitizedName, meter, group, "events"); + } catch (GangliaException e) { + LOGGER.warn("Unable to report meter {}", name, e); + } + } + + private void reportMetered(String name, Metered meter, String group, String eventName) throws GangliaException { + final String unit = eventName + '/' + getRateUnit(); + announceIfEnabled(COUNT, name, group, meter.getCount(), eventName); + announceIfEnabled(M1_RATE, name, group, convertRate(meter.getOneMinuteRate()), unit); + announceIfEnabled(M5_RATE, name, group, convertRate(meter.getFiveMinuteRate()), unit); + announceIfEnabled(M15_RATE, name, group, convertRate(meter.getFifteenMinuteRate()), unit); + announceIfEnabled(MEAN_RATE, name, group, convertRate(meter.getMeanRate()), unit); + } + + private void reportHistogram(String name, Histogram histogram) { + final String sanitizedName = escapeSlashes(name); + final String group = group(name); + try { + final Snapshot snapshot = histogram.getSnapshot(); + + announceIfEnabled(COUNT, sanitizedName, group, histogram.getCount(), ""); + announceIfEnabled(MAX, sanitizedName, group, snapshot.getMax(), ""); + announceIfEnabled(MEAN, sanitizedName, group, snapshot.getMean(), ""); + announceIfEnabled(MIN, sanitizedName, group, snapshot.getMin(), ""); + announceIfEnabled(STDDEV, sanitizedName, group, snapshot.getStdDev(), ""); + announceIfEnabled(P50, sanitizedName, group, snapshot.getMedian(), ""); + announceIfEnabled(P75, sanitizedName, group, snapshot.get75thPercentile(), ""); + announceIfEnabled(P95, sanitizedName, group, snapshot.get95thPercentile(), ""); + announceIfEnabled(P98, sanitizedName, group, snapshot.get98thPercentile(), ""); + announceIfEnabled(P99, sanitizedName, group, snapshot.get99thPercentile(), ""); + announceIfEnabled(P999, sanitizedName, group, snapshot.get999thPercentile(), ""); + } catch (GangliaException e) { + LOGGER.warn("Unable to report histogram {}", sanitizedName, e); + } + } + + private void reportCounter(String name, Counter counter) { + final String sanitizedName = escapeSlashes(name); + final String group = group(name); + try { + announce(prefix(sanitizedName, COUNT.getCode()), group, Long.toString(counter.getCount()), GMetricType.DOUBLE, ""); + } catch (GangliaException e) { + LOGGER.warn("Unable to report counter {}", name, e); + } + } + + private void reportGauge(String name, Gauge gauge) { + final String sanitizedName = escapeSlashes(name); + final String group = group(name); + final Object obj = gauge.getValue(); + final String value = String.valueOf(obj); + final GMetricType type = detectType(obj); + try { + announce(name(prefix, sanitizedName), group, value, type, ""); + } catch (GangliaException e) { + LOGGER.warn("Unable to report gauge {}", name, e); + } + } + + private static final double MIN_VAL = 1E-300; + + private void announceIfEnabled(MetricAttribute metricAttribute, String metricName, String group, double value, String units) + throws GangliaException { + if (getDisabledMetricAttributes().contains(metricAttribute)) { + return; + } + final String string = Math.abs(value) < MIN_VAL ? "0" : Double.toString(value); + announce(prefix(metricName, metricAttribute.getCode()), group, string, GMetricType.DOUBLE, units); + } + + private void announceIfEnabled(MetricAttribute metricAttribute, String metricName, String group, long value, String units) + throws GangliaException { + if (getDisabledMetricAttributes().contains(metricAttribute)) { + return; + } + announce(prefix(metricName, metricAttribute.getCode()), group, Long.toString(value), GMetricType.DOUBLE, units); + } + + private void announce(String name, String group, String value, GMetricType type, String units) + throws GangliaException { + if (gmetric != null) { + gmetric.announce(name, value, type, units, GMetricSlope.BOTH, tMax, dMax, group); + } else { + for (GMetric gmetric : gmetrics) { + gmetric.announce(name, value, type, units, GMetricSlope.BOTH, tMax, dMax, group); + } + } + } + + private GMetricType detectType(Object o) { + if (o instanceof Float) { + return GMetricType.FLOAT; + } else if (o instanceof Double) { + return GMetricType.DOUBLE; + } else if (o instanceof Byte) { + return GMetricType.INT8; + } else if (o instanceof Short) { + return GMetricType.INT16; + } else if (o instanceof Integer) { + return GMetricType.INT32; + } else if (o instanceof Long) { + return GMetricType.DOUBLE; + } + return GMetricType.STRING; + } + + private String group(String name) { + final int i = name.lastIndexOf('.'); + if (i < 0) { + return ""; + } + return name.substring(0, i); + } + + private String prefix(String name, String n) { + return name(prefix, name, n); + } + + // ganglia metric names can't contain slashes. + private String escapeSlashes(String name) { + return SLASHES.matcher(name).replaceAll("_"); + } +} diff --git a/pom.xml b/pom.xml index b5c34790f1d3..8c4ec71f3685 100644 --- a/pom.xml +++ b/pom.xml @@ -148,7 +148,7 @@ 0.9.3 2.4.0 2.0.8 - 3.2.6 + 4.1.1 1.8.2 hadoop2 1.8.10 @@ -681,12 +681,12 @@ io.dropwizard.metrics - metrics-ganglia + metrics-graphite ${codahale.metrics.version} io.dropwizard.metrics - metrics-graphite + metrics-jmx ${codahale.metrics.version}