diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index f673513950bb4..b5e96ac2a8b34 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -1,5 +1,6 @@ apply plugin: 'elasticsearch.internal-es-plugin' apply plugin: 'elasticsearch.internal-cluster-test' +apply plugin: 'elasticsearch.internal-java-rest-test' esplugin { name 'x-pack-ccr' description 'Elasticsearch Expanded Pack Plugin - CCR' @@ -33,6 +34,16 @@ tasks.named('internalClusterTestTestingConventions').configure { baseClass 'org.elasticsearch.test.ESIntegTestCase' } +tasks.named("javaRestTest").configure { + usesDefaultDistribution() +} + +restResources { + restApi { + include 'bulk', 'search', '_common', 'indices', 'index', 'cluster', 'data_stream' + } +} + addQaCheckDependencies(project) dependencies { diff --git a/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java b/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java new file mode 100644 index 0000000000000..e5dfea7b772f2 --- /dev/null +++ b/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java @@ -0,0 +1,281 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.Build; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public class ShardChangesRestIT extends ESRestTestCase { + private static final String CCR_SHARD_CHANGES_ENDPOINT = "/%s/ccr/shard_changes"; + private static final String BULK_INDEX_ENDPOINT = "/%s/_bulk"; + + private static final String[] SHARD_RESPONSE_FIELDS = new String[] { + "took_in_millis", + "operations", + "shard_id", + "index", + "settings_version", + "max_seq_no_of_updates_or_deletes", + "number_of_operations", + "mapping_version", + "aliases_version", + "max_seq_no", + "global_checkpoint" }; + private static final String[] NAMES = { "skywalker", "leia", "obi-wan", "yoda", "chewbacca", "r2-d2", "c-3po", "darth-vader" }; + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .setting("xpack.security.enabled", "false") + .setting("xpack.license.self_generated.type", "trial") + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Before + public void assumeSnapshotBuild() { + assumeTrue("/{index}/ccr/shard_changes endpoint only available in snapshot builds", Build.current().isSnapshot()); + } + + public void testShardChangesNoOperation() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build() + ); + assertTrue(indexExists(indexName)); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + assertOK(client().performRequest(shardChangesRequest)); + } + + public void testShardChangesDefaultParams() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build(); + final String mappings = """ + { + "properties": { + "name": { + "type": "keyword" + } + } + } + """; + createIndex(indexName, settings, mappings); + assertTrue(indexExists(indexName)); + + assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(10, 20)))); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + final Response response = client().performRequest(shardChangesRequest); + assertOK(response); + assertShardChangesResponse( + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false) + ); + } + + public void testShardChangesWithAllParameters() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build() + ); + assertTrue(indexExists(indexName)); + + assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(100, 200)))); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + shardChangesRequest.addParameter("from_seq_no", "0"); + shardChangesRequest.addParameter("max_operations_count", "1"); + shardChangesRequest.addParameter("poll_timeout", "10s"); + shardChangesRequest.addParameter("max_batch_size", "1MB"); + + final Response response = client().performRequest(shardChangesRequest); + assertOK(response); + assertShardChangesResponse( + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false) + ); + } + + public void testShardChangesMultipleRequests() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build() + ); + assertTrue(indexExists(indexName)); + + assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(100, 200)))); + + final Request firstRequest = new Request("GET", shardChangesEndpoint(indexName)); + firstRequest.addParameter("from_seq_no", "0"); + firstRequest.addParameter("max_operations_count", "10"); + firstRequest.addParameter("poll_timeout", "10s"); + firstRequest.addParameter("max_batch_size", "1MB"); + + final Response firstResponse = client().performRequest(firstRequest); + assertOK(firstResponse); + assertShardChangesResponse( + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(firstResponse.getEntity()), false) + ); + + final Request secondRequest = new Request("GET", shardChangesEndpoint(indexName)); + secondRequest.addParameter("from_seq_no", "10"); + secondRequest.addParameter("max_operations_count", "10"); + secondRequest.addParameter("poll_timeout", "10s"); + secondRequest.addParameter("max_batch_size", "1MB"); + + final Response secondResponse = client().performRequest(secondRequest); + assertOK(secondResponse); + assertShardChangesResponse( + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(secondResponse.getEntity()), false) + ); + } + + public void testShardChangesInvalidFromSeqNo() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + assertTrue(indexExists(indexName)); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + shardChangesRequest.addParameter("from_seq_no", "-1"); + final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); + assertResponseException(ex, RestStatus.BAD_REQUEST, "Validation Failed: 1: fromSeqNo [-1] cannot be lower than 0"); + } + + public void testShardChangesInvalidMaxOperationsCount() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + assertTrue(indexExists(indexName)); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + shardChangesRequest.addParameter("max_operations_count", "-1"); + final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); + assertResponseException(ex, RestStatus.BAD_REQUEST, "Validation Failed: 1: maxOperationCount [-1] cannot be lower than 0"); + } + + public void testShardChangesNegativePollTimeout() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + assertTrue(indexExists(indexName)); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + shardChangesRequest.addParameter("poll_timeout", "-1s"); + assertOK(client().performRequest(shardChangesRequest)); + } + + public void testShardChangesInvalidMaxBatchSize() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + assertTrue(indexExists(indexName)); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + shardChangesRequest.addParameter("max_batch_size", "-1MB"); + final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); + assertResponseException( + ex, + RestStatus.BAD_REQUEST, + "failed to parse setting [max_batch_size] with value [-1MB] as a size in bytes" + ); + } + + public void testShardChangesMissingIndex() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + assertFalse(indexExists(indexName)); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); + final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); + assertResponseException(ex, RestStatus.BAD_REQUEST, "Failed to process shard changes for index [" + indexName + "]"); + } + + private static Request bulkRequest(final String indexName, int numberOfDocuments) { + final StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < numberOfDocuments; i++) { + sb.append(String.format(Locale.ROOT, "{ \"index\": { \"_id\": \"%d\" } }\n{ \"name\": \"%s\" }\n", i + 1, randomFrom(NAMES))); + } + + final Request request = new Request("POST", bulkEndpoint(indexName)); + request.setJsonEntity(sb.toString()); + request.addParameter("refresh", "true"); + return request; + } + + private static String shardChangesEndpoint(final String indexName) { + return String.format(Locale.ROOT, CCR_SHARD_CHANGES_ENDPOINT, indexName); + } + + private static String bulkEndpoint(final String indexName) { + return String.format(Locale.ROOT, BULK_INDEX_ENDPOINT, indexName); + } + + private void assertResponseException(final ResponseException ex, final RestStatus restStatus, final String error) { + assertEquals(restStatus.getStatus(), ex.getResponse().getStatusLine().getStatusCode()); + assertThat(ex.getMessage(), Matchers.containsString(error)); + } + + private void assertShardChangesResponse(final Map shardChangesResponseBody) { + for (final String fieldName : SHARD_RESPONSE_FIELDS) { + final Object fieldValue = shardChangesResponseBody.get(fieldName); + assertNotNull("Field " + fieldName + " is missing or has a null value.", fieldValue); + + if ("operations".equals(fieldName)) { + if (fieldValue instanceof List operationsList) { + assertFalse("Field 'operations' is empty.", operationsList.isEmpty()); + + for (final Object operation : operationsList) { + assertNotNull("Operation is null.", operation); + if (operation instanceof Map operationMap) { + assertNotNull("seq_no is missing in operation.", operationMap.get("seq_no")); + assertNotNull("op_type is missing in operation.", operationMap.get("op_type")); + assertNotNull("primary_term is missing in operation.", operationMap.get("primary_term")); + } + } + } + } + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 87a4c2c7d4826..5305e179058b2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Build; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequest; @@ -91,6 +92,7 @@ import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestShardChangesAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackFeatureUsage; import org.elasticsearch.xpack.core.XPackField; @@ -112,6 +114,7 @@ import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -140,7 +143,34 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing"; public static final TransportVersion TRANSPORT_VERSION_ACTION_WITH_SHARD_ID = TransportVersions.V_8_9_X; + private static final List BASE_REST_HANDLERS = Arrays.asList( + // stats API + new RestFollowStatsAction(), + new RestCcrStatsAction(), + new RestFollowInfoAction(), + // follow APIs + new RestPutFollowAction(), + new RestResumeFollowAction(), + new RestPauseFollowAction(), + new RestUnfollowAction(), + // auto-follow APIs + new RestDeleteAutoFollowPatternAction(), + new RestPutAutoFollowPatternAction(), + new RestGetAutoFollowPatternAction(), + new RestPauseAutoFollowPatternAction(), + new RestResumeAutoFollowPatternAction(), + // forget follower API + new RestForgetFollowerAction() + ); + private static final List REST_HANDLERS = Collections.unmodifiableList(BASE_REST_HANDLERS); + + private static final List SNAPSHOT_BUILD_REST_HANDLERS; + static { + List snapshotBuildHandlers = new ArrayList<>(BASE_REST_HANDLERS); + snapshotBuildHandlers.add(new RestShardChangesAction()); + SNAPSHOT_BUILD_REST_HANDLERS = Collections.unmodifiableList(snapshotBuildHandlers); + } private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; @@ -272,25 +302,7 @@ public List getRestHandlers( return emptyList(); } - return Arrays.asList( - // stats API - new RestFollowStatsAction(), - new RestCcrStatsAction(), - new RestFollowInfoAction(), - // follow APIs - new RestPutFollowAction(), - new RestResumeFollowAction(), - new RestPauseFollowAction(), - new RestUnfollowAction(), - // auto-follow APIs - new RestDeleteAutoFollowPatternAction(), - new RestPutAutoFollowPatternAction(), - new RestGetAutoFollowPatternAction(), - new RestPauseAutoFollowPatternAction(), - new RestResumeAutoFollowPatternAction(), - // forget follower API - new RestForgetFollowerAction() - ); + return Build.current().isSnapshot() ? SNAPSHOT_BUILD_REST_HANDLERS : REST_HANDLERS; } public List getNamedWriteables() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java new file mode 100644 index 0000000000000..84171ebce162f --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java @@ -0,0 +1,300 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.action.ShardChangesAction; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +/** + * A REST handler that retrieves shard changes in a specific index whose name is provided as a parameter. + * It handles GET requests to the "/{index}/ccr/shard_changes" endpoint retrieving shard-level changes, + * such as translog operations, mapping version, settings version, aliases version, the global checkpoint, + * maximum sequence number and maximum sequence number of updates or deletes. + *

+ * Note: This handler is only available for snapshot builds. + */ +public class RestShardChangesAction extends BaseRestHandler { + + private static final long DEFAULT_FROM_SEQ_NO = 0L; + private static final ByteSizeValue DEFAULT_MAX_BATCH_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB); + private static final TimeValue DEFAULT_POLL_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); + private static final int DEFAULT_MAX_OPERATIONS_COUNT = 1024; + private static final int DEFAULT_TIMEOUT_SECONDS = 60; + private static final TimeValue GET_INDEX_UUID_TIMEOUT = new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + private static final TimeValue SHARD_STATS_TIMEOUT = new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + private static final String INDEX_PARAM_NAME = "index"; + private static final String FROM_SEQ_NO_PARAM_NAME = "from_seq_no"; + private static final String MAX_BATCH_SIZE_PARAM_NAME = "max_batch_size"; + private static final String POLL_TIMEOUT_PARAM_NAME = "poll_timeout"; + private static final String MAX_OPERATIONS_COUNT_PARAM_NAME = "max_operations_count"; + + @Override + public String getName() { + return "ccr_shard_changes_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/{index}/ccr/shard_changes")); + } + + /** + * Prepares the request for retrieving shard changes. + * + * @param restRequest The REST request. + * @param client The NodeClient for executing the request. + * @return A RestChannelConsumer for handling the request. + * @throws IOException If an error occurs while preparing the request. + */ + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { + final var indexName = restRequest.param(INDEX_PARAM_NAME); + final var fromSeqNo = restRequest.paramAsLong(FROM_SEQ_NO_PARAM_NAME, DEFAULT_FROM_SEQ_NO); + final var maxBatchSize = restRequest.paramAsSize(MAX_BATCH_SIZE_PARAM_NAME, DEFAULT_MAX_BATCH_SIZE); + final var pollTimeout = restRequest.paramAsTime(POLL_TIMEOUT_PARAM_NAME, DEFAULT_POLL_TIMEOUT); + final var maxOperationsCount = restRequest.paramAsInt(MAX_OPERATIONS_COUNT_PARAM_NAME, DEFAULT_MAX_OPERATIONS_COUNT); + + final CompletableFuture indexUUIDCompletableFuture = asyncGetIndexUUID( + client, + indexName, + client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME) + ); + final CompletableFuture shardStatsCompletableFuture = asyncShardStats( + client, + indexName, + client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME) + ); + + return channel -> CompletableFuture.allOf(indexUUIDCompletableFuture, shardStatsCompletableFuture).thenRun(() -> { + try { + final String indexUUID = indexUUIDCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + final ShardStats shardStats = shardStatsCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + final ShardId shardId = shardStats.getShardRouting().shardId(); + final String expectedHistoryUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + + final ShardChangesAction.Request shardChangesRequest = shardChangesRequest( + indexName, + indexUUID, + shardId, + expectedHistoryUUID, + fromSeqNo, + maxBatchSize, + pollTimeout, + maxOperationsCount + ); + client.execute(ShardChangesAction.INSTANCE, shardChangesRequest, new RestActionListener<>(channel) { + @Override + protected void processResponse(final ShardChangesAction.Response response) { + channel.sendResponse(new RestResponse(RestStatus.OK, shardChangesResponseToXContent(response, indexName, shardId))); + } + }); + + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Error while retrieving shard changes", e); + } catch (TimeoutException te) { + throw new IllegalStateException("Timeout while waiting for shard stats or index UUID", te); + } + }).exceptionally(ex -> { + channel.sendResponse(new RestResponse(RestStatus.BAD_REQUEST, "Failed to process shard changes for index [" + indexName + "]")); + return null; + }); + } + + /** + * Creates a ShardChangesAction.Request object with the provided parameters. + * + * @param indexName The name of the index for which to retrieve shard changes. + * @param indexUUID The UUID of the index. + * @param shardId The ShardId for which to retrieve shard changes. + * @param expectedHistoryUUID The expected history UUID of the shard. + * @param fromSeqNo The sequence number from which to start retrieving shard changes. + * @param maxBatchSize The maximum size of a batch of operations to retrieve. + * @param pollTimeout The maximum time to wait for shard changes. + * @param maxOperationsCount The maximum number of operations to retrieve in a single request. + * @return A ShardChangesAction.Request object with the provided parameters. + */ + private static ShardChangesAction.Request shardChangesRequest( + final String indexName, + final String indexUUID, + final ShardId shardId, + final String expectedHistoryUUID, + long fromSeqNo, + final ByteSizeValue maxBatchSize, + final TimeValue pollTimeout, + int maxOperationsCount + ) { + final ShardChangesAction.Request shardChangesRequest = new ShardChangesAction.Request( + new ShardId(new Index(indexName, indexUUID), shardId.id()), + expectedHistoryUUID + ); + shardChangesRequest.setFromSeqNo(fromSeqNo); + shardChangesRequest.setMaxBatchSize(maxBatchSize); + shardChangesRequest.setPollTimeout(pollTimeout); + shardChangesRequest.setMaxOperationCount(maxOperationsCount); + return shardChangesRequest; + } + + /** + * Converts the response to XContent JSOn format. + * + * @param response The ShardChangesAction response. + * @param indexName The name of the index. + * @param shardId The ShardId. + */ + private static XContentBuilder shardChangesResponseToXContent( + final ShardChangesAction.Response response, + final String indexName, + final ShardId shardId + ) { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + builder.field("index", indexName); + builder.field("shard_id", shardId); + builder.field("mapping_version", response.getMappingVersion()); + builder.field("settings_version", response.getSettingsVersion()); + builder.field("aliases_version", response.getAliasesVersion()); + builder.field("global_checkpoint", response.getGlobalCheckpoint()); + builder.field("max_seq_no", response.getMaxSeqNo()); + builder.field("max_seq_no_of_updates_or_deletes", response.getMaxSeqNoOfUpdatesOrDeletes()); + builder.field("took_in_millis", response.getTookInMillis()); + if (response.getOperations() != null && response.getOperations().length > 0) { + operationsToXContent(response, builder); + } + builder.endObject(); + + return builder; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Converts the operations from a ShardChangesAction response to XContent JSON format. + * + * @param response The ShardChangesAction response containing the operations to be converted. + * @param builder The XContentBuilder to which the converted operations will be added. + * @throws IOException If an error occurs while writing to the XContentBuilder. + */ + private static void operationsToXContent(final ShardChangesAction.Response response, final XContentBuilder builder) throws IOException { + builder.field("number_of_operations", response.getOperations().length); + builder.field("operations"); + builder.startArray(); + for (final Translog.Operation operation : response.getOperations()) { + builder.startObject(); + builder.field("op_type", operation.opType()); + builder.field("seq_no", operation.seqNo()); + builder.field("primary_term", operation.primaryTerm()); + builder.endObject(); + } + builder.endArray(); + } + + /** + * Execute an asynchronous task using a task supplier and an executor service. + * + * @param The type of data to be retrieved. + * @param task The supplier task that provides the data. + * @param executorService The executorService service for executing the asynchronous task. + * @param errorMessage The error message to be thrown if the task execution fails. + * @return A CompletableFuture that completes with the retrieved data. + */ + private static CompletableFuture supplyAsyncTask( + final Supplier task, + final ExecutorService executorService, + final String errorMessage + ) { + return CompletableFuture.supplyAsync(() -> { + try { + return task.get(); + } catch (Exception e) { + throw new ElasticsearchException(errorMessage, e); + } + }, executorService); + } + + /** + * Asynchronously retrieves the shard stats for a given index using an executor service. + * + * @param client The NodeClient for executing the asynchronous request. + * @param indexName The name of the index for which to retrieve shard statistics. + * @param executorService The executorService service for executing the asynchronous task. + * @return A CompletableFuture that completes with the retrieved ShardStats. + * @throws ElasticsearchException If an error occurs while retrieving shard statistics. + */ + private static CompletableFuture asyncShardStats( + final NodeClient client, + final String indexName, + final ExecutorService executorService + ) { + return supplyAsyncTask( + () -> Arrays.stream(client.admin().indices().prepareStats(indexName).clear().get(SHARD_STATS_TIMEOUT).getShards()) + .max(Comparator.comparingLong(shardStats -> shardStats.getCommitStats().getGeneration())) + .orElseThrow(() -> new ElasticsearchException("Unable to retrieve shard stats for index: " + indexName)), + executorService, + "Error while retrieving shard stats for index [" + indexName + "]" + ); + } + + /** + * Asynchronously retrieves the index UUID for a given index using an executor service. + * + * @param client The NodeClient for executing the asynchronous request. + * @param indexName The name of the index for which to retrieve the index UUID. + * @param executorService The executorService service for executing the asynchronous task. + * @return A CompletableFuture that completes with the retrieved index UUID. + * @throws ElasticsearchException If an error occurs while retrieving the index UUID. + */ + private static CompletableFuture asyncGetIndexUUID( + final NodeClient client, + final String indexName, + final ExecutorService executorService + ) { + return supplyAsyncTask( + () -> client.admin() + .indices() + .prepareGetIndex() + .setIndices(indexName) + .get(GET_INDEX_UUID_TIMEOUT) + .getSetting(indexName, IndexMetadata.SETTING_INDEX_UUID), + executorService, + "Error while retrieving index UUID for index [" + indexName + "]" + ); + } +}