Skip to content

Commit c96139d

Browse files
authored
[Rest Api Compatibility] Deprecate the use of synced flush (#75372)
synced flush is going to be replaced by flush. This commit allows to synced_flush api only in v7 compatibility mode. Worth noting - sync_id is gone and won't be available in v7 responses from indices.stats relates removal pr #50882 relates #51816
1 parent 71546b3 commit c96139d

File tree

7 files changed

+77
-64
lines changed

7 files changed

+77
-64
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -682,9 +682,6 @@ public void testRecovery() throws Exception {
682682
flushRequest.addParameter("force", "true");
683683
flushRequest.addParameter("wait_if_ongoing", "true");
684684
assertOK(client().performRequest(flushRequest));
685-
if (randomBoolean()) {
686-
syncedFlush(index);
687-
}
688685

689686
if (shouldHaveTranslog) {
690687
// Update a few documents so we are sure to have a translog

qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.common.Strings;
1919
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.xcontent.MediaType;
21+
import org.elasticsearch.common.xcontent.XContentType;
2022
import org.elasticsearch.common.xcontent.json.JsonXContent;
2123
import org.elasticsearch.common.xcontent.support.XContentMapValues;
24+
import org.elasticsearch.core.RestApiVersion;
2225
import org.elasticsearch.index.seqno.SeqNoStats;
2326
import org.elasticsearch.rest.RestStatus;
2427
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -309,9 +312,58 @@ public void testSyncedFlushTransition() throws Exception {
309312
try (RestClient newNodeClient = buildClient(restClientSettings(),
310313
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
311314
Request request = new Request("POST", index + "/_flush/synced");
312-
List<String> warningMsg = List.of("Synced flush was removed and a normal flush was performed instead. " +
313-
"This transition will be removed in a future version.");
314-
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> warnings.equals(warningMsg) == false));
315+
final String v7MediaType = XContentType.VND_JSON.toParsedMediaType()
316+
.responseContentTypeHeader(Map.of(MediaType.COMPATIBLE_WITH_PARAMETER_NAME,
317+
String.valueOf(RestApiVersion.minimumSupported().major)));
318+
List<String> warningMsg = List.of("Synced flush is deprecated and will be removed in 8.0." +
319+
" Use flush at /_flush or /{index}/_flush instead.");
320+
request.setOptions(RequestOptions.DEFAULT.toBuilder()
321+
.setWarningsHandler(warnings -> warnings.equals(warningMsg) == false)
322+
.addHeader("Accept", v7MediaType));
323+
324+
assertBusy(() -> {
325+
Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
326+
assertThat(result.get("total"), equalTo(totalShards));
327+
assertThat(result.get("successful"), equalTo(totalShards));
328+
assertThat(result.get("failed"), equalTo(0));
329+
});
330+
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
331+
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
332+
}
333+
}
334+
335+
public void testFlushTransition() throws Exception {
336+
Nodes nodes = buildNodeAndVersions();
337+
assumeFalse("no new node found", nodes.getNewNodes().isEmpty());
338+
assumeFalse("no bwc node found", nodes.getBWCNodes().isEmpty());
339+
// Allocate shards to new nodes then verify flush requests processed by old nodes/new nodes
340+
String newNodes = nodes.getNewNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
341+
int numShards = randomIntBetween(1, 10);
342+
int numOfReplicas = randomIntBetween(0, nodes.getNewNodes().size() - 1);
343+
int totalShards = numShards * (numOfReplicas + 1);
344+
final String index = "test_flush";
345+
createIndex(index, Settings.builder()
346+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
347+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
348+
.put("index.routing.allocation.include._name", newNodes).build());
349+
ensureGreen(index);
350+
indexDocs(index, randomIntBetween(0, 100), between(1, 100));
351+
try (RestClient oldNodeClient = buildClient(restClientSettings(),
352+
nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
353+
Request request = new Request("POST", index + "/_flush");
354+
assertBusy(() -> {
355+
Map<String, Object> result = ObjectPath.createFromResponse(oldNodeClient.performRequest(request)).evaluate("_shards");
356+
assertThat(result.get("total"), equalTo(totalShards));
357+
assertThat(result.get("successful"), equalTo(totalShards));
358+
assertThat(result.get("failed"), equalTo(0));
359+
});
360+
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
361+
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
362+
}
363+
indexDocs(index, randomIntBetween(0, 100), between(1, 100));
364+
try (RestClient newNodeClient = buildClient(restClientSettings(),
365+
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
366+
Request request = new Request("POST", index + "/_flush");
315367
assertBusy(() -> {
316368
Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
317369
assertThat(result.get("total"), equalTo(totalShards));

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
271271
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
272272
}
273273
if (randomBoolean()) {
274-
syncedFlush(index);
274+
flush(index, randomBoolean());
275275
}
276276
}
277277

@@ -309,7 +309,7 @@ public void testRecovery() throws Exception {
309309
}
310310
}
311311
if (randomBoolean()) {
312-
syncedFlush(index);
312+
flush(index, randomBoolean());
313313
}
314314
ensureGreen(index);
315315
}
@@ -584,7 +584,7 @@ public void testUpdateDoc() throws Exception {
584584
assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
585585
}
586586
if (randomBoolean()) {
587-
syncedFlush(index);
587+
flush(index, randomBoolean());
588588
}
589589
}
590590

rest-api-spec/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ tasks.named("yamlRestCompatTest").configure {
8989
OS.current() != OS.WINDOWS
9090
}
9191
systemProperty 'tests.rest.blacklist', ([
92-
'indices.flush/10_basic/Index synced flush rest test',
9392
'search.aggregation/200_top_hits_metric/top_hits aggregation with sequence numbers',
9493
'search/310_match_bool_prefix/multi_match multiple fields with cutoff_frequency throws exception', //cutoff_frequency
9594
'search/340_type_query/type query', // type_query - probably should behave like match_all
@@ -227,6 +226,10 @@ tasks.named("transformV7RestTests").configure({ task ->
227226
task.replaceValueTextByKeyValue("catch",
228227
'/Please set node identifiers correctly. One and only one of \\[node_name\\], \\[node_names\\] and \\[node_ids\\] has to be set/',
229228
'/You must set \\[node_names\\] or \\[node_ids\\] but not both/')
229+
230+
// sync_id is no longer available in SegmentInfos.userData // "indices.flush/10_basic/Index synced flush rest test"
231+
task.replaceIsTrue("indices.testing.shards.0.0.commit.user_data.sync_id", "indices.testing.shards.0.0.commit.user_data")
232+
230233
})
231234

232235
tasks.register('enforceYamlTestConvention').configure {

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestSyncedFlushAction.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
import org.elasticsearch.action.support.IndicesOptions;
1414
import org.elasticsearch.client.node.NodeClient;
1515
import org.elasticsearch.common.Strings;
16-
import org.elasticsearch.common.logging.DeprecationCategory;
17-
import org.elasticsearch.common.logging.DeprecationLogger;
1816
import org.elasticsearch.common.xcontent.XContentBuilder;
17+
import org.elasticsearch.core.RestApiVersion;
1918
import org.elasticsearch.rest.BaseRestHandler;
2019
import org.elasticsearch.rest.BytesRestResponse;
2120
import org.elasticsearch.rest.RestChannel;
@@ -32,15 +31,23 @@
3231

3332
public class RestSyncedFlushAction extends BaseRestHandler {
3433

35-
private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class);
36-
34+
private static final String DEPRECATION_MESSAGE =
35+
"Synced flush is deprecated and will be removed in 8.0. Use flush at /_flush or /{index}/_flush instead.";
3736
@Override
3837
public List<Route> routes() {
3938
return List.of(
40-
new Route(GET, "/_flush/synced"),
41-
new Route(POST, "/_flush/synced"),
42-
new Route(GET, "/{index}/_flush/synced"),
43-
new Route(POST, "/{index}/_flush/synced"));
39+
Route.builder(GET, "/_flush/synced")
40+
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
41+
.build(),
42+
Route.builder(POST, "/_flush/synced")
43+
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
44+
.build(),
45+
Route.builder(GET, "/{index}/_flush/synced")
46+
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
47+
.build(),
48+
Route.builder(POST, "/{index}/_flush/synced")
49+
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
50+
.build());
4451
}
4552

4653
@Override
@@ -50,8 +57,6 @@ public String getName() {
5057

5158
@Override
5259
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
53-
DEPRECATION_LOGGER.deprecate(DeprecationCategory.API, "synced_flush",
54-
"Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version.");
5560
final FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
5661
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
5762
return channel -> client.admin().indices().flush(flushRequest, new SimulateSyncedFlushResponseListener(channel));

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,48 +1651,6 @@ protected static Version minimumNodeVersion() throws IOException {
16511651
return minVersion;
16521652
}
16531653

1654-
protected void syncedFlush(String indexName) throws Exception {
1655-
final List<String> deprecationMessages = List.of(
1656-
"Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead.");
1657-
final List<String> fixedDeprecationMessages = List.of(
1658-
"Synced flush is deprecated and will be removed in 8.0. Use flush at /_flush or /{index}/_flush instead.");
1659-
final List<String> transitionMessages = List.of(
1660-
"Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version.");
1661-
final WarningsHandler warningsHandler;
1662-
if (minimumNodeVersion().onOrAfter(Version.V_8_0_0)) {
1663-
warningsHandler = warnings -> warnings.equals(transitionMessages) == false;
1664-
} else if (minimumNodeVersion().onOrAfter(Version.V_7_6_0)) {
1665-
warningsHandler = warnings -> warnings.equals(deprecationMessages) == false && warnings.equals(transitionMessages) == false &&
1666-
warnings.equals(fixedDeprecationMessages) == false;
1667-
} else if (nodeVersions.stream().anyMatch(n -> n.onOrAfter(Version.V_8_0_0))) {
1668-
warningsHandler = warnings -> warnings.isEmpty() == false && warnings.equals(transitionMessages) == false;
1669-
} else {
1670-
warningsHandler = warnings -> warnings.isEmpty() == false;
1671-
}
1672-
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
1673-
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
1674-
assertBusy(() -> {
1675-
try {
1676-
final Request request = new Request("POST", indexName + "/_flush/synced");
1677-
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
1678-
Response resp = client().performRequest(request);
1679-
if (nodeVersions.stream().allMatch(v -> v.before(Version.V_8_0_0))) {
1680-
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
1681-
assertThat(result.get("failed"), equalTo(0));
1682-
}
1683-
} catch (ResponseException ex) {
1684-
if (ex.getResponse().getStatusLine().getStatusCode() == RestStatus.CONFLICT.getStatus()
1685-
&& ex.getResponse().getWarnings().equals(transitionMessages)) {
1686-
logger.info("a normal flush was performed instead");
1687-
} else {
1688-
throw new AssertionError(ex); // cause assert busy to retry
1689-
}
1690-
}
1691-
});
1692-
// ensure the global checkpoint is synced; otherwise we might trim the commit with syncId
1693-
ensureGlobalCheckpointSynced(indexName);
1694-
}
1695-
16961654
@SuppressWarnings("unchecked")
16971655
private void ensureGlobalCheckpointSynced(String index) throws Exception {
16981656
assertBusy(() -> {

x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/IndexPrivilegeIntegTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,12 +472,10 @@ private void assertUserExecutes(String user, String action, String index, boolea
472472
if (userIsAllowed) {
473473
assertAccessIsAllowed(user, "POST", "/" + index + "/_refresh");
474474
assertAccessIsAllowed(user, "POST", "/" + index + "/_flush");
475-
assertAccessIsAllowed(user, "POST", "/" + index + "/_flush/synced");
476475
assertAccessIsAllowed(user, "POST", "/" + index + "/_forcemerge");
477476
} else {
478477
assertAccessIsDenied(user, "POST", "/" + index + "/_refresh");
479478
assertAccessIsDenied(user, "POST", "/" + index + "/_flush");
480-
assertAccessIsDenied(user, "POST", "/" + index + "/_flush/synced");
481479
assertAccessIsDenied(user, "POST", "/" + index + "/_forcemerge");
482480
}
483481
break;

0 commit comments

Comments
 (0)