diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java index fd5bacdf5592b..35e11ae40d51a 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java @@ -77,7 +77,6 @@ import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongSupplier; @@ -380,7 +379,6 @@ public static class AsyncAction { // choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction private final String blobPath = "temp-analysis-" + UUIDs.randomBase64UUID(); - private final AtomicLong expectedRegisterValue = new AtomicLong(); private final Queue> queue = ConcurrentCollections.newQueue(); private final AtomicReference failure = new AtomicReference<>(); private final Semaphore innerFailures = new Semaphore(5); // limit the number of suppressed failures @@ -486,16 +484,17 @@ public void run() { if (minClusterTransportVersion.onOrAfter(TransportVersions.V_8_8_0)) { final String contendedRegisterName = CONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random); final AtomicBoolean contendedRegisterAnalysisComplete = new AtomicBoolean(); + final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount()); try ( var registerRefs = new RefCountingRunnable( finalRegisterValueVerifier( contendedRegisterName, + registerOperations, random, Releasables.wrap(requestRefs.acquire(), () -> contendedRegisterAnalysisComplete.set(true)) ) ) ) { - final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount()); for (int i = 0; i < registerOperations; i++) { final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request( request.getRepositoryName(), @@ -631,9 +630,7 @@ private void runContendedRegisterAnalysis(Releasable ref, ContendedRegisterAnaly TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() { @Override - public void onResponse(ActionResponse.Empty response) { - expectedRegisterValue.incrementAndGet(); - } + public void onResponse(ActionResponse.Empty response) {} @Override public void onFailure(Exception exp) { @@ -647,7 +644,7 @@ public void onFailure(Exception exp) { } } - private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) { + private Runnable finalRegisterValueVerifier(String registerName, int expectedFinalRegisterValue, Random random, Releasable ref) { return new Runnable() { final CheckedConsumer, Exception> finalValueReader = switch (random.nextInt(3)) { @@ -706,12 +703,9 @@ public String toString() { } }; - long expectedFinalRegisterValue = Long.MIN_VALUE; - @Override public void run() { if (isRunning()) { - expectedFinalRegisterValue = expectedRegisterValue.get(); transportService.getThreadPool() .executor(ThreadPool.Names.SNAPSHOT) .execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() {