Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;


/**
* A {@link ListGauge} is a {@link org.apache.samza.metrics.Metric} that buffers multiple instances of a type T in a list.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Do you need the fully qualified name "{@link org.apache.samza.metrics.Metric}" here? it seems like you should be able to use {@link Metric}

* {@link ListGauge}s are useful for maintaining, recording, or collecting values over time.
* For example, a set of specific logging-events (e.g., errors).
*
* Eviction from list is either done by consuming-code using the remove APIs or by specifying an eviction policy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nuke L:36 and update it so that it's inline with the current implementation

* at creation time.
*
* All public methods are thread-safe.
*
*/
public class ListGauge<T> implements Metric {
private final String name;
private final List<T> metricList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/metricList/elements

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private ListGaugeEvictionPolicy<T> listGaugeEvictionPolicy;

private final static int DEFAULT_POLICY_NUM_RETAIN = 60;

/**
* Create a new {@link ListGauge} with no auto eviction, callers can add/remove items as desired.
* @param name Name to be assigned
*/
public ListGauge(String name) {
this.name = name;
this.metricList = new ArrayList<T>(DEFAULT_POLICY_NUM_RETAIN);
this.listGaugeEvictionPolicy = new RetainLastNPolicy<T>(this, DEFAULT_POLICY_NUM_RETAIN);
}

/**
* Get the name assigned to this {@link ListGauge}
* @return the assigned name
*/
public String getName() {
return this.name;
}

/**
* Get the Collection of Gauge values currently in the list, used when serializing this Gauge.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Returns an immutable copy of the values in this {@link ListGauge}
  • don't need the comment on "Also evicts values based on the configured maxItems and maxStaleness."

* @return the collection of gauge values
*/
public synchronized Collection<T> getValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getValue/getValues

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return Collections.unmodifiableList(this.metricList);
}

/**
* Package-private method to change the eviction policy
* @param listGaugeEvictionPolicy
*/
synchronized void setEvictionPolicy(ListGaugeEvictionPolicy<T> listGaugeEvictionPolicy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to set this using a constructor instead of a setter? Then you don't need to have a synchronized method for setting it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem was that the policy needed a handle on the listgauge, so it couldnt be created and set during the listgauge instantiation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can the callback instead accept the ListGauge? It would be nice to avoid this mutability and a circular dependency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified the impl by removing configurability since there is no use case for it yet.

this.listGaugeEvictionPolicy = listGaugeEvictionPolicy;
}

/**
* Add a gauge to the list
* @param value The Gauge value to be added
*/
public synchronized void add(T value) {
this.metricList.add(value);

// notify the policy object (if one is present), for performing any eviction that may be needed.
// note: monitor is being held
if (this.listGaugeEvictionPolicy != null) {
this.listGaugeEvictionPolicy.elementAddedCallback();
}
}

public synchronized boolean remove(T value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a use case for being able to remove items from the middle of the list, or other arbitrary eviction? Could this maybe go a simpler route and have a bounded queue with a max size instead of a list with an arbitrary removal/eviction policy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a usecase for time-based and size-based eviction, thats why we had to separate out the eviction policy logic, it could be all baked into this class, in which remove(T val) can be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For both time and size-based eviction, it seems like you wouldn't need arbitrary removal. Wouldn't you just need a removeHead()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer wrapping the eviction logic in custom-evictors and getting rid off the remove method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return this.metricList.remove(value);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized void visit(MetricsVisitor visitor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do all these instance methods need to be synchronized? Could you use some "concurrent" data structure instead? The main thing I am worried about is this visit method which can do arbitrary logic and possibly hold on to the lock for an uncontrolled period of time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, looking into using concurrentlinkedqueue/deque, but the
current concurrency and periodicity scale for this are very low, for example, visit gets invoked once a minute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every time that minute comes around though, all other methods on the ListGauge have to wait for visit to complete. For example, if visit makes a remote call somewhere, then all add methods have to wait for that remote call to finish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified by using a concurrent queue.

visitor.listGauge(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

public interface ListGaugeEvictionPolicy<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to rethink this interface as follows:

public class Evictor {
  public void evict(List<T> elements);
}

Evictors can mutate the list they are passed. In addition to getting rid off the setter, you should be able to support arbitrary eviction policies

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

talked offline

void elementAddedCallback();
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ public interface MetricsRegistry {
*/
<T> Gauge<T> newGauge(String group, Gauge<T> value);

/**
* Register a {@link org.apache.samza.metrics.ListGauge}
* @param group Group for this ListGauge
* @param listGauge the ListGauge to register
* @param <T> Type of the ListGauge
* @return ListGauge registered
*/
<T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge);

/**
* Create and Register a new {@link org.apache.samza.metrics.Timer}
* @param group Group for this Timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ public abstract class MetricsVisitor {

public abstract void timer(Timer timer);

public abstract <T> void listGauge(ListGauge<T> listGauge);

public void visit(Metric metric) {
if (metric instanceof Counter) {
// Cast for metrics of type ListGauge
if (metric instanceof ListGauge<?>) {
listGauge((ListGauge<?>) metric);
} else if (metric instanceof Counter) {
counter((Counter) metric);
} else if (metric instanceof Gauge<?>) {
gauge((Gauge<?>) metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener {

void onGauge(String group, Gauge<?> gauge);

void onListGauge(String group, ListGauge<?> listGauge);

void onTimer(String group, Timer timer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.util.Collection;
import java.util.Iterator;


public class RetainLastNPolicy<T> implements ListGaugeEvictionPolicy<T> {

private final ListGauge<T> listGauge;
private final int nItems;

public RetainLastNPolicy(ListGauge<T> listGauge, int numItems) {
this.listGauge = listGauge;
this.nItems = numItems;
}

@Override
public void elementAddedCallback() {
// get a snapshot of the list
Collection<T> listGaugeCollection = this.listGauge.getValue();
int numToEvict = listGaugeCollection.size() - nItems;
Iterator<T> iterator = listGaugeCollection.iterator();
while (numToEvict > 0 && iterator.hasNext()) {
// Remove in FIFO order to retain the last nItems
listGauge.remove(iterator.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It looks like this is an n^2 removal, since you are always removing from the beginning of the list. Is it possible to optimize it?
  2. Can there be problems with iterating over the listGauge values and removing from the listGauge at the same time? It looks like Collections.unmodifiableList doesn't make a copy of the list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. yes, an easy way is to bake in the eviction logic and expose only control-knows #items or durationThreshold, and remove policy as configurable, and get rid of remove altogether.

  2. unmodifiableList doesnt give a copy of the elements but it does give a copy of the references to the elements, and listGauge.remove is sync.
    so for most immutable elements (for which this was intended), this could work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Above you mentioned that you would like to keep remove, but here it sounds like you are suggesting to get rid of remove. Could you please clarify?
  2. I wasn't referring to multithreaded concurrency in this case. My concern was that the iterator you are iterating over in the while loop and the listGauge that you are calling remove on seem to be both backed by the same underlying list object. I was wondering if the removal can cause some inconsistency in the iterator. It could be helpful to have a removeHead() here too so you don't need to actually iterate through the elements.

numToEvict--;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;


/**
* {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
* recorded but a registry is still required.
Expand All @@ -49,6 +51,11 @@ public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
return gauge;
}

@Override
public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) {
return listGauge;
}

@Override
public Timer newTimer(String group, String name) {
return new Timer(name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import org.junit.Assert;
import org.junit.Test;


/**
* Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge}
*/
public class TestListGauge {

private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);

@Test
public void basicTest() {
ListGauge<String> listGauge = new ListGauge<String>("sampleListGauge");
listGauge.add("sampleValue");
Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge");
Assert.assertEquals("List sizes should match", listGauge.getValue().size(), 1);
Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValue().contains("sampleValue"), true);
}

@Test
public void testSizeEnforcement() {
ListGauge listGauge = new ListGauge<String>("listGauge");
listGauge.setEvictionPolicy(new RetainLastNPolicy(listGauge, 10));
for (int i = 15; i > 0; i--) {
listGauge.add("v" + i);
}
Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValue().size(), 10);

int valueIndex = 10;
Collection<String> currentList = listGauge.getValue();
Iterator iterator = currentList.iterator();
while (iterator.hasNext()) {
String gaugeValue = (String) iterator.next();
Assert.assertTrue(gaugeValue.equals("v" + valueIndex));
valueIndex--;
}
}

@Test
public void testThreadSafety() throws InterruptedException {
ListGauge<Integer> listGauge = new ListGauge<Integer>("listGauge");
listGauge.setEvictionPolicy(new RetainLastNPolicy(listGauge, 20));

Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
listGauge.add(i);
}
}
});

Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
listGauge.add(i);
}
}
});

thread1.start();
thread2.start();

thread1.join(THREAD_TEST_TIMEOUT.toMillis());
thread2.join(THREAD_TEST_TIMEOUT.toMillis());

Assert.assertTrue("ListGauge should have the last 20 values", listGauge.getValue().size() == 20);
for (Integer gaugeValue : listGauge.getValue()) {
Assert.assertTrue("Values should have the last 20 range", gaugeValue <= 100 && gaugeValue > 80);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testTimerWithDifferentWindowSize() {
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
assertEquals(3, snapshot.getValues().size());

// The time is 500 for update(4L) because getSnapshot calls clock once + 3
// The time is 500 for update(4L) because getValue calls clock once + 3
// updates that call clock 3 times
timer.update(4L);
Snapshot snapshot2 = timer.getSnapshot();
Expand Down
Loading