Skip to content

Conversation

@rayman7718
Copy link
Contributor

@rayman7718 rayman7718 commented May 31, 2018

This PR introduces a ListGauge type,
A subsequent PR: #543 shows how this issued in conjunction with a diagnostics appender for error-tracking and dumpting to kafka via SnapshotReporter.

rmatharu added 5 commits May 31, 2018 15:44
…pshot reporter.

Switched producer-shutdown logic to ensure metric-flush at shutdown-time
Added exception metric (as string-guage), to be emitted using the snapshot reporter.
Switched producer-shutdown logic to ensure metric-flush at shutdown-time

Adding ListGauge and integration with SamzaContainerMetric

Typifying ListGauge to ListGauge<T>, adding an eviction policy, default policy to retain last N
Copy link
Contributor

@cameronlee314 cameronlee314 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some context to the description about how you plan to use this? One of the javadocs gives an example about errors, but are there other use cases and how do you plan to expose this data?

}
}

public synchronized boolean remove(T value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a use case for being able to remove items from the middle of the list, or other arbitrary eviction? Could this maybe go a simpler route and have a bounded queue with a max size instead of a list with an arbitrary removal/eviction policy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a usecase for time-based and size-based eviction, thats why we had to separate out the eviction policy logic, it could be all baked into this class, in which remove(T val) can be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For both time and size-based eviction, it seems like you wouldn't need arbitrary removal. Wouldn't you just need a removeHead()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer wrapping the eviction logic in custom-evictors and getting rid off the remove method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Package-private method to change the eviction policy
* @param listGaugeEvictionPolicy
*/
synchronized void setEvictionPolicy(ListGaugeEvictionPolicy<T> listGaugeEvictionPolicy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to set this using a constructor instead of a setter? Then you don't need to have a synchronized method for setting it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem was that the policy needed a handle on the listgauge, so it couldnt be created and set during the listgauge instantiation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can the callback instead accept the ListGauge? It would be nice to avoid this mutability and a circular dependency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified the impl by removing configurability since there is no use case for it yet.

* {@inheritDoc}
*/
@Override
public synchronized void visit(MetricsVisitor visitor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do all these instance methods need to be synchronized? Could you use some "concurrent" data structure instead? The main thing I am worried about is this visit method which can do arbitrary logic and possibly hold on to the lock for an uncontrolled period of time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, looking into using concurrentlinkedqueue/deque, but the
current concurrency and periodicity scale for this are very low, for example, visit gets invoked once a minute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every time that minute comes around though, all other methods on the ListGauge have to wait for visit to complete. For example, if visit makes a remote call somewhere, then all add methods have to wait for that remote call to finish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified by using a concurrent queue.

Iterator<T> iterator = listGaugeCollection.iterator();
while (numToEvict > 0 && iterator.hasNext()) {
// Remove in FIFO order to retain the last nItems
listGauge.remove(iterator.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It looks like this is an n^2 removal, since you are always removing from the beginning of the list. Is it possible to optimize it?
  2. Can there be problems with iterating over the listGauge values and removing from the listGauge at the same time? It looks like Collections.unmodifiableList doesn't make a copy of the list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. yes, an easy way is to bake in the eviction logic and expose only control-knows #items or durationThreshold, and remove policy as configurable, and get rid of remove altogether.

  2. unmodifiableList doesnt give a copy of the elements but it does give a copy of the references to the elements, and listGauge.remove is sync.
    so for most immutable elements (for which this was intended), this could work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Above you mentioned that you would like to keep remove, but here it sounds like you are suggesting to get rid of remove. Could you please clarify?
  2. I wasn't referring to multithreaded concurrency in this case. My concern was that the iterator you are iterating over in the while loop and the listGauge that you are calling remove on seem to be both backed by the same underlying list object. I was wondering if the removal can cause some inconsistency in the iterator. It could be helpful to have a removeHead() here too so you don't need to actually iterate through the elements.

registry.getGroup(group).asScala.foreach {
case (name, metric) =>
metric.visit(new MetricsVisitor {
// for listGauge the value is returned as a list, which gets serialized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might need to update AmfReporter.java too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.
The plan was also to use AmfReporter's filter to filter out the "exception" metric, because amf cant handle exceptions/string types

*/
public class ListGauge<T> implements Metric {
private final String name;
private final List<T> metricList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/metricList/elements

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}

public synchronized boolean remove(T value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer wrapping the eviction logic in custom-evictors and getting rid off the remove method

* Get the Collection of Gauge values currently in the list, used when serializing this Gauge.
* @return the collection of gauge values
*/
public synchronized Collection<T> getValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getValue/getValues

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*/
package org.apache.samza.metrics;

public interface ListGaugeEvictionPolicy<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to rethink this interface as follows:

public class Evictor {
  public void evict(List<T> elements);
}

Evictors can mutate the list they are passed. In addition to getting rid off the setter, you should be able to support arbitrary eviction policies

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

talked offline

@rayman7718
Copy link
Contributor Author

I have a separate PR that shows how this abstraction is used for error-tracking.
#543

this.elements.add(new ValueInfo<T>(Instant.now(), value));

// notify the policy object for performing any eviction that may be needed.
this.listGaugeEvictionPolicy.elementAddedCallback();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that there is a need for only 2 types of evictions (size, time), I wonder if we could just do evictions
inline on add? We wouldn't need a separate interface then. This would be the preferable approach.

If you still think we need an interface for a pluggable eviction policy, it could be something on the lines of:

public class EvictionPolicy<T> {
    void evict(Iterable<TimestampedValue<T>> currentElements);
}

instead of

public class ListGaugeEvictionPolicy<T> {
  public void elementAddedCallback();
}```

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, purged the interface and just used the default class impl

* This class is used for bookkeeping of values added to the ListGauge.
* @param <T>
*/
public static class ValueInfo<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You wouldn't need this class and can use TimestampedValue instead

Copy link
Contributor Author

@rayman7718 rayman7718 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purged the interfaces to simplify current impl,
removed ValueInfo, and moved TimestampedInfo to samza-api

rmatharu added 2 commits June 4, 2018 18:33
Removing ValueInfo class and moving TimestampedValue to samza-api util

Adding TimestampedValue.java in core
@vjagadish1989
Copy link
Contributor

vjagadish1989 commented Jun 5, 2018

@rayman7718 : I meant that you don't need the interface and the corresponding implementations. I was proposing to inline the logic inside the ListGauge itself. We don't need the ability to schedule evictions asynchronously - we can instead trigger them when a new element is added.

* Evicts entries from the elements list, based on the given item-size and durationThreshold.
* Callers are responsible for thread-safety.
*/
public void evict(Queue<TimestampedValue<T>> elements, int maxNumberOfItems, Duration maxStaleness) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if we could get rid off the eviction policy class as well and move this inside the ListGauge itself.

@rayman7718
Copy link
Contributor Author

Completely removed evictionpolicy separation, merged all eviction logic into listgauge

Copy link
Contributor

@vjagadish1989 vjagadish1989 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more pass!

* {@link ListGauge}s are useful for maintaining, recording, or collecting values over time.
* For example, a set of specific logging-events (e.g., errors).
*
* Eviction from list is either done by consuming-code using the remove APIs or by specifying an eviction policy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nuke L:36 and update it so that it's inline with the current implementation

}

/**
* Get the Collection of Gauge values currently in the list, used when serializing this Gauge.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Returns an immutable copy of the values in this {@link ListGauge}
  • don't need the comment on "Also evicts values based on the configured maxItems and maxStaleness."

* @return the collection of gauge values
*/
public Collection<T> getValues() {
// notify the policy object for performing any eviction that may be needed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this comment since the code has changed?

* Evicts entries from the elements list, based on the given item-size and durationThreshold.
* Callers are responsible for thread-safety.
*/
public void evict(Queue<TimestampedValue<T>> elements, int maxNumberOfItems, Duration maxStaleness) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this a public method? can this be internal to the gauge?

* Callers are responsible for thread-safety.
*/
public void evict(Queue<TimestampedValue<T>> elements, int maxNumberOfItems, Duration maxStaleness) {
this.evictBasedOnSize(elements, maxNumberOfItems);
Copy link
Contributor

@vjagadish1989 vjagadish1989 Jun 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't ned to pass in elements, maxNum, staleness - should be simpler to instead use the data members of the class


/**
* Evicts entries from the elements list, based on the given item-size and durationThreshold.
* Callers are responsible for thread-safety.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

contradicts the ListGauge docs a little bit since public methods are expected to be thread-safe?

assertEquals(3, snapshot.getValues().size());

// The time is 500 for update(4L) because getSnapshot calls clock once + 3
// The time is 500 for update(4L) because getValues calls clock once + 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this?

* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Copy link
Contributor

@vjagadish1989 vjagadish1989 Jun 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert unnecessary changes here (and everywhere else)
nit 1: revert changes to license header formats
nit 2: revert superfluous new lines in other classes

Copy link
Contributor

@vjagadish1989 vjagadish1989 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

approved! thank you Rayman for the contribution

@Override
public ListGauge newListGauge(String group, ListGauge listGauge) {
listGauges.putIfAbsent(group, new ArrayList());
ListGauge value = new ListGauge(group);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/value/listGauge

}

public <T> ListGauge newListGauge(String name) {
return registry.newListGauge(groupName, new ListGauge(name));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you do a ListGauge<T> for explicit type inference here since the method is parameterized by T already?


val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()

// A string-gauge metric to capture exceptions at this container
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/exception/exceptions

minor: can nuke comment since it's obvious


producer.stop
// Scheduling an event with 0 delay to ensure flushing of metrics one last time before shutdown
executor.schedule(this,0, TimeUnit.SECONDS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a "best-effort" flush? if not, we should probably schedule this from the current thread synchronously. I believe a best effort flush might be good enough for most cases since the timeout is 60s



/**
* A {@link ListGauge} is a {@link org.apache.samza.metrics.Metric} that buffers multiple instances of a type T in a list.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Do you need the fully qualified name "{@link org.apache.samza.metrics.Metric}" here? it seems like you should be able to use {@link Metric}

@rayman7718 rayman7718 changed the title New metric type Listgauge New metric type Listgauge Jun 13, 2018
@rayman7718 rayman7718 changed the title New metric type Listgauge New metric type Listgauge JIRA: SAMZA-1733 Jun 13, 2018
@vjagadish1989
Copy link
Contributor

@rayman7718 : can we change the title to be something on the lines of:
"SAMZA-1234: Add .."

@rayman7718 rayman7718 changed the title New metric type Listgauge JIRA: SAMZA-1733 SAMZA-1733 New metric type Listgauge Jun 14, 2018
@asfgit asfgit closed this in f6a2d54 Jun 14, 2018
@vjagadish1989
Copy link
Contributor

merged and submitted. Thanks @rayman7718!

weisong44 pushed a commit to weisong44/samza that referenced this pull request Jun 22, 2018
This PR introduces a ListGauge type,
A subsequent PR: apache#543 shows how this issued in conjunction with a diagnostics appender for error-tracking and dumpting to kafka via SnapshotReporter.

Author: Ray Matharu <[email protected]>

Reviewers: Jagadish <[email protected]>, Cameron Lee <[email protected]>

Closes apache#541 from rayman7718/listgauge
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants