11package org .logstash .instrument .metrics .timer ;
22
3+ import org .apache .logging .log4j .LogManager ;
4+ import org .apache .logging .log4j .Logger ;
35import org .logstash .instrument .metrics .AbstractMetric ;
46
57import java .util .Objects ;
8+ import java .util .concurrent .Executors ;
9+ import java .util .concurrent .ScheduledExecutorService ;
10+ import java .util .concurrent .TimeUnit ;
611import java .util .concurrent .atomic .AtomicReference ;
712import java .util .concurrent .atomic .LongAdder ;
813import java .util .function .LongSupplier ;
@@ -43,10 +48,13 @@ protected ConcurrentLiveTimerMetric(final String name) {
4348 @ Override
4449 public <T , E extends Throwable > T time (ExceptionalSupplier <T , E > exceptionalSupplier ) throws E {
4550 try {
46- trackedMillisState .getAndUpdate (TrackedMillisState :: withIncrementedConcurrency );
51+ trackedMillisState .getAndUpdate (existing -> existing . withIncrementedConcurrency ( nanoTimeSupplier . getAsLong ()) );
4752 return exceptionalSupplier .get ();
4853 } finally {
49- trackedMillisState .getAndUpdate (TrackedMillisState ::withDecrementedConcurrency );
54+ // lock in the actual time completed, and resolve state separately
55+ // so that contention for recording state is not included in measurement.
56+ final long endTime = nanoTimeSupplier .getAsLong ();
57+ trackedMillisState .getAndUpdate (existing -> existing .withDecrementedConcurrency (endTime ));
5058 }
5159 }
5260
@@ -65,13 +73,13 @@ private long getUntrackedMillis() {
6573 }
6674
6775 private long getTrackedMillis () {
68- return this .trackedMillisState .getAcquire ().getValue ();
76+ return this .trackedMillisState .getAcquire ().getValue (nanoTimeSupplier . getAsLong () );
6977 }
7078
7179 interface TrackedMillisState {
72- TrackedMillisState withIncrementedConcurrency ();
73- TrackedMillisState withDecrementedConcurrency ();
74- long getValue ();
80+ TrackedMillisState withIncrementedConcurrency (long asOfNanoTime );
81+ TrackedMillisState withDecrementedConcurrency (long asOfNanoTime );
82+ long getValue (long asOfNanoTime );
7583 }
7684
7785 private class StaticTrackedMillisState implements TrackedMillisState {
@@ -89,18 +97,18 @@ public StaticTrackedMillisState() {
8997 }
9098
9199 @ Override
92- public TrackedMillisState withIncrementedConcurrency () {
93- return new DynamicTrackedMillisState (nanoTimeSupplier . getAsLong () , this .cumulativeMillis , this .excessNanos , 1 );
100+ public TrackedMillisState withIncrementedConcurrency (final long asOfNanoTime ) {
101+ return new DynamicTrackedMillisState (asOfNanoTime , this .cumulativeMillis , this .excessNanos , 1 );
94102 }
95103
96104 @ Override
97- public TrackedMillisState withDecrementedConcurrency () {
105+ public TrackedMillisState withDecrementedConcurrency (final long asOfNanoTime ) {
98106 throw new IllegalStateException ("TimerMetrics cannot track negative concurrency" );
99107 }
100108
101109
102110 @ Override
103- public long getValue () {
111+ public long getValue (final long asOfNanoTime ) {
104112 return cumulativeMillis ;
105113 }
106114 }
@@ -122,26 +130,26 @@ private class DynamicTrackedMillisState implements TrackedMillisState {
122130 }
123131
124132 @ Override
125- public TrackedMillisState withIncrementedConcurrency () {
126- return withAdjustedConcurrency (Vector .INCREMENT );
133+ public TrackedMillisState withIncrementedConcurrency (final long asOfNanoTime ) {
134+ return withAdjustedConcurrency (asOfNanoTime , Vector .INCREMENT );
127135 }
128136
129137 @ Override
130- public TrackedMillisState withDecrementedConcurrency () {
131- return withAdjustedConcurrency (Vector .DECREMENT );
138+ public TrackedMillisState withDecrementedConcurrency (final long asOfNanoTime ) {
139+ return withAdjustedConcurrency (asOfNanoTime , Vector .DECREMENT );
132140 }
133141
134142 @ Override
135- public long getValue () {
136- final long nanoAdjustment = getNanoAdjustment (nanoTimeSupplier . getAsLong () );
143+ public long getValue (final long asOfNanoTime ) {
144+ final long nanoAdjustment = getNanoAdjustment (asOfNanoTime );
137145 final long milliAdjustment = wholeMillisFromNanos (nanoAdjustment );
138146
139147 return Math .addExact (this .millisAtCheckpoint , milliAdjustment );
140148 }
141149
142- private TrackedMillisState withAdjustedConcurrency (final Vector concurrencyAdjustmentVector ) {
150+ private TrackedMillisState withAdjustedConcurrency (final long asOfNanoTime , final Vector concurrencyAdjustmentVector ) {
143151 final int newConcurrency = Math .addExact (this .concurrencySinceCheckpoint , concurrencyAdjustmentVector .value ());
144- final long newCheckpointNanoTime = nanoTimeSupplier . getAsLong () ;
152+ final long newCheckpointNanoTime = asOfNanoTime ;
145153
146154 final long totalNanoAdjustment = getNanoAdjustment (newCheckpointNanoTime );
147155
@@ -165,7 +173,7 @@ private long getNanoAdjustment(final long checkpointNanoTime) {
165173
166174 /**
167175 * This private enum is a type-safety guard for
168- * {@link DynamicTrackedMillisState#withAdjustedConcurrency(Vector)}.
176+ * {@link DynamicTrackedMillisState#withAdjustedConcurrency(long, Vector)}.
169177 */
170178 private enum Vector {
171179 INCREMENT { int value () { return +1 ; } },
0 commit comments