Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package org.logstash.instrument.metrics.timer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.instrument.metrics.AbstractMetric;

import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -43,10 +48,13 @@ protected ConcurrentLiveTimerMetric(final String name) {
@Override
public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E {
try {
trackedMillisState.getAndUpdate(TrackedMillisState::withIncrementedConcurrency);
trackedMillisState.getAndUpdate(existing -> existing.withIncrementedConcurrency(nanoTimeSupplier.getAsLong()));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the nanoTimeSupplier.getAsLong() for recording the start time still occurs inside the UnaryConsumer because if this thread loses and is retried, we don't want to record an old start time.

return exceptionalSupplier.get();
} finally {
trackedMillisState.getAndUpdate(TrackedMillisState::withDecrementedConcurrency);
// lock in the actual time completed, and resolve state separately
// so that contention for recording state is not included in measurement.
final long endTime = nanoTimeSupplier.getAsLong();
trackedMillisState.getAndUpdate(existing -> existing.withDecrementedConcurrency(endTime));
}
}

Expand All @@ -65,13 +73,13 @@ private long getUntrackedMillis() {
}

private long getTrackedMillis() {
return this.trackedMillisState.getAcquire().getValue();
return this.trackedMillisState.getAcquire().getValue(nanoTimeSupplier.getAsLong());
}

interface TrackedMillisState {
TrackedMillisState withIncrementedConcurrency();
TrackedMillisState withDecrementedConcurrency();
long getValue();
TrackedMillisState withIncrementedConcurrency(long asOfNanoTime);
TrackedMillisState withDecrementedConcurrency(long asOfNanoTime);
long getValue(long asOfNanoTime);
}

private class StaticTrackedMillisState implements TrackedMillisState {
Expand All @@ -89,18 +97,18 @@ public StaticTrackedMillisState() {
}

@Override
public TrackedMillisState withIncrementedConcurrency() {
return new DynamicTrackedMillisState(nanoTimeSupplier.getAsLong(), this.cumulativeMillis, this.excessNanos, 1);
public TrackedMillisState withIncrementedConcurrency(final long asOfNanoTime) {
return new DynamicTrackedMillisState(asOfNanoTime, this.cumulativeMillis, this.excessNanos, 1);
}

@Override
public TrackedMillisState withDecrementedConcurrency() {
public TrackedMillisState withDecrementedConcurrency(final long asOfNanoTime) {
throw new IllegalStateException("TimerMetrics cannot track negative concurrency");
}


@Override
public long getValue() {
public long getValue(final long asOfNanoTime) {
return cumulativeMillis;
}
}
Expand All @@ -122,26 +130,26 @@ private class DynamicTrackedMillisState implements TrackedMillisState {
}

@Override
public TrackedMillisState withIncrementedConcurrency() {
return withAdjustedConcurrency(Vector.INCREMENT);
public TrackedMillisState withIncrementedConcurrency(final long asOfNanoTime) {
return withAdjustedConcurrency(asOfNanoTime, Vector.INCREMENT);
}

@Override
public TrackedMillisState withDecrementedConcurrency() {
return withAdjustedConcurrency(Vector.DECREMENT);
public TrackedMillisState withDecrementedConcurrency(final long asOfNanoTime) {
return withAdjustedConcurrency(asOfNanoTime, Vector.DECREMENT);
}

@Override
public long getValue() {
final long nanoAdjustment = getNanoAdjustment(nanoTimeSupplier.getAsLong());
public long getValue(final long asOfNanoTime) {
final long nanoAdjustment = getNanoAdjustment(asOfNanoTime);
final long milliAdjustment = wholeMillisFromNanos(nanoAdjustment);

return Math.addExact(this.millisAtCheckpoint, milliAdjustment);
}

private TrackedMillisState withAdjustedConcurrency(final Vector concurrencyAdjustmentVector) {
private TrackedMillisState withAdjustedConcurrency(final long asOfNanoTime, final Vector concurrencyAdjustmentVector) {
final int newConcurrency = Math.addExact(this.concurrencySinceCheckpoint, concurrencyAdjustmentVector.value());
final long newCheckpointNanoTime = nanoTimeSupplier.getAsLong();
final long newCheckpointNanoTime = asOfNanoTime;

final long totalNanoAdjustment = getNanoAdjustment(newCheckpointNanoTime);

Expand All @@ -165,7 +173,7 @@ private long getNanoAdjustment(final long checkpointNanoTime) {

/**
* This private enum is a type-safety guard for
* {@link DynamicTrackedMillisState#withAdjustedConcurrency(Vector)}.
* {@link DynamicTrackedMillisState#withAdjustedConcurrency(long, Vector)}.
*/
private enum Vector {
INCREMENT{ int value() { return +1; } },
Expand Down