diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java index 4adbef05a0e..9b681944b59 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/ConcurrentLiveTimerMetric.java @@ -43,10 +43,13 @@ protected ConcurrentLiveTimerMetric(final String name) { @Override public T time(ExceptionalSupplier exceptionalSupplier) throws E { try { - trackedMillisState.getAndUpdate(TrackedMillisState::withIncrementedConcurrency); + trackedMillisState.getAndUpdate(existing -> existing.withIncrementedConcurrency(nanoTimeSupplier.getAsLong())); return exceptionalSupplier.get(); } finally { - trackedMillisState.getAndUpdate(TrackedMillisState::withDecrementedConcurrency); + // lock in the actual time completed, and resolve state separately + // so that contention for recording state is not included in measurement. + final long endTime = nanoTimeSupplier.getAsLong(); + trackedMillisState.getAndUpdate(existing -> existing.withDecrementedConcurrency(endTime)); } } @@ -65,13 +68,13 @@ private long getUntrackedMillis() { } private long getTrackedMillis() { - return this.trackedMillisState.getAcquire().getValue(); + return this.trackedMillisState.getAcquire().getValue(nanoTimeSupplier.getAsLong()); } interface TrackedMillisState { - TrackedMillisState withIncrementedConcurrency(); - TrackedMillisState withDecrementedConcurrency(); - long getValue(); + TrackedMillisState withIncrementedConcurrency(long asOfNanoTime); + TrackedMillisState withDecrementedConcurrency(long asOfNanoTime); + long getValue(long asOfNanoTime); } private class StaticTrackedMillisState implements TrackedMillisState { @@ -89,18 +92,18 @@ public StaticTrackedMillisState() { } @Override - public TrackedMillisState withIncrementedConcurrency() { - return new DynamicTrackedMillisState(nanoTimeSupplier.getAsLong(), this.cumulativeMillis, this.excessNanos, 1); + public TrackedMillisState withIncrementedConcurrency(final long asOfNanoTime) { + return new DynamicTrackedMillisState(asOfNanoTime, this.cumulativeMillis, this.excessNanos, 1); } @Override - public TrackedMillisState withDecrementedConcurrency() { + public TrackedMillisState withDecrementedConcurrency(final long asOfNanoTime) { throw new IllegalStateException("TimerMetrics cannot track negative concurrency"); } @Override - public long getValue() { + public long getValue(final long asOfNanoTime) { return cumulativeMillis; } } @@ -122,26 +125,26 @@ private class DynamicTrackedMillisState implements TrackedMillisState { } @Override - public TrackedMillisState withIncrementedConcurrency() { - return withAdjustedConcurrency(Vector.INCREMENT); + public TrackedMillisState withIncrementedConcurrency(final long asOfNanoTime) { + return withAdjustedConcurrency(asOfNanoTime, Vector.INCREMENT); } @Override - public TrackedMillisState withDecrementedConcurrency() { - return withAdjustedConcurrency(Vector.DECREMENT); + public TrackedMillisState withDecrementedConcurrency(final long asOfNanoTime) { + return withAdjustedConcurrency(asOfNanoTime, Vector.DECREMENT); } @Override - public long getValue() { - final long nanoAdjustment = getNanoAdjustment(nanoTimeSupplier.getAsLong()); + public long getValue(final long asOfNanoTime) { + final long nanoAdjustment = getNanoAdjustment(asOfNanoTime); final long milliAdjustment = wholeMillisFromNanos(nanoAdjustment); return Math.addExact(this.millisAtCheckpoint, milliAdjustment); } - private TrackedMillisState withAdjustedConcurrency(final Vector concurrencyAdjustmentVector) { + private TrackedMillisState withAdjustedConcurrency(final long asOfNanoTime, final Vector concurrencyAdjustmentVector) { final int newConcurrency = Math.addExact(this.concurrencySinceCheckpoint, concurrencyAdjustmentVector.value()); - final long newCheckpointNanoTime = nanoTimeSupplier.getAsLong(); + final long newCheckpointNanoTime = asOfNanoTime; final long totalNanoAdjustment = getNanoAdjustment(newCheckpointNanoTime); @@ -165,7 +168,7 @@ private long getNanoAdjustment(final long checkpointNanoTime) { /** * This private enum is a type-safety guard for - * {@link DynamicTrackedMillisState#withAdjustedConcurrency(Vector)}. + * {@link DynamicTrackedMillisState#withAdjustedConcurrency(long, Vector)}. */ private enum Vector { INCREMENT{ int value() { return +1; } },