Skip to content

Commit 338afe3

Browse files
authored
Retry synced-flush on conflict in tests (#66968)
Closes #66631
1 parent 2f7240d commit 338afe3

File tree

4 files changed

+24
-30
lines changed

4 files changed

+24
-30
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -731,13 +731,7 @@ public void testRecovery() throws Exception {
731731
assertOK(client().performRequest(flushRequest));
732732

733733
if (randomBoolean()) {
734-
// We had a bug before where we failed to perform peer recovery with sync_id from 5.x to 6.x.
735-
// We added this synced flush so we can exercise different paths of recovery code.
736-
try {
737-
performSyncedFlush(index);
738-
} catch (ResponseException ignored) {
739-
// synced flush is optional here
740-
}
734+
performSyncedFlush(index, randomBoolean());
741735
}
742736
if (shouldHaveTranslog) {
743737
// Update a few documents so we are sure to have a translog
@@ -1421,7 +1415,6 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
14211415
}
14221416
}
14231417

1424-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66631")
14251418
public void testRecoveryWithTranslogRetentionDisabled() throws Exception {
14261419
if (isRunningAgainstOldCluster()) {
14271420
final Settings.Builder settings = Settings.builder()
@@ -1452,7 +1445,7 @@ public void testRecoveryWithTranslogRetentionDisabled() throws Exception {
14521445
if (randomBoolean()) {
14531446
flush(index, randomBoolean());
14541447
} else if (randomBoolean()) {
1455-
performSyncedFlush(index);
1448+
performSyncedFlush(index, randomBoolean());
14561449
}
14571450
saveInfoDocument("doc_count", Integer.toString(numDocs));
14581451
}

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,8 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
308308
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
309309
}
310310
if (randomBoolean()) {
311-
syncedFlush(index);
311+
performSyncedFlush(index, randomBoolean());
312+
ensureGlobalCheckpointSynced(index);
312313
}
313314
}
314315

@@ -587,22 +588,6 @@ private void assertClosedIndex(final String index, final boolean checkRoutingTab
587588
}
588589
}
589590

590-
private void syncedFlush(String index) throws Exception {
591-
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
592-
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
593-
assertBusy(() -> {
594-
try {
595-
Response resp = performSyncedFlush(index);
596-
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
597-
assertThat(result.get("failed"), equalTo(0));
598-
} catch (ResponseException ex) {
599-
throw new AssertionError(ex); // cause assert busy to retry
600-
}
601-
});
602-
// ensure the global checkpoint is synced; otherwise we might trim the commit with syncId
603-
ensureGlobalCheckpointSynced(index);
604-
}
605-
606591
@SuppressWarnings("unchecked")
607592
private void assertPeerRecoveredFiles(String reason, String index, String targetNode, Matcher<Integer> sizeMatcher) throws IOException {
608593
Map<?, ?> recoveryStats = entityAsMap(client().performRequest(new Request("GET", index + "/_recovery")));
@@ -668,7 +653,8 @@ public void testUpdateDoc() throws Exception {
668653
assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
669654
}
670655
if (randomBoolean()) {
671-
syncedFlush(index);
656+
performSyncedFlush(index, randomBoolean());
657+
ensureGlobalCheckpointSynced(index);
672658
}
673659
}
674660

qa/translog-policy/src/test/java/org/elasticsearch/upgrades/TranslogPolicyIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void testRecoverReplica() throws Exception {
132132
if (randomBoolean()) {
133133
flush(index, randomBoolean());
134134
} else if (randomBoolean()) {
135-
performSyncedFlush(index);
135+
performSyncedFlush(index, randomBoolean());
136136
}
137137
}
138138
ensureGreen(index);

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,7 +1502,7 @@ protected static Version minimumNodeVersion() throws IOException {
15021502
return minVersion;
15031503
}
15041504

1505-
protected static Response performSyncedFlush(String indexName) throws IOException {
1505+
protected static void performSyncedFlush(String indexName, boolean retryOnConflict) throws Exception {
15061506
final Request request = new Request("POST", indexName + "/_flush/synced");
15071507
final List<String> expectedWarnings = Collections.singletonList(SyncedFlushService.SYNCED_FLUSH_DEPRECATION_MESSAGE);
15081508
// prior to v7.11, the deprecation message for synced flush was incorrect so we also need to handle that
@@ -1519,7 +1519,22 @@ protected static Response performSyncedFlush(String indexName) throws IOExceptio
15191519
warnings.equals(expectedWarnings) == false && warnings.equals(incorrectWarningMessage) == false);
15201520
}
15211521
request.setOptions(options);
1522-
return client().performRequest(request);
1522+
// We have to spin a synced-flush request because we fire the global checkpoint sync for the last write operation.
1523+
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
1524+
assertBusy(() -> {
1525+
try {
1526+
Response resp = client().performRequest(request);
1527+
if (retryOnConflict) {
1528+
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
1529+
assertThat(result.get("failed"), equalTo(0));
1530+
}
1531+
} catch (ResponseException ex) {
1532+
assertThat(ex.getResponse().getStatusLine(), equalTo(HttpStatus.SC_CONFLICT));
1533+
if (retryOnConflict) {
1534+
throw new AssertionError(ex); // cause assert busy to retry
1535+
}
1536+
}
1537+
});
15231538
}
15241539

15251540
/**

0 commit comments

Comments
 (0)