Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -380,7 +379,6 @@ public static class AsyncAction {
// choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction
private final String blobPath = "temp-analysis-" + UUIDs.randomBase64UUID();

private final AtomicLong expectedRegisterValue = new AtomicLong();
private final Queue<Consumer<Releasable>> queue = ConcurrentCollections.newQueue();
private final AtomicReference<Exception> failure = new AtomicReference<>();
private final Semaphore innerFailures = new Semaphore(5); // limit the number of suppressed failures
Expand Down Expand Up @@ -486,16 +484,17 @@ public void run() {
if (minClusterTransportVersion.onOrAfter(TransportVersions.V_8_8_0)) {
final String contendedRegisterName = CONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random);
final AtomicBoolean contendedRegisterAnalysisComplete = new AtomicBoolean();
final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would this value need to be nodes.size()? Ie, if we have 5 nodes but there have been nothing registered so far, does it make sense to be passing 5 as a parameter here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're reading "register" as a verb, but it's a noun here. The variable name registerOperations means "number of operations on the register".

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and we want to do at least one such operation from every node, maybe more if requested:

final DiscoveryNode node = nodes.get(i < nodes.size() ? i : random.nextInt(nodes.size()));

try (
var registerRefs = new RefCountingRunnable(
finalRegisterValueVerifier(
contendedRegisterName,
registerOperations,
random,
Releasables.wrap(requestRefs.acquire(), () -> contendedRegisterAnalysisComplete.set(true))
)
)
) {
final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
for (int i = 0; i < registerOperations; i++) {
final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request(
request.getRepositoryName(),
Expand Down Expand Up @@ -631,9 +630,7 @@ private void runContendedRegisterAnalysis(Releasable ref, ContendedRegisterAnaly
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty response) {
expectedRegisterValue.incrementAndGet();
}
public void onResponse(ActionResponse.Empty response) {}

@Override
public void onFailure(Exception exp) {
Expand All @@ -647,7 +644,7 @@ public void onFailure(Exception exp) {
}
}

private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) {
private Runnable finalRegisterValueVerifier(String registerName, int expectedFinalRegisterValue, Random random, Releasable ref) {
return new Runnable() {

final CheckedConsumer<ActionListener<OptionalBytesReference>, Exception> finalValueReader = switch (random.nextInt(3)) {
Expand Down Expand Up @@ -706,12 +703,9 @@ public String toString() {
}
};

long expectedFinalRegisterValue = Long.MIN_VALUE;

@Override
public void run() {
if (isRunning()) {
expectedFinalRegisterValue = expectedRegisterValue.get();
transportService.getThreadPool()
.executor(ThreadPool.Names.SNAPSHOT)
.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() {
Expand Down