Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Add tagged metric support in MetricsCollector
Browse files Browse the repository at this point in the history
  • Loading branch information
nwangtw committed Feb 22, 2022
1 parent abb2767 commit 1b2bf3a
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.heron.api.metric;

import java.util.List;
import java.util.Map;

// A thread safe count metric
public class ConcurrentCountMetricWithTag implements IMetric<CountMetric> {
private CountMetricWithTag counter;

public ConcurrentCountMetricWithTag() {
counter = new CountMetricWithTag();
}

public synchronized void incr(String... tags) {
counter.incr(tags);
}

public synchronized void incrBy(long incrementBy, String... tags) {
counter.incrBy(incrementBy, tags);
}

@Override
public CountMetric getValueAndReset() {
return null; // Not needed. `getTaggedMetricsAndReset` should be used instead.
}

@Override
public Map<List<String>, CountMetric> getTaggedMetricsAndReset() {
return counter.getTaggedMetricsAndReset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.heron.api.metric;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountMetricWithTag implements IMetric<CountMetric> {
/**
* List<String> is the list of tags. The counters are aggregated by tag lists.
* For example, ["device:ios", "endpoint:v2"] is a entirely different entry from ["device:ios"]
*/
private Map<List<String>, CountMetric> taggedCounts = new HashMap<>();

public CountMetricWithTag() {
}

public void incr(String... tags) {
incrBy(1, tags);
}

/**
* Increment the metrics with optional tags.
* Tags are comma separated strings: "tagName:tagValue".
* For example: "device:ios", and "endpoint:v2".
* Normally tag names and values should have limited values,
* otherwise, like "id:192.168.0.123", the memory utililization
* and the cost of tracking metrics could increase dramatically.
* @param tags optional comma separated tags, like "device:ios", "endpoint:v2".
* Normally tag names and values should have limited values,
*/
public void incrBy(long incrementBy, String... tags) {
if (tags.length > 0) {
Arrays.sort(tags); // Sort the tags to make the key deterministic.
List<String> tagList = Arrays.asList(tags);
incrTaggedCountBy(tagList, incrementBy);
} else {
incrTaggedCountBy(Collections.emptyList(), incrementBy);
}
}

private void incrTaggedCountBy(List<String> tagList, long incrementBy) {
if (!taggedCounts.containsKey(tagList)) {
taggedCounts.put(tagList, new CountMetric());
}
taggedCounts.get(tagList).incrBy(incrementBy);
}

@Override
public CountMetric getValueAndReset() {
return null; // Not needed. `getTaggedMetricsAndReset` should be used instead.
}

@Override
public Map<List<String>, CountMetric> getTaggedMetricsAndReset() {
Map<List<String>, CountMetric> ret = taggedCounts;
taggedCounts = new HashMap<>();
return ret;
}
}
11 changes: 11 additions & 0 deletions heron/api/src/java/org/apache/heron/api/metric/IMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,21 @@

package org.apache.heron.api.metric;

import java.util.List;
import java.util.Map;

/**
* Interface for a metric that can be tracked
* @param <T> the type of the metric value being tracked
*/
public interface IMetric<T> {
T getValueAndReset();

/**
* Get the <tag list, value> pairs of the metric and reset it to the identity value.
* @return a map of <tag list, value> pairs. Return null if this function is not supported.
*/
default Map<List<String>, T> getTaggedMetricsAndReset() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public void forceGatherAllMetrics() {

for (List<String> metricNames : timeBucketToMetricNames.values()) {
for (String metricName : metricNames) {
gatherOneMetric(metricName, builder);
gatherOneMetric(builder, metricName);
}
}

metricCollectionCount.incr();
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount);
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount, null);

Metrics.MetricPublisherPublishMessage msg = builder.build();

Expand All @@ -124,7 +124,8 @@ public void forceGatherAllMetrics() {

private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Builder builder,
String metricName,
Object metricValue) {
Object metricValue,
List<String> tagList) {
// Metric name is discarded if value is of type MetricsDatum or ExceptionData.
if (metricValue instanceof Metrics.MetricDatum.Builder) {
builder.addMetrics((Metrics.MetricDatum.Builder) metricValue);
Expand All @@ -134,6 +135,11 @@ private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Buil
assert metricName != null;
Metrics.MetricDatum.Builder d = Metrics.MetricDatum.newBuilder();
d.setName(metricName).setValue(metricValue.toString());
if (tagList != null) {
for (String tag : tagList) {
d.addTag(tag);
}
}
builder.addMetrics(d);
}
}
Expand All @@ -149,12 +155,11 @@ private void gatherMetrics(final int timeBucketSizeInSecs) {
Metrics.MetricPublisherPublishMessage.Builder builder =
Metrics.MetricPublisherPublishMessage.newBuilder();
for (String metricName : timeBucketToMetricNames.get(timeBucketSizeInSecs)) {
gatherOneMetric(metricName, builder);
gatherOneMetric(builder, metricName);
}

metricCollectionCount.incr();
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME,
metricCollectionCount.getValueAndReset());
addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount.getValueAndReset(), null);

Metrics.MetricPublisherPublishMessage msg = builder.build();

Expand All @@ -171,13 +176,36 @@ public void run() {
}
}

// Gather the value of given metricName, convert it into protobuf,
// Gather the value of given metricName, convert it into protobuf,
// and add it to MetricPublisherPublishMessage builder given.
@SuppressWarnings("unchecked")
private void gatherOneMetric(
Metrics.MetricPublisherPublishMessage.Builder builder,
String metricName) {
IMetric metric = metrics.get(metricName);

Map<List<String>, IMetric> taggedMetrics = metric.getTaggedMetricsAndReset();
if (taggedMetrics != null) {
// If taggedMetrics is not null, it means the metric is tagged, and
// the tags should be reported to MetricPublisher. No need to report
// the non-tagged value of the metric in this case.
for (Map.Entry<List<String>, IMetric> entry : taggedMetrics.entrySet()) {
gatherOneMetricValue(builder, metricName, entry.getValue().getValueAndReset(), entry.getKey());
}
} else {
// Regular metric without tag support.
Object metricValue = metric.getValueAndReset();
gatherOneMetricValue(builder, metricName, metricValue, null);
}
}

@SuppressWarnings("unchecked")
private void gatherOneMetricValue(
Metrics.MetricPublisherPublishMessage.Builder builder,
String metricName,
Metrics.MetricPublisherPublishMessage.Builder builder) {
Object metricValue = metrics.get(metricName).getValueAndReset();
Object metricValue,
List<String> tagList) {

// Decide how to handle the metric based on type
if (metricValue == null) {
return;
Expand All @@ -186,16 +214,16 @@ private void gatherOneMetric(
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) metricValue).entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
addDataToMetricPublisher(
builder, metricName + "/" + entry.getKey().toString(), entry.getValue());
builder, metricName + "/" + entry.getKey().toString(), entry.getValue(), tagList);
}
}
} else if (metricValue instanceof Collection) {
int index = 0;
for (Object value : (Collection) metricValue) {
addDataToMetricPublisher(builder, metricName + "/" + (index++), value);
addDataToMetricPublisher(builder, metricName + "/" + (index++), value, tagList);
}
} else {
addDataToMetricPublisher(builder, metricName, metricValue);
addDataToMetricPublisher(builder, metricName, metricValue, tagList);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ private void handlePublisherPublishMessage(Metrics.MetricPublisher request,

List<MetricsInfo> metricsInfos = new ArrayList<MetricsInfo>(message.getMetricsCount());
for (Metrics.MetricDatum metricDatum : message.getMetricsList()) {
MetricsInfo info = new MetricsInfo(metricDatum.getName(), metricDatum.getValue());
// TODO: support tags. metricDatum.getTags()
MetricsInfo info = new MetricsInfo(metricDatum.getName(), metricDatum.getValue(), metricDatum.getTagList());
metricsInfos.add(info);
}

Expand Down
1 change: 1 addition & 0 deletions heron/proto/metrics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import "tmanager.proto";
message MetricDatum {
required string name = 1;
required string value = 2;
repeated string tag = 3;
}

message ExceptionData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@

package org.apache.heron.spi.metricsmgr.metrics;

import java.util.List;

/**
* An immutable class providing a view of MetricsInfo
* The value is in type String, and IMetricsSink would determine how to parse it.
*/
public class MetricsInfo {
private final String name;
private final String value;
private final List<String> tags;

public MetricsInfo(String name, String value) {
this(name, value, null);
}

public MetricsInfo(String name, String value, List<String> tags) {
this.name = name;
this.value = value;
this.tags = tags;
}

/**
Expand All @@ -50,8 +58,21 @@ public String getValue() {
return value;
}

/**
* Get the tags of the metric
*
* @return the tags of the metric
*/
public List<String> getTags() {
return tags;
}

@Override
public String toString() {
return String.format("%s = %s", getName(), getValue());
if (tags == null) {
return name + "=" + value;
} else {
return name + "=" + value + " tags=" + tags;
}
}
}

0 comments on commit 1b2bf3a

Please sign in to comment.