Skip to content
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.samza.util.TimestampedValue;


/**
* A {@link ListGauge} is a {@link org.apache.samza.metrics.Metric} that buffers multiple instances of a type T in a list.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Do you need the fully qualified name "{@link org.apache.samza.metrics.Metric}" here? it seems like you should be able to use {@link Metric}

* {@link ListGauge}s are useful for maintaining, recording, or collecting values over time.
* For example, a set of specific logging-events (e.g., errors).
*
* Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation.
* Eviction happens during element addition or during reads of the ListGauge (getValues).
*
* All public methods are thread-safe.
*
*/
public class ListGauge<T> implements Metric {
private final String name;
private final Queue<TimestampedValue<T>> elements;

private final int maxNumberOfItems;
private final Duration maxStaleness;
private final static int DEFAULT_MAX_NITEMS = 1000;
private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);

/**
* Create a new {@link ListGauge} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
*
* @param name Name to be assigned
* @param maxNumberOfItems The max number of items that can remain in the listgauge
* @param maxStaleness The max staleness of items permitted in the listgauge
*/
public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) {
this.name = name;
this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
this.maxNumberOfItems = maxNumberOfItems;
this.maxStaleness = maxStaleness;
}

/**
* Create a new {@link ListGauge} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
* @param name Name to be assigned
*/
public ListGauge(String name) {
this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
}

/**
* Get the name assigned to this {@link ListGauge}
* @return the assigned name
*/
public String getName() {
return this.name;
}

/**
* Get the Collection of values currently in the list.
* @return the collection of values
*/
public Collection<T> getValues() {
this.evict();
return Collections.unmodifiableList(this.elements.stream().map(x -> x.getValue()).collect(Collectors.toList()));
}

/**
* Add a value to the list.
* (Timestamp assigned to this value is the current timestamp.)
* @param value The Gauge value to be added
*/
public void add(T value) {
this.elements.add(new TimestampedValue<T>(value, Instant.now().toEpochMilli()));

// perform any evictions that may be needed.
this.evict();
}

/**
* {@inheritDoc}
*/
@Override
public void visit(MetricsVisitor visitor) {
visitor.listGauge(this);
}

/**
* Evicts entries from the elements list, based on the given item-size and durationThreshold.
*/
private void evict() {
this.evictBasedOnSize();
this.evictBasedOnTimestamp();
}

/**
* Evicts entries from elements in FIFO order until it has maxNumberOfItems
*/
private void evictBasedOnSize() {
int numToEvict = this.elements.size() - this.maxNumberOfItems;
while (numToEvict > 0) {
this.elements.poll(); // remove head
numToEvict--;
}
}

/**
* Removes entries from elements to ensure no element has a timestamp more than maxStaleness before current timestamp.
*/
private void evictBasedOnTimestamp() {
Instant currentTimestamp = Instant.now();
TimestampedValue<T> valueInfo = this.elements.peek();

// continue remove-head if currenttimestamp - head-element's timestamp > durationThreshold
while (valueInfo != null
&& currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > this.maxStaleness.toMillis()) {
this.elements.poll();
valueInfo = this.elements.peek();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ public interface MetricsRegistry {
*/
<T> Gauge<T> newGauge(String group, Gauge<T> value);

/**
* Register a {@link org.apache.samza.metrics.ListGauge}
* @param group Group for this ListGauge
* @param listGauge the ListGauge to register
* @param <T> Type of the ListGauge
* @return ListGauge registered
*/
<T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge);

/**
* Create and Register a new {@link org.apache.samza.metrics.Timer}
* @param group Group for this Timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ public abstract class MetricsVisitor {

public abstract void timer(Timer timer);

public abstract <T> void listGauge(ListGauge<T> listGauge);

public void visit(Metric metric) {
if (metric instanceof Counter) {
// Cast for metrics of type ListGauge
if (metric instanceof ListGauge<?>) {
listGauge((ListGauge<?>) metric);
} else if (metric instanceof Counter) {
counter((Counter) metric);
} else if (metric instanceof Gauge<?>) {
gauge((Gauge<?>) metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener {

void onGauge(String group, Gauge<?> gauge);

void onListGauge(String group, ListGauge<?> listGauge);

void onTimer(String group, Timer timer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;


/**
* {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
* recorded but a registry is still required.
Expand All @@ -49,6 +51,11 @@ public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
return gauge;
}

@Override
public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) {
return listGauge;
}

@Override
public Timer newTimer(String group, String name) {
return new Timer(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*
*/
package org.apache.samza.operators.impl.store;
package org.apache.samza.util;

/**
* An immutable pair of a value, and its corresponding timestamp.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import org.junit.Assert;
import org.junit.Test;


/**
* Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge}
*/
public class TestListGauge {

private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);

private <T> ListGauge<T> getListGaugeForTest() {
return new ListGauge<T>("sampleListGauge", 10, Duration.ofSeconds(60));
}

@Test
public void basicTest() {
ListGauge<String> listGauge = getListGaugeForTest();
listGauge.add("sampleValue");
Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge");
Assert.assertEquals("List sizes should match", listGauge.getValues().size(), 1);
Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValues().contains("sampleValue"), true);
}

@Test
public void testSizeEnforcement() {
ListGauge listGauge = getListGaugeForTest();
for (int i = 15; i > 0; i--) {
listGauge.add("v" + i);
}
Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValues().size(), 10);

int valueIndex = 10;
Collection<String> currentList = listGauge.getValues();
Iterator iterator = currentList.iterator();
while (iterator.hasNext()) {
String gaugeValue = (String) iterator.next();
Assert.assertTrue(gaugeValue.equals("v" + valueIndex));
valueIndex--;
}
}

@Test
public void testThreadSafety() throws InterruptedException {
ListGauge<Integer> listGauge = getListGaugeForTest();

Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
listGauge.add(i);
}
}
});

Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
listGauge.add(i);
}
}
});

thread1.start();
thread2.start();

thread1.join(THREAD_TEST_TIMEOUT.toMillis());
thread2.join(THREAD_TEST_TIMEOUT.toMillis());

Assert.assertTrue("ListGauge should have the last 10 values", listGauge.getValues().size() == 10);
for (Integer gaugeValue : listGauge.getValues()) {
Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 100 && gaugeValue > 90);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

package org.apache.samza.metrics;

import static org.junit.Assert.*;

import java.util.Arrays;

import org.apache.samza.util.Clock;
import org.junit.Test;

import static org.junit.Assert.*;

public class TestTimer {

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@

package org.apache.samza.system.eventhub;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.map.HashedMap;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TestMetricsRegistry implements MetricsRegistry {

private Map<String, List<Counter>> counters = new HashedMap<>();
private Map<String, List<Gauge<?>>> gauges = new HashedMap<>();
private Map<String, List<ListGauge>> listGauges = new HashedMap<>();

public List<Counter> getCounters(String groupName) {
return counters.get(groupName);
Expand Down Expand Up @@ -68,6 +69,14 @@ public Counter newCounter(String group, Counter counter) {
return null;
}

@Override
public ListGauge newListGauge(String group, ListGauge listGauge) {
listGauges.putIfAbsent(group, new ArrayList());
ListGauge value = new ListGauge(group);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/value/listGauge

listGauges.get(group).add(value);
return value;
}

@Override
public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
if (!gauges.containsKey(group)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public Counter newCounter(String name) {
return registry.newCounter(groupName, (prefix + name).toLowerCase());
}

public <T> ListGauge newListGauge(String name) {
return registry.newListGauge(groupName, new ListGauge(name));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you do a ListGauge<T> for explicit type inference here since the method is parameterized by T already?

}

public <T> Gauge<T> newGauge(String name, T value) {
return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.samza.operators.functions;

import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.util.TimestampedValue;
import org.apache.samza.storage.kv.KeyValueStore;

/**
Expand Down
Loading