From 7947513d84d834265e41563e0066862059af4242 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Mon, 24 Nov 2025 18:12:19 +0530 Subject: [PATCH 1/5] Adding version checks to remote entities using bytestream ser/de Signed-off-by: Harsh Garg --- .../InternalRemoteRoutingTableService.java | 11 +++--- .../remote/NoopRemoteRoutingTableService.java | 7 ++-- .../remote/RemoteRoutingTableService.java | 7 ++-- .../remote/RemoteClusterStateService.java | 18 ++++++---- .../remote/model/RemoteClusterBlocks.java | 17 +++++---- .../model/RemoteClusterStateCustoms.java | 12 ++++--- .../remote/model/RemoteDiscoveryNodes.java | 17 +++++---- .../RemoteHashesOfConsistentSettings.java | 23 +++++++++--- .../routingtable/RemoteIndexRoutingTable.java | 15 +++++--- .../routingtable/RemoteRoutingTableDiff.java | 16 +++++---- .../RemoteRoutingTableServiceTests.java | 26 ++++++++++---- ...oteClusterStateAttributesManagerTests.java | 16 ++++----- .../RemoteClusterStateServiceTests.java | 35 +++++++++++++++---- .../RemoteGlobalMetadataManagerTests.java | 10 ++++-- .../model/RemoteClusterBlocksTests.java | 9 ++--- .../model/RemoteClusterStateCustomsTests.java | 12 ++++--- .../model/RemoteDiscoveryNodesTests.java | 10 +++--- ...RemoteHashesOfConsistentSettingsTests.java | 13 ++++--- .../RemoteIndexRoutingTableDiffTests.java | 9 ++--- .../RemoteIndexRoutingTableTests.java | 28 ++++++++++++--- 20 files changed, 214 insertions(+), 97 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index eafbe05faf76f..31d7614f6ad0a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -182,7 +183,8 @@ public List getAllUploadedIndices public void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener latchedActionListener + LatchedActionListener latchedActionListener, + Version version ) { ActionListener actionListener = ActionListener.wrap( @@ -190,7 +192,7 @@ public void getAsyncIndexRoutingReadAction( latchedActionListener::onFailure ); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor, version); remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } @@ -199,14 +201,15 @@ public void getAsyncIndexRoutingReadAction( public void getAsyncIndexRoutingTableDiffReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener> latchedActionListener + LatchedActionListener> latchedActionListener, + Version version ) { ActionListener> actionListener = ActionListener.wrap( latchedActionListener::onResponse, latchedActionListener::onFailure ); - RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor); + RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor, version); remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 17687199c39d6..65b7c2f0fb26c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -71,7 +72,8 @@ public List getAllUploadedIndices public void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener latchedActionListener + LatchedActionListener latchedActionListener, + Version version ) { // noop } @@ -80,7 +82,8 @@ public void getAsyncIndexRoutingReadAction( public void getAsyncIndexRoutingTableDiffReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener> latchedActionListener + LatchedActionListener> latchedActionListener, + Version version ) { // noop } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d7ef3a29aa21f..e85d588bec07a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -31,13 +32,15 @@ public interface RemoteRoutingTableService extends LifecycleComponent { void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener latchedActionListener + LatchedActionListener latchedActionListener, + Version version ); void getAsyncIndexRoutingTableDiffReadAction( String clusterUUID, String uploadedFilename, - LatchedActionListener> latchedActionListener + LatchedActionListener> latchedActionListener, + Version version ); List getUpdatedIndexRoutingTableMetadata( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 943f5becc528a..5285bf6e10af8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1296,7 +1296,8 @@ ClusterState readClusterStateInParallel( remoteRoutingTableService.getAsyncIndexRoutingReadAction( clusterUUID, indexRouting.getUploadedFilename(), - routingTableLatchedActionListener + routingTableLatchedActionListener, + manifest.getOpensearchVersion() ); } @@ -1315,7 +1316,8 @@ ClusterState readClusterStateInParallel( remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction( clusterUUID, manifest.getDiffManifest().getIndicesRoutingDiffPath(), - routingTableDiffLatchedActionListener + routingTableDiffLatchedActionListener, + manifest.getOpensearchVersion() ); } @@ -1392,7 +1394,8 @@ ClusterState readClusterStateInParallel( new RemoteDiscoveryNodes( manifest.getDiscoveryNodesMetadata().getUploadedFilename(), clusterUUID, - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + manifest.getOpensearchVersion() ), listener ); @@ -1404,7 +1407,8 @@ ClusterState readClusterStateInParallel( new RemoteClusterBlocks( manifest.getClusterBlocksMetadata().getUploadedFilename(), clusterUUID, - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + manifest.getOpensearchVersion() ), listener ); @@ -1416,7 +1420,8 @@ ClusterState readClusterStateInParallel( new RemoteHashesOfConsistentSettings( manifest.getHashesOfConsistentSettings().getUploadedFilename(), clusterUUID, - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + manifest.getOpensearchVersion() ), listener ); @@ -1431,7 +1436,8 @@ ClusterState readClusterStateInParallel( entry.getValue().getAttributeName(), clusterUUID, blobStoreRepository.getCompressor(), - namedWriteableRegistry + namedWriteableRegistry, + manifest.getOpensearchVersion() ), listener ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 101daaa143a66..2bc3a3efdca1f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity; @@ -32,10 +33,7 @@ public class RemoteClusterBlocks extends AbstractClusterMetadataWriteableBlobEntity { public static final String CLUSTER_BLOCKS = "blocks"; - public static final ChecksumWritableBlobStoreFormat CLUSTER_BLOCKS_FORMAT = new ChecksumWritableBlobStoreFormat<>( - "blocks", - ClusterBlocks::readFrom - ); + public final ChecksumWritableBlobStoreFormat clusterBlocksFormat; private ClusterBlocks clusterBlocks; private long stateVersion; @@ -44,11 +42,16 @@ public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion, super(clusterUUID, compressor, null); this.clusterBlocks = clusterBlocks; this.stateVersion = stateVersion; + this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom); } - public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor) { + public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) { super(clusterUUID, compressor, null); this.blobName = blobName; + this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", is -> { + is.setVersion(version); + return ClusterBlocks.readFrom(is); + }); } @Override @@ -83,11 +86,11 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput(); + return this.clusterBlocksFormat.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput(); } @Override public ClusterBlocks deserialize(final InputStream inputStream) throws IOException { - return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); + return this.clusterBlocksFormat.deserialize(blobName, Streams.readFully(inputStream)); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index e5e44525520f4..64cf79175392e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.common.io.Streams; @@ -65,16 +66,17 @@ public RemoteClusterStateCustoms( final String customType, final String clusterUUID, final Compressor compressor, - final NamedWriteableRegistry namedWriteableRegistry + final NamedWriteableRegistry namedWriteableRegistry, + final Version version ) { super(clusterUUID, compressor, null); this.blobName = blobName; this.customType = customType; this.namedWriteableRegistry = namedWriteableRegistry; - this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>( - "cluster-state-custom", - is -> readFrom(is, namedWriteableRegistry, customType) - ); + this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>("cluster-state-custom", is -> { + is.setVersion(version); + return readFrom(is, namedWriteableRegistry, customType); + }); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 829036c6d122b..215b9b326886d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity; @@ -32,10 +33,7 @@ public class RemoteDiscoveryNodes extends AbstractClusterMetadataWriteableBlobEntity { public static final String DISCOVERY_NODES = "nodes"; - public static final ChecksumWritableBlobStoreFormat DISCOVERY_NODES_FORMAT = new ChecksumWritableBlobStoreFormat<>( - "nodes", - is -> DiscoveryNodes.readFrom(is, null) - ); + public final ChecksumWritableBlobStoreFormat discoveryNodesFormat; private DiscoveryNodes discoveryNodes; private long stateVersion; @@ -49,11 +47,16 @@ public RemoteDiscoveryNodes( super(clusterUUID, compressor, null); this.discoveryNodes = discoveryNodes; this.stateVersion = stateVersion; + this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null)); } - public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor) { + public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) { super(clusterUUID, compressor, null); this.blobName = blobName; + this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> { + is.setVersion(version); + return DiscoveryNodes.readFrom(is, null); + }); } @Override @@ -88,7 +91,7 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return DISCOVERY_NODES_FORMAT.serialize( + return discoveryNodesFormat.serialize( (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), discoveryNodes, generateBlobFileName(), @@ -98,6 +101,6 @@ public InputStream serialize() throws IOException { @Override public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException { - return DISCOVERY_NODES_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); + return discoveryNodesFormat.deserialize(blobName, Streams.readFully(inputStream)); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index dee48237e5c4c..da923a08c9d0d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity; @@ -30,8 +31,7 @@ */ public class RemoteHashesOfConsistentSettings extends AbstractClusterMetadataWriteableBlobEntity { public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings"; - public static final ChecksumWritableBlobStoreFormat HASHES_OF_CONSISTENT_SETTINGS_FORMAT = - new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", DiffableStringMap::readFrom); + public final ChecksumWritableBlobStoreFormat hashesOfConsistentSettingsFormat; private DiffableStringMap hashesOfConsistentSettings; private long metadataVersion; @@ -45,11 +45,24 @@ public RemoteHashesOfConsistentSettings( super(clusterUUID, compressor, null); this.metadataVersion = metadataVersion; this.hashesOfConsistentSettings = hashesOfConsistentSettings; + this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>( + "hashes-of-consistent-settings", + DiffableStringMap::readFrom + ); } - public RemoteHashesOfConsistentSettings(final String blobName, final String clusterUUID, final Compressor compressor) { + public RemoteHashesOfConsistentSettings( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final Version version + ) { super(clusterUUID, compressor, null); this.blobName = blobName; + this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", is -> { + is.setVersion(version); + return DiffableStringMap.readFrom(is); + }); } @Override @@ -83,12 +96,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor()) + return hashesOfConsistentSettingsFormat.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor()) .streamInput(); } @Override public DiffableStringMap deserialize(final InputStream inputStream) throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); + return hashesOfConsistentSettingsFormat.deserialize(blobName, Streams.readFully(inputStream)); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 46c5074c48eb8..773e8dab4f155 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.routingtable; +import org.opensearch.Version; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity; @@ -37,8 +38,7 @@ public class RemoteIndexRoutingTable extends AbstractClusterMetadataWriteableBlo private long term; private long version; private BlobPathParameters blobPathParameters; - public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = - new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); + public final ChecksumWritableBlobStoreFormat indexRoutingTableFormat; public RemoteIndexRoutingTable( IndexRoutingTable indexRoutingTable, @@ -52,6 +52,7 @@ public RemoteIndexRoutingTable( this.indexRoutingTable = indexRoutingTable; this.term = term; this.version = version; + this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); } /** @@ -60,12 +61,16 @@ public RemoteIndexRoutingTable( * @param clusterUUID UUID of the cluster * @param compressor Compressor object */ - public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { + public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor, Version opensearchVersion) { super(clusterUUID, compressor); this.index = null; this.term = -1; this.version = -1; this.blobName = blobName; + this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", is -> { + is.setVersion(opensearchVersion); + return IndexRoutingTable.readFrom(is); + }); } @Override @@ -104,11 +109,11 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); + return indexRoutingTableFormat.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); } @Override public IndexRoutingTable deserialize(InputStream in) throws IOException { - return INDEX_ROUTING_TABLE_FORMAT.deserialize(blobName, Streams.readFully(in)); + return indexRoutingTableFormat.deserialize(blobName, Streams.readFully(in)); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java index b3e0e9e5763b7..a4bc8ff272e7f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.routingtable; +import org.opensearch.Version; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -47,8 +48,7 @@ public class RemoteRoutingTableDiff extends AbstractClusterMetadataWriteableBlob public static final int VERSION = 1; - public static final ChecksumWritableBlobStoreFormat REMOTE_ROUTING_TABLE_DIFF_FORMAT = - new ChecksumWritableBlobStoreFormat<>(codec, RoutingTableIncrementalDiff::readFrom); + public final ChecksumWritableBlobStoreFormat remoteRoutingTableDiffFormat; /** * Constructs a new RemoteRoutingTableDiff with the given differences. @@ -70,6 +70,7 @@ public RemoteRoutingTableDiff( this.routingTableIncrementalDiff = routingTableIncrementalDiff; this.term = term; this.version = version; + this.remoteRoutingTableDiffFormat = new ChecksumWritableBlobStoreFormat<>(codec, RoutingTableIncrementalDiff::readFrom); } /** @@ -79,10 +80,14 @@ public RemoteRoutingTableDiff( * @param clusterUUID the cluster UUID. * @param compressor the compressor to be used. */ - public RemoteRoutingTableDiff(String blobName, String clusterUUID, Compressor compressor) { + public RemoteRoutingTableDiff(String blobName, String clusterUUID, Compressor compressor, final Version version) { super(clusterUUID, compressor); this.routingTableIncrementalDiff = null; this.blobName = blobName; + this.remoteRoutingTableDiffFormat = new ChecksumWritableBlobStoreFormat<>(codec, is -> { + is.setVersion(version); + return RoutingTableIncrementalDiff.readFrom(is); + }); } /** @@ -127,12 +132,11 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { assert routingTableIncrementalDiff != null; - return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(routingTableIncrementalDiff, generateBlobFileName(), getCompressor()) - .streamInput(); + return remoteRoutingTableDiffFormat.serialize(routingTableIncrementalDiff, generateBlobFileName(), getCompressor()).streamInput(); } @Override public Diff deserialize(InputStream in) throws IOException { - return REMOTE_ROUTING_TABLE_DIFF_FORMAT.deserialize(blobName, Streams.readFully(in)); + return remoteRoutingTableDiffFormat.deserialize(blobName, Streams.readFully(in)); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 023d9b983afc4..068cbbdbce498 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -35,6 +35,8 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -70,8 +72,6 @@ import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; -import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.REMOTE_ROUTING_TABLE_DIFF_FORMAT; import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE; import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_PATH_TOKEN; @@ -567,8 +567,14 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + uploadedFileName, + clusterState.stateUUID(), + compressor, + Version.CURRENT + ); when(blobContainer.readBlob(indexName)).thenReturn( - INDEX_ROUTING_TABLE_FORMAT.serialize( + remoteIndexRoutingTable.indexRoutingTableFormat.serialize( clusterState.getRoutingTable().getIndicesRouting().get(indexName), uploadedFileName, compressor @@ -580,7 +586,8 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { remoteRoutingTableService.getAsyncIndexRoutingReadAction( "cluster-uuid", uploadedFileName, - new LatchedActionListener<>(listener, latch) + new LatchedActionListener<>(listener, latch), + Version.CURRENT ); latch.await(); @@ -598,8 +605,14 @@ public void testGetAsyncIndexRoutingTableDiffReadAction() throws Exception { RoutingTableIncrementalDiff diff = new RoutingTableIncrementalDiff(previousState.getRoutingTable(), currentState.getRoutingTable()); String uploadedFileName = String.format(Locale.ROOT, "routing-table-diff/" + indexName); + RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff( + uploadedFileName, + currentState.stateUUID(), + compressor, + Version.CURRENT + ); when(blobContainer.readBlob(indexName)).thenReturn( - REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(diff, uploadedFileName, compressor).streamInput() + remoteRoutingTableDiff.remoteRoutingTableDiffFormat.serialize(diff, uploadedFileName, compressor).streamInput() ); TestCapturingListener> listener = new TestCapturingListener<>(); @@ -608,7 +621,8 @@ public void testGetAsyncIndexRoutingTableDiffReadAction() throws Exception { remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction( "cluster-uuid", uploadedFileName, - new LatchedActionListener<>(listener, latch) + new LatchedActionListener<>(listener, latch), + Version.CURRENT ); latch.await(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 67b1528466a9e..a8943e92b7397 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -57,11 +58,9 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; -import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustomsTests.getClusterStateCustom; -import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES_FORMAT; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodesTests.getDiscoveryNodes; import static org.opensearch.index.remote.RemoteStoreUtils.invertLong; import static org.hamcrest.Matchers.is; @@ -139,15 +138,15 @@ public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, Inter public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); + RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor, Version.CURRENT); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - DISCOVERY_NODES_FORMAT.serialize( + remoteObjForDownload.discoveryNodesFormat.serialize( (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), discoveryNodes, fileName, compressor ).streamInput() ); - RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); remoteClusterStateAttributesManager.readAsync(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)); @@ -194,10 +193,10 @@ public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, Interr public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); String fileName = randomAlphaOfLength(10); + RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor, Version.CURRENT); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, fileName, compressor).streamInput() + remoteClusterBlocks.clusterBlocksFormat.serialize(clusterBlocks, fileName, compressor).streamInput() ); - RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); @@ -263,7 +262,8 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc custom.getWriteableName(), CLUSTER_UUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + Version.CURRENT ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput() @@ -309,7 +309,7 @@ public void testGetAsyncWriteRunnable_Exception() throws IOException, Interrupte public void testGetAsyncReadRunnable_Exception() throws IOException, InterruptedException { String fileName = randomAlphaOfLength(10); - RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(fileName, CLUSTER_UUID, compressor); + RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(fileName, CLUSTER_UUID, compressor, Version.CURRENT); Exception ioException = new IOException("mock test exception"); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 52a6e3e9c00db..909581d2d0d10 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -54,8 +54,11 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; +import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; +import org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings; import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; @@ -123,7 +126,6 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT; -import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; @@ -131,11 +133,9 @@ import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.readFrom; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES; -import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES_FORMAT; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodesTests.getDiscoveryNodes; import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT; import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS; -import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettingsTests.getHashesOfConsistentSettings; import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX; import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_METADATA_FORMAT; @@ -1173,7 +1173,13 @@ public void testGetClusterStateUsingDiff() throws IOException { new UploadedMetadataAttribute(HASHES_OF_CONSISTENT_SETTINGS, HASHES_OF_CONSISTENT_SETTINGS_FILENAME) ); when(blobContainer.readBlob(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)).thenAnswer(i -> { - BytesReference bytes = HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + RemoteHashesOfConsistentSettings remoteHashesOfConsistentSettings = new RemoteHashesOfConsistentSettings( + HASHES_OF_CONSISTENT_SETTINGS_FILENAME, + clusterState.stateUUID(), + compressor, + Version.CURRENT + ); + BytesReference bytes = remoteHashesOfConsistentSettings.hashesOfConsistentSettingsFormat.serialize( hashesOfConsistentSettings, HASHES_OF_CONSISTENT_SETTINGS_FILENAME, compressor @@ -1215,7 +1221,13 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.discoveryNodesUpdated(true); manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME)); when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize( + RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes( + DISCOVERY_NODES_FILENAME, + clusterState.stateUUID(), + compressor, + Version.CURRENT + ); + BytesReference bytes = remoteDiscoveryNodes.discoveryNodesFormat.serialize( (out, nodes) -> nodes.writeToWithAttribute(out), nodesBuilder.build(), DISCOVERY_NODES_FILENAME, @@ -1231,7 +1243,17 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.clusterBlocksUpdated(true); manifestBuilder.clusterBlocksMetadata(new UploadedMetadataAttribute(CLUSTER_BLOCKS, CLUSTER_BLOCKS_FILENAME)); when(blobContainer.readBlob(CLUSTER_BLOCKS_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = CLUSTER_BLOCKS_FORMAT.serialize(newClusterBlock, CLUSTER_BLOCKS_FILENAME, compressor); + RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks( + CLUSTER_BLOCKS_FILENAME, + clusterState.stateUUID(), + compressor, + Version.CURRENT + ); + BytesReference bytes = remoteClusterBlocks.clusterBlocksFormat.serialize( + newClusterBlock, + CLUSTER_BLOCKS_FILENAME, + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); @@ -1243,6 +1265,7 @@ public void testGetClusterStateUsingDiff() throws IOException { .stateVersion(clusterState.version()) .metadataVersion(clusterState.metadata().version()) .clusterUUID(clusterState.getMetadata().clusterUUID()) + .opensearchVersion(Version.CURRENT) .routingTableVersion(clusterState.getRoutingTable().version()); remoteClusterStateService.start(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 591fbf31a3021..f1a31e89d0b55 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -79,7 +79,6 @@ import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT; import static org.opensearch.gateway.remote.model.RemoteGlobalMetadataTests.getGlobalMetadata; import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS; -import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettingsTests.getHashesOfConsistentSettings; import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadataTests.getSettings; @@ -357,10 +356,15 @@ public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Excepti RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForDownload = new RemoteHashesOfConsistentSettings( fileName, CLUSTER_UUID, - compressor + compressor, + Version.CURRENT ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, fileName, compressor).streamInput() + hashesOfConsistentSettingsForDownload.hashesOfConsistentSettingsFormat.serialize( + hashesOfConsistentSettings, + fileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java index 3c1e141b81360..d258b172371ee 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.blobstore.BlobPath; @@ -62,7 +63,7 @@ public void testClusterUUID() { RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks(clusterBlocks, METADATA_VERSION, clusterUUID, compressor); assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); - RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); } @@ -71,7 +72,7 @@ public void testFullBlobName() { RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks(clusterBlocks, METADATA_VERSION, clusterUUID, compressor); assertNull(remoteObjectForUpload.getFullBlobName()); - RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.getFullBlobName(), TEST_BLOB_NAME); } @@ -80,13 +81,13 @@ public void testBlobFileName() { RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks(clusterBlocks, METADATA_VERSION, clusterUUID, compressor); assertNull(remoteObjectForUpload.getBlobFileName()); - RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.getBlobFileName(), TEST_BLOB_FILE_NAME); } public void testBlobPathTokens() { String uploadedFile = "user/local/opensearch/cluster-blocks"; - RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(uploadedFile, clusterUUID, compressor); + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(uploadedFile, clusterUUID, compressor, Version.CURRENT); assertArrayEquals(remoteObjectForDownload.getBlobPathTokens(), new String[] { "user", "local", "opensearch", "cluster-blocks" }); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java index 1b020e13324a4..b7b99a7b4e428 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java @@ -101,7 +101,8 @@ public void testClusterUUID() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + CURRENT ); assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); } @@ -123,7 +124,8 @@ public void testFullBlobName() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + CURRENT ); assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); } @@ -145,7 +147,8 @@ public void testBlobFileName() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + CURRENT ); assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); } @@ -157,7 +160,8 @@ public void testBlobPathTokens() { "test-custom", clusterUUID, compressor, - namedWriteableRegistry + namedWriteableRegistry, + CURRENT ); assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "clusterStateCustoms" })); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index 1b988ee1f37ec..1ec99318e2be8 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -70,7 +70,7 @@ public void testClusterUUID() { RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); - RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); } @@ -79,7 +79,7 @@ public void testFullBlobName() { RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); assertNull(remoteObjectForUpload.getFullBlobName()); - RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.getFullBlobName(), TEST_BLOB_NAME); } @@ -88,13 +88,13 @@ public void testBlobFileName() { RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); assertNull(remoteObjectForUpload.getBlobFileName()); - RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.getBlobFileName(), TEST_BLOB_FILE_NAME); } public void testBlobPathTokens() { String uploadedFile = "user/local/opensearch/discovery-nodes"; - RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(uploadedFile, clusterUUID, compressor); + RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(uploadedFile, clusterUUID, compressor, Version.CURRENT); assertArrayEquals(remoteObjectForDownload.getBlobPathTokens(), new String[] { "user", "local", "opensearch", "discovery-nodes" }); } @@ -152,7 +152,7 @@ public void testExceptionDuringDeserialize() throws IOException { InputStream in = mock(InputStream.class); when(in.read(any(byte[].class))).thenThrow(new IOException("mock-exception")); String uploadedFile = "user/local/opensearch/discovery-nodes"; - RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(uploadedFile, clusterUUID, compressor); + RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(uploadedFile, clusterUUID, compressor, Version.CURRENT); IOException ioe = assertThrows(IOException.class, () -> remoteObjectForDownload.deserialize(in)); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java index b931f24f98631..1c3e52d7adf9a 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.model; +import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.common.blobstore.BlobPath; @@ -71,7 +72,8 @@ public void testClusterUUID() { RemoteHashesOfConsistentSettings remoteObjectForDownload = new RemoteHashesOfConsistentSettings( TEST_BLOB_NAME, clusterUUID, - compressor + compressor, + Version.CURRENT ); assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); } @@ -89,7 +91,8 @@ public void testFullBlobName() { RemoteHashesOfConsistentSettings remoteObjectForDownload = new RemoteHashesOfConsistentSettings( TEST_BLOB_NAME, clusterUUID, - compressor + compressor, + Version.CURRENT ); assertEquals(remoteObjectForDownload.getFullBlobName(), TEST_BLOB_NAME); } @@ -107,7 +110,8 @@ public void testBlobFileName() { RemoteHashesOfConsistentSettings remoteObjectForDownload = new RemoteHashesOfConsistentSettings( TEST_BLOB_NAME, clusterUUID, - compressor + compressor, + Version.CURRENT ); assertEquals(remoteObjectForDownload.getBlobFileName(), TEST_BLOB_FILE_NAME); } @@ -117,7 +121,8 @@ public void testBlobPathTokens() { RemoteHashesOfConsistentSettings remoteObjectForDownload = new RemoteHashesOfConsistentSettings( uploadedFile, clusterUUID, - compressor + compressor, + Version.CURRENT ); assertArrayEquals( remoteObjectForDownload.getBlobPathTokens(), diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java index 5dcc3127f2f5d..c9dd7d9242855 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote.routingtable; +import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.RoutingTable; @@ -100,7 +101,7 @@ public void testClusterUUID() { ); assertEquals(remoteDiffForUpload.clusterUUID(), clusterUUID); - RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteDiffForDownload.clusterUUID(), clusterUUID); } @@ -123,7 +124,7 @@ public void testFullBlobName() { ); assertThat(remoteDiffForUpload.getFullBlobName(), nullValue()); - RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertThat(remoteDiffForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); } @@ -146,7 +147,7 @@ public void testBlobFileName() { ); assertThat(remoteDiffForUpload.getBlobFileName(), nullValue()); - RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertThat(remoteDiffForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); } @@ -249,7 +250,7 @@ public void testStreamOperations() throws IOException { InputStream inputStream = remoteDiffForUpload.serialize(); // Create a new instance for deserialization - RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); // Deserialize the remote diff Diff deserializedDiff = remoteDiffForDownload.deserialize(inputStream); diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index 29d4ffa978851..b2dd5c67c47dc 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -107,7 +107,12 @@ public void testClusterUUID() { ); assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor, + Version.CURRENT + ); assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); }); } @@ -137,7 +142,12 @@ public void testFullBlobName() { ); assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor, + Version.CURRENT + ); assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); }); } @@ -167,14 +177,24 @@ public void testBlobFileName() { ); assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + TEST_BLOB_NAME, + clusterUUID, + compressor, + Version.CURRENT + ); assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); }); } public void testBlobPathTokens() { String uploadedFile = "user/local/opensearch/routingTable"; - RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(uploadedFile, clusterUUID, compressor); + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable( + uploadedFile, + clusterUUID, + compressor, + Version.CURRENT + ); assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" })); } From c0a68305d9b2a1f50734793c906c9691b3ea0fbc Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Fri, 28 Nov 2025 16:33:02 +0530 Subject: [PATCH 2/5] Adding OpensearchVersion attribute to ChecksumWritableBlobStoreFormat Signed-off-by: Harsh Garg --- CHANGELOG.md | 1 + .../gateway/remote/RemoteClusterStateService.java | 9 +++++++++ .../gateway/remote/model/RemoteClusterBlocks.java | 5 +---- .../gateway/remote/model/RemoteClusterStateCustoms.java | 9 +++++---- .../gateway/remote/model/RemoteCustomMetadata.java | 9 +++++---- .../gateway/remote/model/RemoteDiscoveryNodes.java | 5 +---- .../remote/model/RemoteHashesOfConsistentSettings.java | 9 +++++---- .../remote/routingtable/RemoteIndexRoutingTable.java | 9 +++++---- .../remote/routingtable/RemoteRoutingTableDiff.java | 5 +---- .../blobstore/ChecksumWritableBlobStoreFormat.java | 7 +++++++ 10 files changed, 40 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4786c353b697d..ec77721d7f2e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948)) - Fix deletion failure/error of unused index template; case when an index template matches a data stream but has a lower priority. ([#20102](https://github.com/opensearch-project/OpenSearch/pull/20102)) - Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([20105](https://github.com/opensearch-project/OpenSearch/pull/20105)) +- Fixed version incompatibility in remote state entities using bytestream for ser/de ([#20080](https://github.com/opensearch-project/OpenSearch/pull/20080)) ### Dependencies - Bump Apache Lucene from 10.3.1 to 10.3.2 ([#20026](https://github.com/opensearch-project/OpenSearch/pull/20026)) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 5285bf6e10af8..a28f32bb03bf5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -1260,6 +1261,14 @@ ClusterState readClusterStateInParallel( AtomicReference> readIndexRoutingTableDiffResults = new AtomicReference<>(); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); + if (manifest.getOpensearchVersion() != Version.CURRENT) { + logger.info( + "Reading cluster state on version {} from manifest uploaded with version {}", + Version.CURRENT, + manifest.getOpensearchVersion() + ); + } + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(response -> { logger.debug("Successfully read cluster state component from remote"); readResults.add(response); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 2bc3a3efdca1f..7ecad9f75baa0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -48,10 +48,7 @@ public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion, public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) { super(clusterUUID, compressor, null); this.blobName = blobName; - this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", is -> { - is.setVersion(version); - return ClusterBlocks.readFrom(is); - }); + this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom, version); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index 64cf79175392e..641ec3f14b88b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -73,10 +73,11 @@ public RemoteClusterStateCustoms( this.blobName = blobName; this.customType = customType; this.namedWriteableRegistry = namedWriteableRegistry; - this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>("cluster-state-custom", is -> { - is.setVersion(version); - return readFrom(is, namedWriteableRegistry, customType); - }); + this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>( + "cluster-state-custom", + is -> readFrom(is, namedWriteableRegistry, customType), + version + ); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java index 03055a0be0e64..04c28cdcc61bd 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java @@ -75,10 +75,11 @@ public RemoteCustomMetadata( this.blobName = blobName; this.customType = customType; this.namedWriteableRegistry = namedWriteableRegistry; - this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>("custom", is -> { - is.setVersion(version); - return readFrom(is, namedWriteableRegistry, customType); - }); + this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>( + "custom", + is -> readFrom(is, namedWriteableRegistry, customType), + version + ); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 215b9b326886d..745ac9bc70184 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -53,10 +53,7 @@ public RemoteDiscoveryNodes( public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) { super(clusterUUID, compressor, null); this.blobName = blobName; - this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> { - is.setVersion(version); - return DiscoveryNodes.readFrom(is, null); - }); + this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null), version); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index da923a08c9d0d..3695d08631351 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -59,10 +59,11 @@ public RemoteHashesOfConsistentSettings( ) { super(clusterUUID, compressor, null); this.blobName = blobName; - this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", is -> { - is.setVersion(version); - return DiffableStringMap.readFrom(is); - }); + this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>( + "hashes-of-consistent-settings", + DiffableStringMap::readFrom, + version + ); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 773e8dab4f155..55b87746b923a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -67,10 +67,11 @@ public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor c this.term = -1; this.version = -1; this.blobName = blobName; - this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>("index-routing-table", is -> { - is.setVersion(opensearchVersion); - return IndexRoutingTable.readFrom(is); - }); + this.indexRoutingTableFormat = new ChecksumWritableBlobStoreFormat<>( + "index-routing-table", + IndexRoutingTable::readFrom, + opensearchVersion + ); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java index a4bc8ff272e7f..19f0e02e8a23b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java @@ -84,10 +84,7 @@ public RemoteRoutingTableDiff(String blobName, String clusterUUID, Compressor co super(clusterUUID, compressor); this.routingTableIncrementalDiff = null; this.blobName = blobName; - this.remoteRoutingTableDiffFormat = new ChecksumWritableBlobStoreFormat<>(codec, is -> { - is.setVersion(version); - return RoutingTableIncrementalDiff.readFrom(is); - }); + this.remoteRoutingTableDiffFormat = new ChecksumWritableBlobStoreFormat<>(codec, RoutingTableIncrementalDiff::readFrom, version); } /** diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java index 88672995f4fd6..695bad89602ea 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -50,10 +50,16 @@ public class ChecksumWritableBlobStoreFormat { private final String codec; private final CheckedFunction reader; + private final Version opensearchVersion; public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction reader) { + this(codec, reader, Version.CURRENT); + } + + public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction reader, Version opensearchVersion) { this.codec = codec; this.reader = reader; + this.opensearchVersion = opensearchVersion; } public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { @@ -102,6 +108,7 @@ public T deserialize(String blobName, BytesReference bytes) throws IOException { BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize); Compressor compressor = CompressorRegistry.compressorForWritable(bytesReference); try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) { + in.setVersion(opensearchVersion); return reader.apply(in); } } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { From ea9ef12788b4bfe09da7210dd9a29c5253625709 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Sat, 29 Nov 2025 00:11:00 +0530 Subject: [PATCH 3/5] Adding UT for mixed version ser/de of RemoteDiscoveryNodes. Signed-off-by: Harsh Garg --- .../ChecksumWritableBlobStoreFormat.java | 6 +++ .../RemoteRoutingTableServiceTests.java | 32 +++++++------- .../model/RemoteDiscoveryNodesTests.java | 44 +++++++++++++++++++ 3 files changed, 67 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java index 695bad89602ea..44b3a4119ffeb 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -50,6 +50,12 @@ public class ChecksumWritableBlobStoreFormat { private final String codec; private final CheckedFunction reader; + /** + * opensearchVersion here corresponds to the version of the node which is/was used to + * serialize or deserialize the blob entity. Currently, it is just being referenced while + * deserializing, to ensure the deserialization is compatible with the current version. + * Remote entities can fetch the opensearchVersion version from manifest and pass it along. + */ private final Version opensearchVersion; public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction reader) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 068cbbdbce498..3564e10935722 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -618,21 +618,23 @@ public void testGetAsyncIndexRoutingTableDiffReadAction() throws Exception { TestCapturingListener> listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction( - "cluster-uuid", - uploadedFileName, - new LatchedActionListener<>(listener, latch), - Version.CURRENT - ); - latch.await(); - - assertNull(listener.getFailure()); - assertNotNull(listener.getResult()); - Diff resultDiff = listener.getResult(); - assertEquals( - currentState.getRoutingTable().getIndicesRouting(), - resultDiff.apply(previousState.getRoutingTable()).getIndicesRouting() - ); + for (Version version : List.of(Version.CURRENT, Version.V_3_1_0, Version.V_3_2_0)) { + remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction( + "cluster-uuid", + uploadedFileName, + new LatchedActionListener<>(listener, latch), + version + ); + latch.await(); + + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + Diff resultDiff = listener.getResult(); + assertEquals( + currentState.getRoutingTable().getIndicesRouting(), + resultDiff.apply(previousState.getRoutingTable()).getIndicesRouting() + ); + } } public void testGetAsyncIndexRoutingWriteAction() throws Exception { diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index 1ec99318e2be8..156680858f359 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -23,9 +23,11 @@ import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -156,6 +158,48 @@ public void testExceptionDuringDeserialize() throws IOException { IOException ioe = assertThrows(IOException.class, () -> remoteObjectForDownload.deserialize(in)); } + public void testSerializationOnOldVersionDeserializationOnNewVersions() throws IOException { + DiscoveryNodes nodes = getDiscoveryNodes(); + + // Create format with V_3_1_0 for serialization using the same ChecksumWritableBlobStoreFormat + // which is being used in RemoteDiscoveryNodes class. + ChecksumWritableBlobStoreFormat serializeFormat = new ChecksumWritableBlobStoreFormat<>( + "nodes", + is -> DiscoveryNodes.readFrom(is, null) + ); + + // Serialize using 3.1.0 format + byte[] serializedData = serializeFormat.serialize( + (out, discoveryNode) -> { + out.setVersion(Version.V_3_1_0); + discoveryNode.writeToWithAttribute(out); + }, + nodes, + "test-blob", + compressor + ).streamInput().readAllBytes(); + + + // Deserialize on 3.2.0 + RemoteDiscoveryNodes deserializeOn32 = new RemoteDiscoveryNodes("test-blob", clusterUUID, compressor, Version.V_3_2_0); + try (InputStream inputStream = new ByteArrayInputStream(serializedData)) { + deserializeOn32.deserialize(inputStream); + fail("De-serealizing assuming DiscoveryNodes were serialized using 3.4 should fail"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + + // Deserialize on 3.4.0 + RemoteDiscoveryNodes deserializeOn31 = new RemoteDiscoveryNodes("test-blob", clusterUUID, compressor, Version.V_3_1_0); + DiscoveryNodes readNodes31; + try (InputStream inputStream = new ByteArrayInputStream(serializedData)) { + readNodes31 = deserializeOn31.deserialize(inputStream); + } + assertEquals(nodes.getSize(), readNodes31.getSize()); + assertEquals(nodes.getClusterManagerNodeId(), readNodes31.getClusterManagerNodeId()); + } + + public static DiscoveryNodes getDiscoveryNodes() { return DiscoveryNodes.builder() .add( From 98d0cf268d4321cd84a0ef13e270a84ac0b8afeb Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Tue, 2 Dec 2025 17:54:21 +0530 Subject: [PATCH 4/5] Fixing typo in comment statement for UT Signed-off-by: Harsh Garg --- .../model/RemoteDiscoveryNodesTests.java | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index 156680858f359..0934a99081882 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -90,7 +90,7 @@ public void testBlobFileName() { RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); assertNull(remoteObjectForUpload.getBlobFileName()); - RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); + RemoteDiscoveryNodes remoteObjectForDownload = new RemoteDiscoveryNodes(TEST_BLOB_NAME, clusterUUID, compressor, Version.CURRENT); assertEquals(remoteObjectForDownload.getBlobFileName(), TEST_BLOB_FILE_NAME); } @@ -169,37 +169,29 @@ public void testSerializationOnOldVersionDeserializationOnNewVersions() throws I ); // Serialize using 3.1.0 format - byte[] serializedData = serializeFormat.serialize( - (out, discoveryNode) -> { - out.setVersion(Version.V_3_1_0); - discoveryNode.writeToWithAttribute(out); - }, - nodes, - "test-blob", - compressor - ).streamInput().readAllBytes(); - - - // Deserialize on 3.2.0 - RemoteDiscoveryNodes deserializeOn32 = new RemoteDiscoveryNodes("test-blob", clusterUUID, compressor, Version.V_3_2_0); - try (InputStream inputStream = new ByteArrayInputStream(serializedData)) { - deserializeOn32.deserialize(inputStream); - fail("De-serealizing assuming DiscoveryNodes were serialized using 3.4 should fail"); - } catch (Exception e) { - assertTrue(e instanceof IllegalStateException); - } - - // Deserialize on 3.4.0 - RemoteDiscoveryNodes deserializeOn31 = new RemoteDiscoveryNodes("test-blob", clusterUUID, compressor, Version.V_3_1_0); + byte[] serializedData = serializeFormat.serialize((out, discoveryNode) -> { + out.setVersion(Version.V_3_1_0); + discoveryNode.writeToWithAttribute(out); + }, nodes, "test-blob", compressor).streamInput().readAllBytes(); + + // Deserialize assuming serialization was performed using version 3.4.0, which should fail + RemoteDiscoveryNodes deserialize34 = new RemoteDiscoveryNodes("test-blob", clusterUUID, compressor, Version.V_3_4_0); + assertThrows(Exception.class, () -> { + try (InputStream inputStream = new ByteArrayInputStream(serializedData)) { + deserialize34.deserialize(inputStream); + } + }); + + // Deserialize assuming serialization was performed using version 3.1.0, which should pass + RemoteDiscoveryNodes deserialize31 = new RemoteDiscoveryNodes("test-blob", clusterUUID, compressor, Version.V_3_1_0); DiscoveryNodes readNodes31; try (InputStream inputStream = new ByteArrayInputStream(serializedData)) { - readNodes31 = deserializeOn31.deserialize(inputStream); + readNodes31 = deserialize31.deserialize(inputStream); } assertEquals(nodes.getSize(), readNodes31.getSize()); assertEquals(nodes.getClusterManagerNodeId(), readNodes31.getClusterManagerNodeId()); } - public static DiscoveryNodes getDiscoveryNodes() { return DiscoveryNodes.builder() .add( From d40567789fa91ccd354a85520cc75bd987598ebc Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Tue, 2 Dec 2025 21:19:30 +0530 Subject: [PATCH 5/5] Retry Build Signed-off-by: Harsh Garg