From c816a7fdefb82a611253188015e39c2a0ed44339 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Sun, 19 Dec 2021 02:14:43 -0800 Subject: [PATCH] Add tagged metric support in MetricsCollector --- .../org/apache/heron/api/metric/IMetric.java | 10 ++++ .../utils/metrics/MetricsCollector.java | 47 +++++++++++++++---- heron/proto/metrics.proto | 1 + 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/heron/api/src/java/org/apache/heron/api/metric/IMetric.java b/heron/api/src/java/org/apache/heron/api/metric/IMetric.java index ae26a38f49d..716942ffbc3 100644 --- a/heron/api/src/java/org/apache/heron/api/metric/IMetric.java +++ b/heron/api/src/java/org/apache/heron/api/metric/IMetric.java @@ -19,10 +19,20 @@ package org.apache.heron.api.metric; +import java.util.Map; + /** * Interface for a metric that can be tracked * @param the type of the metric value being tracked */ public interface IMetric { T getValueAndReset(); + + /** + * Get the tag:value pairs of the metric and reset it to the identity value. + * @return the tag:value pairs of the metric. Return null if not supported. + */ + default Map getTaggedMetricsAndReset() { + return null; + } } diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java index f0b1166348e..1c259ef41cf 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java @@ -110,12 +110,12 @@ public void forceGatherAllMetrics() { for (List metricNames : timeBucketToMetricNames.values()) { for (String metricName : metricNames) { - gatherOneMetric(metricName, builder); + gatherOneMetric(builder, metricName); } } metricCollectionCount.incr(); - addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount); + addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, null, metricCollectionCount); Metrics.MetricPublisherPublishMessage msg = builder.build(); @@ -124,6 +124,7 @@ public void forceGatherAllMetrics() { private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Builder builder, String metricName, + String tag, Object metricValue) { // Metric name is discarded if value is of type MetricsDatum or ExceptionData. if (metricValue instanceof Metrics.MetricDatum.Builder) { @@ -134,6 +135,9 @@ private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Buil assert metricName != null; Metrics.MetricDatum.Builder d = Metrics.MetricDatum.newBuilder(); d.setName(metricName).setValue(metricValue.toString()); + if (tag != null) { + d.setTag(tag); + } builder.addMetrics(d); } } @@ -149,11 +153,11 @@ private void gatherMetrics(final int timeBucketSizeInSecs) { Metrics.MetricPublisherPublishMessage.Builder builder = Metrics.MetricPublisherPublishMessage.newBuilder(); for (String metricName : timeBucketToMetricNames.get(timeBucketSizeInSecs)) { - gatherOneMetric(metricName, builder); + gatherOneMetric(builder, metricName); } metricCollectionCount.incr(); - addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, + addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, null, metricCollectionCount.getValueAndReset()); Metrics.MetricPublisherPublishMessage msg = builder.build(); @@ -171,13 +175,36 @@ public void run() { } } - // Gather the value of given metricName, convert it into protobuf, + // Gather the value of given metricName, convert it into protobuf, // and add it to MetricPublisherPublishMessage builder given. @SuppressWarnings("unchecked") private void gatherOneMetric( + Metrics.MetricPublisherPublishMessage.Builder builder, + String metricName) { + IMetric metric = metrics.get(metricName); + + Map taggedMetrics = metric.getTaggedMetricsAndReset(); + if (taggedMetrics != null) { + // If taggedMetrics is not null, it means the metric is tagged, and + // the tags should be reported to MetricPublisher. No need to report + // the non-tagged value of the metric in this case. + for (Map.Entry entry : taggedMetrics.entrySet()) { + gatherOneMetricValue(builder, metricName, entry.getKey().toString(), entry.getValue().getValueAndReset()); + } + } else { + // Regular metric without tag support. + Object metricValue = metric.getValueAndReset(); + gatherOneMetricValue(builder, metricName, null, metricValue); + } + } + + @SuppressWarnings("unchecked") + private void gatherOneMetricValue( + Metrics.MetricPublisherPublishMessage.Builder builder, String metricName, - Metrics.MetricPublisherPublishMessage.Builder builder) { - Object metricValue = metrics.get(metricName).getValueAndReset(); + String tag, + Object metricValue) { + // Decide how to handle the metric based on type if (metricValue == null) { return; @@ -186,16 +213,16 @@ private void gatherOneMetric( for (Map.Entry entry : ((Map) metricValue).entrySet()) { if (entry.getKey() != null && entry.getValue() != null) { addDataToMetricPublisher( - builder, metricName + "/" + entry.getKey().toString(), entry.getValue()); + builder, metricName + "/" + entry.getKey().toString(), tag, entry.getValue()); } } } else if (metricValue instanceof Collection) { int index = 0; for (Object value : (Collection) metricValue) { - addDataToMetricPublisher(builder, metricName + "/" + (index++), value); + addDataToMetricPublisher(builder, metricName + "/" + (index++), tag, value); } } else { - addDataToMetricPublisher(builder, metricName, metricValue); + addDataToMetricPublisher(builder, metricName, tag, metricValue); } } } diff --git a/heron/proto/metrics.proto b/heron/proto/metrics.proto index d439c24688f..1d7a8047947 100644 --- a/heron/proto/metrics.proto +++ b/heron/proto/metrics.proto @@ -34,6 +34,7 @@ import "tmaster.proto"; message MetricDatum { required string name = 1; required string value = 2; + optional string tag = 3; } message ExceptionData {