Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Add tagged metric support in MetricsCollector
Browse files Browse the repository at this point in the history
  • Loading branch information
nwangtw committed Dec 19, 2021
1 parent d03c112 commit c816a7f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 10 deletions.
10 changes: 10 additions & 0 deletions heron/api/src/java/org/apache/heron/api/metric/IMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@

package org.apache.heron.api.metric;

import java.util.Map;

/**
* Interface for a metric that can be tracked
* @param <T> the type of the metric value being tracked
*/
public interface IMetric<T> {
T getValueAndReset();

/**
* Get the tag:value pairs of the metric and reset it to the identity value.
* @return the tag:value pairs of the metric. Return null if not supported.
*/
default Map<String, T> getTaggedMetricsAndReset() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public void forceGatherAllMetrics() {

for (List<String> metricNames : timeBucketToMetricNames.values()) {
for (String metricName : metricNames) {
gatherOneMetric(metricName, builder);
gatherOneMetric(builder, metricName);
}
}

metricCollectionCount.incr();
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount);
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, null, metricCollectionCount);

Metrics.MetricPublisherPublishMessage msg = builder.build();

Expand All @@ -124,6 +124,7 @@ public void forceGatherAllMetrics() {

private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Builder builder,
String metricName,
String tag,
Object metricValue) {
// Metric name is discarded if value is of type MetricsDatum or ExceptionData.
if (metricValue instanceof Metrics.MetricDatum.Builder) {
Expand All @@ -134,6 +135,9 @@ private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Buil
assert metricName != null;
Metrics.MetricDatum.Builder d = Metrics.MetricDatum.newBuilder();
d.setName(metricName).setValue(metricValue.toString());
if (tag != null) {
d.setTag(tag);
}
builder.addMetrics(d);
}
}
Expand All @@ -149,11 +153,11 @@ private void gatherMetrics(final int timeBucketSizeInSecs) {
Metrics.MetricPublisherPublishMessage.Builder builder =
Metrics.MetricPublisherPublishMessage.newBuilder();
for (String metricName : timeBucketToMetricNames.get(timeBucketSizeInSecs)) {
gatherOneMetric(metricName, builder);
gatherOneMetric(builder, metricName);
}

metricCollectionCount.incr();
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME,
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, null,
metricCollectionCount.getValueAndReset());

Metrics.MetricPublisherPublishMessage msg = builder.build();
Expand All @@ -171,13 +175,36 @@ public void run() {
}
}

// Gather the value of given metricName, convert it into protobuf,
// Gather the value of given metricName, convert it into protobuf,
// and add it to MetricPublisherPublishMessage builder given.
@SuppressWarnings("unchecked")
private void gatherOneMetric(
Metrics.MetricPublisherPublishMessage.Builder builder,
String metricName) {
IMetric metric = metrics.get(metricName);

Map<String, IMetric> taggedMetrics = metric.getTaggedMetricsAndReset();
if (taggedMetrics != null) {
// If taggedMetrics is not null, it means the metric is tagged, and
// the tags should be reported to MetricPublisher. No need to report
// the non-tagged value of the metric in this case.
for (Map.Entry<String, IMetric> entry : taggedMetrics.entrySet()) {
gatherOneMetricValue(builder, metricName, entry.getKey().toString(), entry.getValue().getValueAndReset());
}
} else {
// Regular metric without tag support.
Object metricValue = metric.getValueAndReset();
gatherOneMetricValue(builder, metricName, null, metricValue);
}
}

@SuppressWarnings("unchecked")
private void gatherOneMetricValue(
Metrics.MetricPublisherPublishMessage.Builder builder,
String metricName,
Metrics.MetricPublisherPublishMessage.Builder builder) {
Object metricValue = metrics.get(metricName).getValueAndReset();
String tag,
Object metricValue) {

// Decide how to handle the metric based on type
if (metricValue == null) {
return;
Expand All @@ -186,16 +213,16 @@ private void gatherOneMetric(
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) metricValue).entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
addDataToMetricPublisher(
builder, metricName + "/" + entry.getKey().toString(), entry.getValue());
builder, metricName + "/" + entry.getKey().toString(), tag, entry.getValue());
}
}
} else if (metricValue instanceof Collection) {
int index = 0;
for (Object value : (Collection) metricValue) {
addDataToMetricPublisher(builder, metricName + "/" + (index++), value);
addDataToMetricPublisher(builder, metricName + "/" + (index++), tag, value);
}
} else {
addDataToMetricPublisher(builder, metricName, metricValue);
addDataToMetricPublisher(builder, metricName, tag, metricValue);
}
}
}
1 change: 1 addition & 0 deletions heron/proto/metrics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import "tmaster.proto";
message MetricDatum {
required string name = 1;
required string value = 2;
optional string tag = 3;
}

message ExceptionData {
Expand Down

0 comments on commit c816a7f

Please sign in to comment.