From 8afdda20169618b3aae4023d7205b91eef68c871 Mon Sep 17 00:00:00 2001 From: sukriti sinha Date: Mon, 12 May 2025 11:46:51 +0530 Subject: [PATCH] Created an API to fetch remote store segment data Signed-off-by: sukriti sinha --- CHANGELOG.md | 1 + .../remotestore/RemoteStoreMetadataIT.java | 284 +++++++++++++++++ .../org/opensearch/action/ActionModule.java | 5 + .../metadata/RemoteStoreMetadataAction.java | 25 ++ .../metadata/RemoteStoreMetadataRequest.java | 51 ++++ .../RemoteStoreMetadataRequestBuilder.java | 46 +++ .../metadata/RemoteStoreMetadataResponse.java | 101 +++++++ .../metadata/RemoteStoreShardMetadata.java | 133 ++++++++ .../TransportRemoteStoreMetadataAction.java | 286 ++++++++++++++++++ .../remotestore/metadata/package-info.java | 10 + .../store/RemoteSegmentStoreDirectory.java | 29 ++ .../transfer/BlobStoreTransferService.java | 4 + .../translog/transfer/TransferService.java | 2 + .../transfer/TranslogTransferManager.java | 31 ++ .../RestRemoteStoreMetadataAction.java | 58 ++++ .../transport/client/ClusterAdminClient.java | 7 + .../client/support/AbstractClient.java | 22 ++ ...emoteStoreMetadataRequestBuilderTests.java | 52 ++++ .../RemoteStoreMetadataRequestTests.java | 56 ++++ .../RemoteStoreMetadataResponseTests.java | 262 ++++++++++++++++ .../RemoteStoreMetadataTestHelper.java | 72 +++++ .../metadata/RemoteStoreMetadataTests.java | 134 ++++++++ ...ansportRemoteStoreMetadataActionTests.java | 182 +++++++++++ 23 files changed, 1853 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMetadataIT.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilder.java create 
mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreShardMetadata.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/package-info.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRemoteStoreMetadataAction.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilderTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponseTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTestHelper.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c45bbe6c2a77e..eba71d739cdf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add NodeResourceUsageStats to ClusterInfo ([#18480](https://github.com/opensearch-project/OpenSearch/issues/18472)) - Introduce SecureHttpTransportParameters experimental API (to complement SecureTransportParameters counterpart) ([#18572](https://github.com/opensearch-project/OpenSearch/issues/18572)) - Create equivalents of JSM's 
AccessController in the java agent ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346)) +- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18257](https://github.com/opensearch-project/OpenSearch/pull/18257)) ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMetadataIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMetadataIT.java new file mode 100644 index 0000000000000..3ff4f044dfda7 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMetadataIT.java @@ -0,0 +1,284 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataResponse; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreShardMetadata; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService.TestPlugin; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasKey; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreMetadataIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "remote-store-meta-api-test"; + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(TestPlugin.class)).collect(Collectors.toList()); + } + + public void setup() { + internalCluster().startNodes(3); + } + + @SuppressWarnings("unchecked") + public void testMetadataResponseFromAllNodes() { + setup(); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 3)); + ensureGreen(INDEX_NAME); + indexDocs(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); + + ClusterState state = getClusterState(); + List nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList()); + + for (String node : nodes) { + RemoteStoreMetadataResponse response = 
client(node).admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get(); + assertTrue(response.getSuccessfulShards() > 0); + assertNotNull(response.groupByIndexAndShards()); + + response.groupByIndexAndShards().forEach((index, shardMap) -> { + shardMap.forEach((shardId, metadataList) -> { + assertFalse(metadataList.isEmpty()); + + for (RemoteStoreShardMetadata metadata : metadataList) { + assertEquals(index, metadata.getIndexName()); + assertEquals((int) shardId, metadata.getShardId()); + + assertNotNull(metadata.getLatestSegmentMetadataFileName()); + assertNotNull(metadata.getLatestTranslogMetadataFileName()); + + Map> segmentFiles = metadata.getSegmentMetadataFiles(); + assertNotNull(segmentFiles); + assertFalse(segmentFiles.isEmpty()); + + for (Map fileMeta : segmentFiles.values()) { + Map files = (Map) fileMeta.get("files"); + assertNotNull(files); + assertFalse(files.isEmpty()); + for (Object value : files.values()) { + Map meta = (Map) value; + assertThat(meta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length"))); + } + + Map checkpoint = (Map) fileMeta.get("replication_checkpoint"); + assertNotNull(checkpoint); + assertThat( + checkpoint, + allOf( + hasKey("primary_term"), + hasKey("segments_gen"), + hasKey("segment_infos_version"), + hasKey("codec"), + hasKey("created_timestamp") + ) + ); + } + + Map> translogFiles = metadata.getTranslogMetadataFiles(); + assertNotNull(translogFiles); + assertFalse(translogFiles.isEmpty()); + for (Map translogMeta : translogFiles.values()) { + assertThat( + translogMeta, + allOf( + hasKey("primary_term"), + hasKey("generation"), + hasKey("min_translog_gen"), + hasKey("generation_to_primary_term") + ) + ); + } + } + }); + }); + } + } + + @SuppressWarnings("unchecked") + public void testMetadataResponseAllShards() throws Exception { + setup(); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 2)); + ensureGreen(INDEX_NAME); + indexDocs(); + 
client().admin().indices().prepareRefresh(INDEX_NAME).get(); + + assertBusy(() -> { assertFalse(client().admin().cluster().prepareHealth(INDEX_NAME).get().isTimedOut()); }); + + RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get(); + + response.groupByIndexAndShards().forEach((index, shardMap) -> { + shardMap.forEach((shardId, metadataList) -> { + assertFalse(metadataList.isEmpty()); + + for (RemoteStoreShardMetadata metadata : metadataList) { + assertEquals(index, metadata.getIndexName()); + assertEquals((int) shardId, metadata.getShardId()); + + assertNotNull(metadata.getLatestSegmentMetadataFileName()); + assertNotNull(metadata.getLatestTranslogMetadataFileName()); + + Map> segmentFiles = metadata.getSegmentMetadataFiles(); + assertNotNull(segmentFiles); + assertFalse(segmentFiles.isEmpty()); + + for (Map fileMeta : segmentFiles.values()) { + Map files = (Map) fileMeta.get("files"); + assertNotNull(files); + assertFalse(files.isEmpty()); + for (Object value : files.values()) { + Map meta = (Map) value; + assertThat(meta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length"))); + } + + Map checkpoint = (Map) fileMeta.get("replication_checkpoint"); + assertNotNull(checkpoint); + assertThat( + checkpoint, + allOf( + hasKey("primary_term"), + hasKey("segments_gen"), + hasKey("segment_infos_version"), + hasKey("codec"), + hasKey("created_timestamp") + ) + ); + } + + Map> translogFiles = metadata.getTranslogMetadataFiles(); + assertNotNull(translogFiles); + assertFalse(translogFiles.isEmpty()); + for (Map translogMeta : translogFiles.values()) { + assertThat( + translogMeta, + allOf( + hasKey("primary_term"), + hasKey("generation"), + hasKey("min_translog_gen"), + hasKey("generation_to_primary_term") + ) + ); + } + } + }); + }); + } + + public void testMultipleMetadataFilesPerShard() throws Exception { + setup(); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1)); + 
ensureGreen(INDEX_NAME); + + int refreshCount = 5; + for (int i = 0; i < refreshCount; i++) { + indexDocs(); + client().admin().indices().prepareRefresh(INDEX_NAME).get(); + Thread.sleep(100); + } + + RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get(); + + response.groupByIndexAndShards().forEach((index, shardMap) -> { + shardMap.forEach((shardId, metadataList) -> { + assertFalse(metadataList.isEmpty()); + + for (RemoteStoreShardMetadata metadata : metadataList) { + assertEquals(refreshCount, metadata.getSegmentMetadataFiles().size()); + assertTrue(metadata.getTranslogMetadataFiles().size() >= 1); + } + }); + }); + } + + public void testMetadataResponseMultipleIndicesAndShards() throws Exception { + setup(); + + String index1 = INDEX_NAME + "-1"; + String index2 = INDEX_NAME + "-2"; + + createIndex(index1, remoteStoreIndexSettings(0, 2)); + createIndex(index2, remoteStoreIndexSettings(0, 3)); + ensureGreen(index1, index2); + + indexDocs(index1); + indexDocs(index2); + + client().admin().indices().prepareRefresh(index1).get(); + client().admin().indices().prepareRefresh(index2).get(); + + RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata("*", null).get(); + + Map>> grouped = response.groupByIndexAndShards(); + + assertTrue(grouped.containsKey(index1)); + assertTrue(grouped.containsKey(index2)); + + grouped.forEach((index, shardMap) -> { + shardMap.forEach((shardId, metadataList) -> { + assertFalse(metadataList.isEmpty()); + metadataList.forEach(metadata -> { + assertEquals(index, metadata.getIndexName()); + assertEquals((int) shardId, metadata.getShardId()); + assertNotNull(metadata.getSegmentMetadataFiles()); + assertFalse(metadata.getSegmentMetadataFiles().isEmpty()); + assertNotNull(metadata.getTranslogMetadataFiles()); + assertFalse(metadata.getTranslogMetadataFiles().isEmpty()); + }); + }); + }); + } + + public void 
testShardFailureWhenRepositoryMissing() throws Exception { + String indexName = "failure-case-index"; + + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.replication.type", "SEGMENT") + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put("index.remote_store.repository", "non-existent-repo") + .put("index.remote_store.translog.enabled", true) + .put("index.remote_store.translog.repository", "non-existent-repo"); + + Exception exception = expectThrows(Exception.class, () -> assertAcked(prepareCreate(indexName).setSettings(settings))); + + assertTrue(exception.getMessage().toLowerCase(Locale.ROOT).contains("repository")); + } + + private void indexDocs() { + indexDocs(INDEX_NAME); + } + + private void indexDocs(String indexName) { + for (int i = 0; i < randomIntBetween(10, 20); i++) { + client().prepareIndex(indexName).setId("doc-" + i).setSource("field", "value-" + i).get(); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index dafc97855e050..67a86db37e790 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -67,6 +67,8 @@ import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction; import org.opensearch.action.admin.cluster.remote.RemoteInfoAction; import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataAction; +import org.opensearch.action.admin.cluster.remotestore.metadata.TransportRemoteStoreMetadataAction; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction; import 
org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsAction; @@ -378,6 +380,7 @@ import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction; +import org.opensearch.rest.action.admin.cluster.RestRemoteStoreMetadataAction; import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction; import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction; import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction; @@ -638,6 +641,7 @@ public void reg actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); actions.register(WlmStatsAction.INSTANCE, TransportWlmStatsAction.class); actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class); + actions.register(RemoteStoreMetadataAction.INSTANCE, TransportRemoteStoreMetadataAction.class); actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class); actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class); actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class); @@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestGetDecommissionStateAction()); registerHandler.accept(new RestRemoteStoreStatsAction()); registerHandler.accept(new RestRestoreRemoteStoreAction()); + registerHandler.accept(new RestRemoteStoreMetadataAction()); // pull-based ingestion API registerHandler.accept(new RestPauseIngestionAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataAction.java new 
file mode 100644 index 0000000000000..9e4b4a65efc28 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.action.ActionType; + +/** + * Action to fetch metadata from remote store + * + * @opensearch.internal + */ +public class RemoteStoreMetadataAction extends ActionType { + public static final RemoteStoreMetadataAction INSTANCE = new RemoteStoreMetadataAction(); + public static final String NAME = "cluster:admin/remote_store/metadata"; + + private RemoteStoreMetadataAction() { + super(NAME, RemoteStoreMetadataResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequest.java new file mode 100644 index 0000000000000..91b48c8af028e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequest.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.action.support.broadcast.BroadcastRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request object for fetching remote store metadata of shards across one or more indices. + * + * @opensearch.internal + */ +@ExperimentalApi +public class RemoteStoreMetadataRequest extends BroadcastRequest { + private String[] shards; + + public RemoteStoreMetadataRequest() { + super((String[]) null); + shards = new String[0]; + } + + public RemoteStoreMetadataRequest(StreamInput in) throws IOException { + super(in); + shards = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(shards); + } + + public RemoteStoreMetadataRequest shards(String... shards) { + this.shards = shards; + return this; + } + + public String[] shards() { + return this.shards; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilder.java new file mode 100644 index 0000000000000..b3acd74afccb5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilder.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.transport.client.OpenSearchClient; + +/** + * Builder for RemoteStoreMetadataRequest + * + * @opensearch.api + */ +@ExperimentalApi +public class RemoteStoreMetadataRequestBuilder extends BroadcastOperationRequestBuilder< + RemoteStoreMetadataRequest, + RemoteStoreMetadataResponse, + RemoteStoreMetadataRequestBuilder> { + + public RemoteStoreMetadataRequestBuilder(OpenSearchClient client, RemoteStoreMetadataAction action) { + super(client, action, new RemoteStoreMetadataRequest()); + } + + /** + * Sets timeout of request. + */ + public final RemoteStoreMetadataRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + + /** + * Sets shards preference of request. + */ + public final RemoteStoreMetadataRequestBuilder setShards(String... shards) { + request.shards(shards); + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponse.java new file mode 100644 index 0000000000000..a5b61f241d77d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponse.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.action.support.broadcast.BroadcastResponse; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.action.support.DefaultShardOperationFailedException; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Response containing remote store metadata + * + * @opensearch.api + */ +@ExperimentalApi +public class RemoteStoreMetadataResponse extends BroadcastResponse { + private final RemoteStoreShardMetadata[] remoteStoreShardMetadata; + + public RemoteStoreMetadataResponse(StreamInput in) throws IOException { + super(in); + remoteStoreShardMetadata = in.readArray(RemoteStoreShardMetadata::new, RemoteStoreShardMetadata[]::new); + } + + public RemoteStoreMetadataResponse( + RemoteStoreShardMetadata[] remoteStoreShardMetadata, + int totalShards, + int successfulShards, + int failedShards, + List shardFailures + ) { + super(totalShards, successfulShards, failedShards, shardFailures); + this.remoteStoreShardMetadata = remoteStoreShardMetadata; + } + + /** + * Groups metadata by index and shard IDs. 
+ * + * @return Map of index name to shard ID to list of metadata + */ + public Map>> groupByIndexAndShards() { + Map>> indexWiseMetadata = new HashMap<>(); + for (RemoteStoreShardMetadata metadata : remoteStoreShardMetadata) { + indexWiseMetadata.computeIfAbsent(metadata.getIndexName(), k -> new HashMap<>()) + .computeIfAbsent(metadata.getShardId(), k -> new ArrayList<>()) + .add(metadata); + } + return indexWiseMetadata; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeArray(remoteStoreShardMetadata); + } + + @Override + protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException { + Map>> indexWiseMetadata = groupByIndexAndShards(); + builder.startObject(Fields.INDICES); + for (String indexName : indexWiseMetadata.keySet()) { + builder.startObject(indexName); + builder.startObject(Fields.SHARDS); + for (int shardId : indexWiseMetadata.get(indexName).keySet()) { + builder.startArray(Integer.toString(shardId)); + for (RemoteStoreShardMetadata metadata : indexWiseMetadata.get(indexName).get(shardId)) { + metadata.toXContent(builder, params); + } + builder.endArray(); + } + builder.endObject(); + builder.endObject(); + } + builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this, true, false); + } + + static final class Fields { + static final String SHARDS = "shards"; + static final String INDICES = "indices"; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreShardMetadata.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreShardMetadata.java new file mode 100644 index 0000000000000..72348d205c1af --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreShardMetadata.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The 
OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; + +/** + * Response model that holds the remote store metadata (segment and translog) for a shard. + * + * @opensearch.internal + */ +@ExperimentalApi +public class RemoteStoreShardMetadata implements Writeable, ToXContentFragment { + + private final String indexName; + private final int shardId; + private final Map> segmentMetadataFiles; + private final Map> translogMetadataFiles; + private final String latestSegmentMetadataFileName; + private final String latestTranslogMetadataFileName; + + public RemoteStoreShardMetadata( + String indexName, + int shardId, + Map> segmentMetadataFiles, + Map> translogMetadataFiles, + String latestSegmentMetadataFileName, + String latestTranslogMetadataFileName + ) { + this.indexName = indexName; + this.shardId = shardId; + this.segmentMetadataFiles = segmentMetadataFiles; + this.translogMetadataFiles = translogMetadataFiles; + this.latestSegmentMetadataFileName = latestSegmentMetadataFileName; + this.latestTranslogMetadataFileName = latestTranslogMetadataFileName; + } + + @SuppressWarnings("unchecked") + public RemoteStoreShardMetadata(StreamInput in) throws IOException { + this.indexName = in.readString(); + this.shardId = in.readInt(); + this.segmentMetadataFiles = (Map>) in.readGenericValue(); + this.translogMetadataFiles = (Map>) in.readGenericValue(); + this.latestSegmentMetadataFileName = in.readOptionalString(); 
+ this.latestTranslogMetadataFileName = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeInt(shardId); + out.writeGenericValue(segmentMetadataFiles); + out.writeGenericValue(translogMetadataFiles); + out.writeOptionalString(latestSegmentMetadataFileName); + out.writeOptionalString(latestTranslogMetadataFileName); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + + builder.field("index", indexName); + builder.field("shard", shardId); + + if (latestSegmentMetadataFileName != null) { + builder.field("latest_segment_metadata_filename", latestSegmentMetadataFileName); + } + + if (latestTranslogMetadataFileName != null) { + builder.field("latest_translog_metadata_filename", latestTranslogMetadataFileName); + } + + builder.startObject("available_segment_metadata_files"); + for (Map.Entry> entry : segmentMetadataFiles.entrySet()) { + builder.startObject(entry.getKey()); + for (Map.Entry inner : entry.getValue().entrySet()) { + builder.field(inner.getKey(), inner.getValue()); + } + builder.endObject(); + } + builder.endObject(); + + builder.startObject("available_translog_metadata_files"); + for (Map.Entry> entry : translogMetadataFiles.entrySet()) { + builder.startObject(entry.getKey()); + for (Map.Entry inner : entry.getValue().entrySet()) { + builder.field(inner.getKey(), inner.getValue()); + } + builder.endObject(); + } + builder.endObject(); + + return builder.endObject(); + } + + public String getIndexName() { + return indexName; + } + + public int getShardId() { + return shardId; + } + + public Map> getSegmentMetadataFiles() { + return segmentMetadataFiles; + } + + public Map> getTranslogMetadataFiles() { + return translogMetadataFiles; + } + + public String getLatestSegmentMetadataFileName() { + return latestSegmentMetadataFileName; + } + + public String 
getLatestTranslogMetadataFileName() { + return latestTranslogMetadataFileName; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataAction.java new file mode 100644 index 0000000000000..16c29d7586a98 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataAction.java @@ -0,0 +1,286 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.support.DefaultShardOperationFailedException; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import 
org.opensearch.index.translog.RemoteFsTranslog; +import org.opensearch.index.translog.transfer.FileTransferTracker; +import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Transport action responsible for collecting segment and translog metadata + * from all shards of a given index. + * + * @opensearch.internal + */ +public class TransportRemoteStoreMetadataAction extends TransportAction { + + private static final Logger logger = LogManager.getLogger(TransportRemoteStoreMetadataAction.class); + private final ClusterService clusterService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final RepositoriesService repositoriesService; + private final ThreadPool threadPool; + private final RemoteStoreSettings remoteStoreSettings; + + @Inject + public TransportRemoteStoreMetadataAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + RepositoriesService repositoriesService, + ThreadPool threadPool, + RemoteStoreSettings remoteStoreSettings + ) { + super(RemoteStoreMetadataAction.NAME, actionFilters, transportService.getTaskManager()); + this.clusterService = clusterService; + this.indexNameExpressionResolver = 
indexNameExpressionResolver; + this.repositoriesService = repositoriesService; + this.threadPool = threadPool; + this.remoteStoreSettings = remoteStoreSettings; + } + + @Override + protected void doExecute(Task task, RemoteStoreMetadataRequest request, ActionListener listener) { + try { + ClusterState state = clusterService.state(); + state.blocks().globalBlockedRaiseException(ClusterBlockLevel.METADATA_READ); + String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request); + + if (concreteIndices.length == 0) { + listener.onResponse(new RemoteStoreMetadataResponse(new RemoteStoreShardMetadata[0], 0, 0, 0, Collections.emptyList())); + return; + } + + List responses = new ArrayList<>(); + List shardFailures = new ArrayList<>(); + int totalShards = 0, successfulShards = 0, failedShards = 0; + + RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool, + remoteStoreSettings.getSegmentsPathFixedPrefix() + ); + + for (String indexName : concreteIndices) { + IndexMetadata indexMetadata = state.metadata().index(indexName); + Index index = indexMetadata.getIndex(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, clusterService.getSettings()); + + int[] shardIds = request.shards().length == 0 + ? 
java.util.stream.IntStream.range(0, indexMetadata.getNumberOfShards()).toArray() + : Arrays.stream(request.shards()).mapToInt(Integer::parseInt).toArray(); + + for (int shardId : shardIds) { + totalShards++; + ShardId sid = new ShardId(index, shardId); + + if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) { + failedShards++; + shardFailures.add( + new DefaultShardOperationFailedException( + indexName, + shardId, + new IllegalStateException("Remote store not enabled for index") + ) + ); + continue; + } + + try { + Map> segmentMetadataFiles = getSegmentMetadata( + remoteDirectoryFactory, + indexMetadata, + index, + sid, + indexSettings + ); + + String latestSegmentMetadataFilename = segmentMetadataFiles.isEmpty() + ? null + : new ArrayList<>(segmentMetadataFiles.keySet()).get(0); + + Map> translogMetadataFiles = getTranslogMetadataFiles( + indexMetadata, + sid, + indexSettings + ); + + String latestTranslogMetadataFilename = translogMetadataFiles.isEmpty() + ? 
null + : new ArrayList<>(translogMetadataFiles.keySet()).get(0); + + responses.add( + new RemoteStoreShardMetadata( + indexName, + shardId, + segmentMetadataFiles, + translogMetadataFiles, + latestSegmentMetadataFilename, + latestTranslogMetadataFilename + ) + ); + successfulShards++; + } catch (Exception e) { + failedShards++; + shardFailures.add(new DefaultShardOperationFailedException(indexName, shardId, e)); + logger.error("Failed to fetch metadata for shard [" + shardId + "]", e); + } + } + } + + listener.onResponse( + new RemoteStoreMetadataResponse( + responses.toArray(new RemoteStoreShardMetadata[0]), + totalShards, + successfulShards, + failedShards, + shardFailures + ) + ); + + } catch (Exception e) { + logger.error("Failed to execute remote store metadata action", e); + listener.onFailure(e); + } + } + + private Map> getSegmentMetadata( + RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory, + IndexMetadata indexMetadata, + Index index, + ShardId shardId, + IndexSettings indexSettings + ) throws IOException { + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( + IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.get(indexMetadata.getSettings()), + index.getUUID(), + shardId, + indexSettings.getRemoteStorePathStrategy() + ); + + Map segmentMetadataMapWithFilenames = remoteDirectory.readLatestNMetadataFiles(5); + Map> metadataFilesMap = new LinkedHashMap<>(); + + for (Map.Entry entry : segmentMetadataMapWithFilenames.entrySet()) { + String fileName = entry.getKey(); + RemoteSegmentMetadata segmentMetadata = entry.getValue(); + + Map segmentMetadataMap = new HashMap<>(); + Map filesMap = new HashMap<>(); + segmentMetadata.getMetadata().forEach((file, meta) -> { + Map metaMap = new HashMap<>(); + metaMap.put("original_name", meta.getOriginalFilename()); + metaMap.put("checksum", meta.getChecksum()); + metaMap.put("length", meta.getLength()); + filesMap.put(file, metaMap); + }); + 
segmentMetadataMap.put("files", filesMap); + + ReplicationCheckpoint checkpoint = segmentMetadata.getReplicationCheckpoint(); + if (checkpoint != null) { + Map checkpointMap = new HashMap<>(); + checkpointMap.put("primary_term", checkpoint.getPrimaryTerm()); + checkpointMap.put("segments_gen", checkpoint.getSegmentsGen()); + checkpointMap.put("segment_infos_version", checkpoint.getSegmentInfosVersion()); + checkpointMap.put("length", checkpoint.getLength()); + checkpointMap.put("codec", checkpoint.getCodec()); + checkpointMap.put("created_timestamp", checkpoint.getCreatedTimeStamp()); + segmentMetadataMap.put("replication_checkpoint", checkpointMap); + } + metadataFilesMap.put(fileName, segmentMetadataMap); + } + return metadataFilesMap; + } + + private Map> getTranslogMetadataFiles( + IndexMetadata indexMetadata, + ShardId shardId, + IndexSettings indexSettings + ) throws IOException { + String repository = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(indexMetadata.getSettings()); + if (repository == null) { + return Collections.emptyMap(); + } + + RemoteTranslogTransferTracker tracker = new RemoteTranslogTransferTracker(shardId, 1000); + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, tracker); + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repositoriesService.repository(repository); + + TranslogTransferManager manager = RemoteFsTranslog.buildTranslogTransferManager( + blobStoreRepository, + threadPool, + shardId, + fileTransferTracker, + tracker, + indexSettings.getRemoteStorePathStrategy(), + new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings()), + RemoteStoreUtils.determineTranslogMetadataEnabled(indexMetadata) + ); + + Map metadataMap = manager.readLatestNMetadataFiles(5); + Map> translogFilesMap = new LinkedHashMap<>(); + + for (Map.Entry entry : metadataMap.entrySet()) { + String fileName = entry.getKey(); + TranslogTransferMetadata metadata = entry.getValue(); 
+ + Map fileMap = new HashMap<>(); + fileMap.put("primary_term", metadata.getPrimaryTerm()); + fileMap.put("generation", metadata.getGeneration()); + fileMap.put("min_translog_gen", metadata.getMinTranslogGeneration()); + + Map genToTerm = metadata.getGenerationToPrimaryTermMapper(); + if (genToTerm == null || genToTerm.isEmpty()) { + genToTerm = Map.of(String.valueOf(metadata.getGeneration()), String.valueOf(metadata.getPrimaryTerm())); + } + fileMap.put("generation_to_primary_term", genToTerm); + + translogFilesMap.put(fileName, fileMap); + } + + return translogFilesMap; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/package-info.java new file mode 100644 index 0000000000000..60626ec51add7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/metadata/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Remote store metadata API for fetching segment and translog metadata. 
*/ +package org.opensearch.action.admin.cluster.remotestore.metadata; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c18902b69d23c..1fd2e15ee50ec 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -259,6 +260,34 @@ private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws I } } + /** + * Reads the latest N segment metadata files from remote store along with filenames. + * + * @param count Number of recent metadata files to read (sorted by lexicographic order). + * @return Map from filename to parsed RemoteSegmentMetadata + * @throws IOException if reading any metadata file fails + */ + public Map readLatestNMetadataFiles(int count) throws IOException { + Map metadataMap = new LinkedHashMap<>(); + + List metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + MetadataFilenameUtils.METADATA_PREFIX, + count + ); + + for (String file : metadataFiles) { + try (InputStream inputStream = remoteMetadataDirectory.getBlobStream(file)) { + byte[] bytes = inputStream.readAllBytes(); + RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(new ByteArrayIndexInput(file, bytes)); + metadataMap.put(file, metadata); + } catch (Exception e) { + logger.error("Failed to parse segment metadata file", e); + } + } + + return metadataMap; + } + /** * Metadata of a segment that is uploaded to remote segment store. 
* diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 22bb4cf0514bf..a0ac50709bf7e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -324,6 +324,10 @@ public void listAllInSortedOrder(Iterable path, String filenamePrefix, i blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder(filenamePrefix, limit, LEXICOGRAPHIC, listener); } + public List listAllInSortedOrder(Iterable path, String filenamePrefix, int limit) throws IOException { + return blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder(filenamePrefix, limit, LEXICOGRAPHIC); + } + public void listAllInSortedOrderAsync( String threadpoolName, Iterable path, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 1182c626fb0e9..2ab4df3429bb0 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -156,6 +156,8 @@ void uploadBlob( void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); + List listAllInSortedOrder(Iterable path, String filenamePrefix, int limit) throws IOException; + void listAllInSortedOrderAsync( String threadpoolName, Iterable path, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index d410f473c71f1..389d98adcc4eb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ 
b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -29,6 +29,8 @@ import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogReader; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.threadpool.ThreadPool; @@ -41,6 +43,7 @@ import java.util.Base64; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -110,6 +113,34 @@ public ShardId getShardId() { return this.shardId; } + /** + * Reads the latest N translog metadata files from remote store using filename parsing. + * + * @param count Number of metadata files to read + * @return Map of filename to parsed TranslogTransferMetadata + * @throws IOException if the fetch or parsing fails + */ + public Map readLatestNMetadataFiles(int count) throws IOException { + List metadataFiles = transferService.listAllInSortedOrder( + remoteMetadataTransferPath, + TranslogTransferMetadata.METADATA_PREFIX, + count + ); + + Map result = new LinkedHashMap<>(); + for (BlobMetadata metadata : metadataFiles) { + String fileName = metadata.name(); + try { + TranslogTransferMetadata meta = readMetadata(fileName); + result.put(fileName, meta); + } catch (Exception e) { + logger.error("Failed to read translog metadata file ", e); + } + } + + return result; + } + public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); diff --git 
a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRemoteStoreMetadataAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRemoteStoreMetadataAction.java new file mode 100644 index 0000000000000..342cc8ca93ee2 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRemoteStoreMetadataAction.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataRequest; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.transport.client.node.NodeClient; + +import java.io.IOException; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for fetching remote store metadata + * + * @opensearch.internal + */ +public class RestRemoteStoreMetadataAction extends BaseRestHandler { + + @Override + public List routes() { + return unmodifiableList( + asList(new Route(GET, "/_remotestore/metadata/{index}"), new Route(GET, "/_remotestore/metadata/{index}/{shard_id}")) + ); + } + + @Override + public String getName() { + return "remote_store_metadata"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String index = request.param("index"); + String shardId = request.param("shard_id"); + + RemoteStoreMetadataRequest metadataRequest = new RemoteStoreMetadataRequest(); + if (index != null) { + metadataRequest.indices(index); + } + if (shardId != null) { + 
metadataRequest.shards(shardId); + } + + return channel -> client.admin().cluster().remoteStoreMetadata(metadataRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/transport/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/transport/client/ClusterAdminClient.java index 4fc9b6ce83587..1bbfe9974d732 100644 --- a/server/src/main/java/org/opensearch/transport/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/transport/client/ClusterAdminClient.java @@ -69,6 +69,9 @@ import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataRequest; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataRequestBuilder; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequest; @@ -333,6 +336,10 @@ public interface ClusterAdminClient extends OpenSearchClient { RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId); + void remoteStoreMetadata(RemoteStoreMetadataRequest request, ActionListener listener); + + RemoteStoreMetadataRequestBuilder prepareRemoteStoreMetadata(String index, String shardId); + /** * Returns top N hot-threads samples per node. The hot-threads are only * sampled for the node ids specified in the request. 
Nodes usage of the diff --git a/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java index 9c408a82402b5..bfd64ebb571a3 100644 --- a/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java @@ -86,6 +86,10 @@ import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataAction; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataRequest; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataRequestBuilder; +import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; @@ -953,6 +957,24 @@ public RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, Stri return remoteStoreStatsRequestBuilder; } + @Override + public void remoteStoreMetadata( + final RemoteStoreMetadataRequest request, + final ActionListener listener + ) { + execute(RemoteStoreMetadataAction.INSTANCE, request, listener); + } + + @Override + public RemoteStoreMetadataRequestBuilder prepareRemoteStoreMetadata(String index, String shardId) { + RemoteStoreMetadataRequestBuilder builder = new RemoteStoreMetadataRequestBuilder(this, RemoteStoreMetadataAction.INSTANCE) + .setIndices(index); + if (shardId != null) { + builder.setShards(shardId); + } + return builder; + } + 
@Override public ActionFuture nodesUsage(final NodesUsageRequest request) { return execute(NodesUsageAction.INSTANCE, request); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilderTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilderTests.java new file mode 100644 index 0000000000000..6fd267fafc4ad --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestBuilderTests.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.client.OpenSearchClient; + +import static org.mockito.Mockito.mock; + +public class RemoteStoreMetadataRequestBuilderTests extends OpenSearchTestCase { + + public void testSetTimeout() { + OpenSearchClient mockClient = mock(OpenSearchClient.class); + RemoteStoreMetadataRequestBuilder builder = new RemoteStoreMetadataRequestBuilder(mockClient, RemoteStoreMetadataAction.INSTANCE); + + TimeValue timeout = TimeValue.timeValueSeconds(15); + builder.setTimeout(timeout); + + assertEquals(timeout, builder.request().timeout()); + } + + public void testSetShards() { + OpenSearchClient mockClient = mock(OpenSearchClient.class); + RemoteStoreMetadataRequestBuilder builder = new RemoteStoreMetadataRequestBuilder(mockClient, RemoteStoreMetadataAction.INSTANCE); + + String[] shards = new String[] { "0", "1" }; + builder.setShards(shards); + + assertArrayEquals(shards, builder.request().shards()); + } + + public void testChaining() { + OpenSearchClient mockClient = 
mock(OpenSearchClient.class); + RemoteStoreMetadataRequestBuilder builder = new RemoteStoreMetadataRequestBuilder(mockClient, RemoteStoreMetadataAction.INSTANCE); + + TimeValue timeout = TimeValue.timeValueSeconds(10); + String[] shards = new String[] { "0", "2" }; + + RemoteStoreMetadataRequestBuilder returned = builder.setTimeout(timeout).setShards(shards); + + assertSame(builder, returned); + assertEquals(timeout, returned.request().timeout()); + assertArrayEquals(shards, returned.request().shards()); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestTests.java new file mode 100644 index 0000000000000..2037b82947b78 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataRequestTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; +import org.hamcrest.MatcherAssert; + +import static org.hamcrest.Matchers.equalTo; + +public class RemoteStoreMetadataRequestTests extends OpenSearchTestCase { + + public void testAddIndexName() throws Exception { + RemoteStoreMetadataRequest request = new RemoteStoreMetadataRequest(); + request.indices("test-index"); + RemoteStoreMetadataRequest deserializedRequest = roundTripRequest(request); + assertRequestsEqual(request, deserializedRequest); + } + + public void testAddShardId() throws Exception { + RemoteStoreMetadataRequest request = new RemoteStoreMetadataRequest(); + request.indices("test-index"); + request.shards("0"); + RemoteStoreMetadataRequest deserializedRequest = roundTripRequest(request); + assertRequestsEqual(request, deserializedRequest); + } + + public void testAddMultipleShards() throws Exception { + RemoteStoreMetadataRequest request = new RemoteStoreMetadataRequest(); + request.indices("test-index"); + request.shards("0", "1", "2"); + RemoteStoreMetadataRequest deserializedRequest = roundTripRequest(request); + assertRequestsEqual(request, deserializedRequest); + } + + private static RemoteStoreMetadataRequest roundTripRequest(RemoteStoreMetadataRequest request) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new RemoteStoreMetadataRequest(in); + } + } + } + + private static void assertRequestsEqual(RemoteStoreMetadataRequest request1, RemoteStoreMetadataRequest request2) { + MatcherAssert.assertThat(request1.indices(), equalTo(request2.indices())); + MatcherAssert.assertThat(request1.shards(), equalTo(request2.shards())); + } +} diff --git 
a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponseTests.java new file mode 100644 index 0000000000000..4561d1007f078 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataResponseTests.java @@ -0,0 +1,262 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.action.support.DefaultShardOperationFailedException; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataTestHelper.createTestMetadata; +import static org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataTestHelper.createTestSegmentMetadata; +import static org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataTestHelper.createTestTranslogMetadata; +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; + +public class RemoteStoreMetadataResponseTests extends OpenSearchTestCase { + + private static final String FIELD_INDICES = "indices"; + private static final String FIELD_SHARDS = "_shards"; + private static final String FIELD_TOTAL = "total"; + private static final 
String FIELD_SUCCESSFUL = "successful"; + private static final String FIELD_FAILED = "failed"; + + public void testSerializationForSingleShard() throws Exception { + RemoteStoreShardMetadata metadata = createTestMetadata(createTestSegmentMetadata(), createTestTranslogMetadata(), "test-index", 0); + RemoteStoreMetadataResponse response = new RemoteStoreMetadataResponse( + new RemoteStoreShardMetadata[] { metadata }, + 1, + 1, + 0, + List.of() + ); + + assertSerializationRoundTrip(response); + assertXContentResponse(response, "test-index", 1); + } + + public void testSerializationForMultipleShards() throws Exception { + RemoteStoreShardMetadata metadata1 = createTestMetadata(createTestSegmentMetadata(), createTestTranslogMetadata(), "test-index", 0); + RemoteStoreShardMetadata metadata2 = createTestMetadata(createTestSegmentMetadata(), createTestTranslogMetadata(), "test-index", 1); + RemoteStoreMetadataResponse response = new RemoteStoreMetadataResponse( + new RemoteStoreShardMetadata[] { metadata1, metadata2 }, + 2, + 2, + 0, + List.of() + ); + + assertSerializationRoundTrip(response); + assertXContentResponse(response, "test-index", 2); + } + + public void testSerializationForMultipleIndices() throws Exception { + RemoteStoreShardMetadata metadata1 = createTestMetadata(createTestSegmentMetadata(), createTestTranslogMetadata(), "index1", 0); + RemoteStoreShardMetadata metadata2 = createTestMetadata(createTestSegmentMetadata(), createTestTranslogMetadata(), "index2", 0); + RemoteStoreMetadataResponse response = new RemoteStoreMetadataResponse( + new RemoteStoreShardMetadata[] { metadata1, metadata2 }, + 2, + 2, + 0, + List.of() + ); + + assertSerializationRoundTrip(response); + assertIndicesInResponse(response, "index1", "index2"); + } + + public void testFailures() throws Exception { + List failures = new ArrayList<>(); + failures.add(new DefaultShardOperationFailedException("index1", 1, new Exception("test failure"))); + failures.add(new 
DefaultShardOperationFailedException("index2", 0, new Exception("another failure"))); + + RemoteStoreMetadataResponse response = new RemoteStoreMetadataResponse(new RemoteStoreShardMetadata[0], 3, 1, 2, failures); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput in = out.bytes().streamInput(); + RemoteStoreMetadataResponse deserializedResponse = new RemoteStoreMetadataResponse(in); + + assertEquals("Total shards mismatch", response.getTotalShards(), deserializedResponse.getTotalShards()); + assertEquals("Successful shards mismatch", response.getSuccessfulShards(), deserializedResponse.getSuccessfulShards()); + assertEquals("Failed shards mismatch", response.getFailedShards(), deserializedResponse.getFailedShards()); + assertEquals("Failures count mismatch", response.getShardFailures().length, deserializedResponse.getShardFailures().length); + + for (int i = 0; i < failures.size(); i++) { + DefaultShardOperationFailedException expected = failures.get(i); + DefaultShardOperationFailedException actual = deserializedResponse.getShardFailures()[i]; + assertEquals("Index mismatch", expected.index(), actual.index()); + assertEquals("Shard ID mismatch", expected.shardId(), actual.shardId()); + assertTrue("Failure reason mismatch", actual.reason().contains(expected.getCause().getMessage())); + } + } + + public void testEmptyResponse() throws Exception { + RemoteStoreMetadataResponse response = new RemoteStoreMetadataResponse(new RemoteStoreShardMetadata[0], 0, 0, 0, List.of()); + + assertSerializationRoundTrip(response); + Map responseMap = convertToMap(response); + validateEmptyResponse(responseMap); + } + + private RemoteStoreMetadataResponse assertSerializationRoundTrip(RemoteStoreMetadataResponse response) throws Exception { + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput in = out.bytes().streamInput(); + RemoteStoreMetadataResponse deserializedResponse = new 
RemoteStoreMetadataResponse(in); + + assertResponseEquals(response, deserializedResponse); + return deserializedResponse; + } + + private void assertXContentResponse(RemoteStoreMetadataResponse response, String indexName, int expectedShards) throws Exception { + Map responseMap = convertToMap(response); + validateResponseContent(responseMap, indexName, expectedShards); + } + + @SuppressWarnings("unchecked") + private void assertIndicesInResponse(RemoteStoreMetadataResponse response, String... expectedIndices) throws Exception { + Map responseMap = convertToMap(response); + Map indices = (Map) responseMap.get(FIELD_INDICES); + + for (String index : expectedIndices) { + assertTrue("Missing index: " + index, indices.containsKey(index)); + } + validateIndicesContent(indices); + } + + private Map convertToMap(RemoteStoreMetadataResponse response) throws Exception { + XContentBuilder builder = XContentFactory.jsonBuilder(); + response.toXContent(builder, EMPTY_PARAMS); + return XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + } + + private void assertResponseEquals(RemoteStoreMetadataResponse expected, RemoteStoreMetadataResponse actual) { + assertEquals("Total shards mismatch", expected.getTotalShards(), actual.getTotalShards()); + assertEquals("Successful shards mismatch", expected.getSuccessfulShards(), actual.getSuccessfulShards()); + assertEquals("Failed shards mismatch", expected.getFailedShards(), actual.getFailedShards()); + assertEquals("Failures count mismatch", expected.getShardFailures().length, actual.getShardFailures().length); + + Map>> expectedGrouped = expected.groupByIndexAndShards(); + Map>> actualGrouped = actual.groupByIndexAndShards(); + + assertEquals("Index set mismatch", expectedGrouped.keySet(), actualGrouped.keySet()); + + for (String index : expectedGrouped.keySet()) { + Map> expectedShards = expectedGrouped.get(index); + Map> actualShards = actualGrouped.get(index); + assertEquals("Shard set 
mismatch for index " + index, expectedShards.keySet(), actualShards.keySet()); + + for (Integer shardId : expectedShards.keySet()) { + assertMetadataListEquals(expectedShards.get(shardId), actualShards.get(shardId)); + } + } + } + + private void assertMetadataListEquals(List expected, List actual) { + assertEquals("Metadata list size mismatch", expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + RemoteStoreShardMetadata expectedMeta = expected.get(i); + RemoteStoreShardMetadata actualMeta = actual.get(i); + assertEquals("Index name mismatch", expectedMeta.getIndexName(), actualMeta.getIndexName()); + assertEquals("Shard ID mismatch", expectedMeta.getShardId(), actualMeta.getShardId()); + assertEquals( + "Latest segment filename mismatch", + expectedMeta.getLatestSegmentMetadataFileName(), + actualMeta.getLatestSegmentMetadataFileName() + ); + assertEquals( + "Latest translog filename mismatch", + expectedMeta.getLatestTranslogMetadataFileName(), + actualMeta.getLatestTranslogMetadataFileName() + ); + } + } + + @SuppressWarnings("unchecked") + private void validateResponseContent(Map responseMap, String indexName, int expectedShards) { + Map shardsObject = (Map) responseMap.get(FIELD_SHARDS); + assertEquals("Total shards mismatch", expectedShards, shardsObject.get(FIELD_TOTAL)); + assertEquals("Successful shards mismatch", expectedShards, shardsObject.get(FIELD_SUCCESSFUL)); + assertEquals("Failed shards mismatch", 0, shardsObject.get(FIELD_FAILED)); + + Map indicesObject = (Map) responseMap.get(FIELD_INDICES); + assertTrue("Missing index: " + indexName, indicesObject.containsKey(indexName)); + + validateIndicesContent(indicesObject); + } + + @SuppressWarnings("unchecked") + private void validateIndicesContent(Map indicesObject) { + for (Object indexData : indicesObject.values()) { + Map indexMap = (Map) indexData; + assertTrue("Missing shards section", indexMap.containsKey("shards")); + + Map shardsMap = (Map) indexMap.get("shards"); + 
for (Object shardData : shardsMap.values()) { + List> shardList = (List>) shardData; + for (Map shard : shardList) { + validateShardContent(shard); + } + } + } + } + + @SuppressWarnings("unchecked") + private void validateShardContent(Map shard) { + assertTrue("Missing index field", shard.containsKey("index")); + assertTrue("Missing shard field", shard.containsKey("shard")); + assertTrue("Missing latest segment metadata filename", shard.containsKey("latest_segment_metadata_filename")); + assertTrue("Missing latest translog metadata filename", shard.containsKey("latest_translog_metadata_filename")); + assertTrue("Missing segment metadata files", shard.containsKey("available_segment_metadata_files")); + assertTrue("Missing translog metadata files", shard.containsKey("available_translog_metadata_files")); + + Map segmentFiles = (Map) shard.get("available_segment_metadata_files"); + Map translogFiles = (Map) shard.get("available_translog_metadata_files"); + + assertNotNull("Segment files should not be null", segmentFiles); + assertFalse("Segment files should not be empty", segmentFiles.isEmpty()); + assertNotNull("Translog files should not be null", translogFiles); + assertFalse("Translog files should not be empty", translogFiles.isEmpty()); + } + + @SuppressWarnings("unchecked") + private void validateEmptyResponse(Map responseMap) { + Map shardsObject = (Map) responseMap.get(FIELD_SHARDS); + assertEquals("Total shards should be 0", 0, shardsObject.get(FIELD_TOTAL)); + assertEquals("Successful shards should be 0", 0, shardsObject.get(FIELD_SUCCESSFUL)); + assertEquals("Failed shards should be 0", 0, shardsObject.get(FIELD_FAILED)); + + Map indicesObject = (Map) responseMap.get(FIELD_INDICES); + assertTrue("Indices should be empty", indicesObject.isEmpty()); + } + + public void testToStringMethod() throws Exception { + RemoteStoreShardMetadata metadata = createTestMetadata(createTestSegmentMetadata(), createTestTranslogMetadata(), "test-index", 0); + 
RemoteStoreMetadataResponse response = new RemoteStoreMetadataResponse( + new RemoteStoreShardMetadata[] { metadata }, + 1, + 1, + 0, + List.of() + ); + + String json = response.toString(); + assertNotNull("JSON string should not be null", json); + assertTrue("JSON string should contain index name", json.contains("test-index")); + assertTrue("JSON string should contain shard id", json.contains("\"0\"")); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTestHelper.java new file mode 100644 index 0000000000000..7ba976000a7ca --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTestHelper.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * Helper class for unit testing RemoteStoreMetadata and RemoteStoreMetadataResponse. + */ +public class RemoteStoreMetadataTestHelper { + + @SuppressWarnings("unchecked") + public static RemoteStoreShardMetadata createTestMetadata( + Map segmentMetadata, + Map translogMetadata, + String indexName, + int shardId + ) { + Map> segmentFiles = (Map>) (Map) segmentMetadata; + Map> translogFiles = (Map>) (Map) translogMetadata; + + String latestSegmentFile = segmentFiles.isEmpty() ? null : segmentFiles.keySet().iterator().next(); + String latestTranslogFile = translogFiles.isEmpty() ? 
null : translogFiles.keySet().iterator().next(); + + return new RemoteStoreShardMetadata(indexName, shardId, segmentFiles, translogFiles, latestSegmentFile, latestTranslogFile); + } + + public static Map createTestSegmentMetadata() { + Map uploadedSegment = new HashMap<>(); + uploadedSegment.put("original_name", "segment_1"); + uploadedSegment.put("checksum", "abc123"); + uploadedSegment.put("length", 1024L); + + Map uploadedSegments = new HashMap<>(); + uploadedSegments.put("seg_1", uploadedSegment); + + Map replicationCheckpoint = new HashMap<>(); + replicationCheckpoint.put("shard_id", "index[0]"); + replicationCheckpoint.put("primary_term", 1L); + replicationCheckpoint.put("generation", 1L); + replicationCheckpoint.put("version", 1L); + replicationCheckpoint.put("length", 12345L); + replicationCheckpoint.put("codec", "Lucene80"); + replicationCheckpoint.put("created_timestamp", System.currentTimeMillis()); + + Map metadata = new HashMap<>(); + metadata.put("files", uploadedSegments); + metadata.put("replication_checkpoint", replicationCheckpoint); + + return Map.of("metadata__segment1", metadata); + } + + public static Map createTestTranslogMetadata() { + Map genToTermMap = new HashMap<>(); + genToTermMap.put("1", "1"); + + Map metadata = new HashMap<>(); + metadata.put("primary_term", 1L); + metadata.put("generation", 1L); + metadata.put("min_translog_gen", 1L); + metadata.put("generation_to_primary_term", genToTermMap); + + return Map.of("metadata__translog1", metadata); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTests.java new file mode 100644 index 0000000000000..bd440363497cc --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/RemoteStoreMetadataTests.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The 
OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.metadata; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Map; + +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; + +public class RemoteStoreMetadataTests extends OpenSearchTestCase { + + public void testSerialization() throws Exception { + Map segmentMetadata = RemoteStoreMetadataTestHelper.createTestSegmentMetadata(); + Map translogMetadata = RemoteStoreMetadataTestHelper.createTestTranslogMetadata(); + RemoteStoreShardMetadata metadata = RemoteStoreMetadataTestHelper.createTestMetadata( + segmentMetadata, + translogMetadata, + "test-index", + 0 + ); + + BytesStreamOutput out = new BytesStreamOutput(); + metadata.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + RemoteStoreShardMetadata deserializedMetadata = new RemoteStoreShardMetadata(in); + + assertEquals(metadata.getIndexName(), deserializedMetadata.getIndexName()); + assertEquals(metadata.getShardId(), deserializedMetadata.getShardId()); + assertEquals(metadata.getLatestSegmentMetadataFileName(), deserializedMetadata.getLatestSegmentMetadataFileName()); + assertEquals(metadata.getLatestTranslogMetadataFileName(), deserializedMetadata.getLatestTranslogMetadataFileName()); + assertNestedMapEquals(metadata.getSegmentMetadataFiles(), deserializedMetadata.getSegmentMetadataFiles()); + assertNestedMapEquals(metadata.getTranslogMetadataFiles(), deserializedMetadata.getTranslogMetadataFiles()); + } + + 
@SuppressWarnings("unchecked") + public void testXContent() throws Exception { + Map segmentMetadata = RemoteStoreMetadataTestHelper.createTestSegmentMetadata(); + Map translogMetadata = RemoteStoreMetadataTestHelper.createTestTranslogMetadata(); + RemoteStoreShardMetadata metadata = RemoteStoreMetadataTestHelper.createTestMetadata( + segmentMetadata, + translogMetadata, + "test-index", + 0 + ); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + metadata.toXContent(builder, EMPTY_PARAMS); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + + assertEquals("test-index", xContentMap.get("index")); + assertEquals(0, xContentMap.get("shard")); + assertEquals(metadata.getLatestSegmentMetadataFileName(), xContentMap.get("latest_segment_metadata_filename")); + assertEquals(metadata.getLatestTranslogMetadataFileName(), xContentMap.get("latest_translog_metadata_filename")); + + assertNotNull(xContentMap.get("available_segment_metadata_files")); + assertNotNull(xContentMap.get("available_translog_metadata_files")); + + Map segmentFiles = (Map) xContentMap.get("available_segment_metadata_files"); + String segmentKey = metadata.getLatestSegmentMetadataFileName(); + assertTrue(segmentFiles.containsKey(segmentKey)); + + Map segmentContent = (Map) segmentFiles.get(segmentKey); + assertTrue(segmentContent.containsKey("files")); + assertTrue(segmentContent.containsKey("replication_checkpoint")); + + Map translogFiles = (Map) xContentMap.get("available_translog_metadata_files"); + String translogKey = metadata.getLatestTranslogMetadataFileName(); + assertTrue(translogFiles.containsKey(translogKey)); + + Map translogContent = (Map) translogFiles.get(translogKey); + assertTrue(translogContent.containsKey("generation")); + assertTrue(translogContent.containsKey("primary_term")); + assertTrue(translogContent.containsKey("min_translog_gen")); + 
assertTrue(translogContent.containsKey("generation_to_primary_term")); + } + + public void testEmptyMetadata() throws Exception { + RemoteStoreShardMetadata metadata = RemoteStoreMetadataTestHelper.createTestMetadata(Map.of(), Map.of(), "test-index", 0); + + BytesStreamOutput out = new BytesStreamOutput(); + metadata.writeTo(out); + StreamInput in = out.bytes().streamInput(); + RemoteStoreShardMetadata deserializedMetadata = new RemoteStoreShardMetadata(in); + + assertEquals("test-index", deserializedMetadata.getIndexName()); + assertEquals(0, deserializedMetadata.getShardId()); + assertTrue(deserializedMetadata.getSegmentMetadataFiles().isEmpty()); + assertTrue(deserializedMetadata.getTranslogMetadataFiles().isEmpty()); + assertNull(deserializedMetadata.getLatestSegmentMetadataFileName()); + assertNull(deserializedMetadata.getLatestTranslogMetadataFileName()); + } + + private void assertNestedMapEquals(Map> expected, Map> actual) { + assertEquals("Map size mismatch", expected.size(), actual.size()); + for (String key : expected.keySet()) { + assertTrue("Missing key: " + key, actual.containsKey(key)); + assertFlatMapEquals(expected.get(key), actual.get(key)); + } + } + + @SuppressWarnings("unchecked") + private void assertFlatMapEquals(Map expected, Map actual) { + assertEquals("Inner map size mismatch", expected.size(), actual.size()); + assertEquals("Inner map keys mismatch", expected.keySet(), actual.keySet()); + + for (String key : expected.keySet()) { + Object expectedVal = expected.get(key); + Object actualVal = actual.get(key); + + if (expectedVal instanceof Map) { + assertTrue("Expected value at key " + key + " should be a map", actualVal instanceof Map); + assertFlatMapEquals((Map) expectedVal, (Map) actualVal); + } else if (expectedVal instanceof Number) { + assertEquals("Mismatch at key " + key, ((Number) expectedVal).longValue(), ((Number) actualVal).longValue()); + } else { + assertEquals("Mismatch at key " + key, expectedVal, actualVal); + } + } + } 
}
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataActionTests.java
new file mode 100644
index 0000000000000..4a95b47dcddea
--- /dev/null
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/metadata/TransportRemoteStoreMetadataActionTests.java
@@ -0,0 +1,182 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package org.opensearch.action.admin.cluster.remotestore.metadata;

import org.opensearch.Version;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.util.EnumSet;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for TransportRemoteStoreMetadataAction using fully mocked
 * cluster/transport services (no real node is started).
 */
public class TransportRemoteStoreMetadataActionTests extends OpenSearchTestCase {

    private TransportRemoteStoreMetadataAction action;
    private ClusterService mockClusterService;
    private TransportService mockTransportService;
    private IndexNameExpressionResolver mockResolver;
    private ActionFilters mockActionFilters;
    private RepositoriesService mockRepositoriesService;
    private ThreadPool mockThreadPool;
    private RemoteStoreSettings mockRemoteStoreSettings;

    private static final String TEST_INDEX = "test-index";
    private static final String INDEX_UUID = "uuid-test";

    @Before
    public void setup() {
        mockClusterService = mock(ClusterService.class);
        mockTransportService = mock(TransportService.class);
        mockResolver = mock(IndexNameExpressionResolver.class);
        mockActionFilters = mock(ActionFilters.class);
        mockRepositoriesService = mock(RepositoriesService.class);
        mockThreadPool = mock(ThreadPool.class);
        mockRemoteStoreSettings = mock(RemoteStoreSettings.class);

        // Task registration is not exercised by these tests.
        when(mockTransportService.getTaskManager()).thenReturn(null);

        action = new TransportRemoteStoreMetadataAction(
            mockClusterService,
            mockTransportService,
            mockActionFilters,
            mockResolver,
            mockRepositoriesService,
            mockThreadPool,
            mockRemoteStoreSettings
        );
    }

    /** Resolving to zero concrete indices must yield an empty (all-zero) response, not a failure. */
    public void testEmptyIndexReturnsEmptyResponse() {
        RemoteStoreMetadataRequest request = new RemoteStoreMetadataRequest();
        request.indices("dummy");

        when(mockClusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
        when(mockResolver.concreteIndexNames(any(), any())).thenReturn(new String[0]);

        AtomicReference<RemoteStoreMetadataResponse> responseRef = new AtomicReference<>();
        action.doExecute(null, request, new ActionListener<>() {
            @Override
            public void onResponse(RemoteStoreMetadataResponse response) {
                responseRef.set(response);
            }

            @Override
            public void onFailure(Exception e) {
                fail("Should not fail");
            }
        });

        assertNotNull(responseRef.get());
        assertEquals(0, responseRef.get().getTotalShards());
        assertEquals(0, responseRef.get().getSuccessfulShards());
        assertEquals(0, responseRef.get().getFailedShards());
    }

    /** A global METADATA_READ block must fail the request with a blocked exception. */
    public void testClusterBlocked() {
        RemoteStoreMetadataRequest request = new RemoteStoreMetadataRequest();
        request.indices(TEST_INDEX);

        ClusterBlock block = new ClusterBlock(
            1,
            "metadata_read_block",
            true,
            true,
            false,
            null,
            EnumSet.of(ClusterBlockLevel.METADATA_READ)
        );
        ClusterBlocks blocks = ClusterBlocks.builder().addGlobalBlock(block).build();
        ClusterState blockedState = ClusterState.builder(new ClusterName("test")).blocks(blocks).build();

        when(mockClusterService.state()).thenReturn(blockedState);
        when(mockResolver.concreteIndexNames(any(), any())).thenReturn(new String[] { TEST_INDEX });

        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
        action.doExecute(null, request, new ActionListener<>() {
            @Override
            public void onResponse(RemoteStoreMetadataResponse response) {
                fail("Expected failure due to cluster block");
            }

            @Override
            public void onFailure(Exception e) {
                exceptionRef.set(e);
            }
        });

        assertNotNull(exceptionRef.get());
        assertTrue(exceptionRef.get().getMessage().toLowerCase(Locale.ROOT).contains("blocked"));
    }

    /** An index without remote store enabled is reported as a per-shard failure, not a request failure. */
    public void testRemoteStoreDisabledIndex() {
        RemoteStoreMetadataRequest request = new RemoteStoreMetadataRequest();
        request.indices(TEST_INDEX);
        request.shards("0");

        ClusterState state = ClusterState.builder(new ClusterName("test"))
            .metadata(
                Metadata.builder()
                    .put(
                        IndexMetadata.builder(TEST_INDEX)
                            .settings(
                                Settings.builder()
                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
                                    .put(IndexMetadata.SETTING_INDEX_UUID, INDEX_UUID)
                                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                                    .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)
                            )
                    )
            )
            .build();

        when(mockClusterService.state()).thenReturn(state);
        when(mockResolver.concreteIndexNames(any(), any())).thenReturn(new String[] { TEST_INDEX });
        // The action reads node settings while resolving remote-store config; stub to empty.
        when(mockClusterService.getSettings()).thenReturn(Settings.EMPTY);

        AtomicReference<RemoteStoreMetadataResponse> responseRef = new AtomicReference<>();
        action.doExecute(null, request, new ActionListener<>() {
            @Override
            public void onResponse(RemoteStoreMetadataResponse response) {
                responseRef.set(response);
            }

            @Override
            public void onFailure(Exception e) {
                fail("Should handle as shard failure, not exception");
            }
        });

        assertNotNull(responseRef.get());
        assertEquals(1, responseRef.get().getTotalShards());
        assertEquals(0, responseRef.get().getSuccessfulShards());
        assertEquals(1, responseRef.get().getFailedShards());
        assertTrue(responseRef.get().getShardFailures()[0].reason().contains("Remote store not enabled"));
    }
}