Skip to content

Commit

Permalink
[Remote Routing Table] Implement write and read flow for shard diff f…
Browse files Browse the repository at this point in the history
…ile. (#14684)

* Implement write and read flow to upload/download shard diff file.

Signed-off-by: Shailendra Singh <[email protected]>
  • Loading branch information
shailendra0811 authored Jul 23, 2024
1 parent e485856 commit f85a58f
Show file tree
Hide file tree
Showing 20 changed files with 1,663 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add shard-diff path to diff manifest to reduce number of read calls remote store (([#14684](https://github.com/opensearch-project/OpenSearch/pull/14684)))
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand All @@ -32,16 +33,19 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
BlobPath indexRoutingPath;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;
Expand Down Expand Up @@ -72,7 +76,13 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));
Expand All @@ -86,7 +96,11 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3);
});

verifyUpdatesInManifestFile(remoteManifestManager);
latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));
Expand All @@ -98,6 +112,42 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(areRoutingTablesSame(routingTableVersions));
}

public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception {
prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));

// Update cluster settings
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS))
.get();
assertTrue(response.isAcknowledged());

latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, false);

routingTableVersions = getRoutingTableFromAllNodes();
assertTrue(areRoutingTablesSame(routingTableVersions));
}

public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

Expand All @@ -124,10 +174,16 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
}

public void testRemoteRoutingTableIndexMasterRestart1() throws Exception {
public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
Expand All @@ -153,7 +209,13 @@ public void testRemoteRoutingTableIndexMasterRestart1() throws Exception {
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
verifyUpdatesInManifestFile(remoteManifestManager);
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
}

private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception {
Expand Down Expand Up @@ -208,18 +270,23 @@ private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID
);
}

private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) {
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
private void verifyUpdatesInManifestFile(
Optional<ClusterMetadataManifest> latestManifest,
List<String> expectedIndexNames,
int expectedIndicesRoutingFilesInManifest,
List<String> expectedDeletedIndex,
boolean isRoutingTableDiffFileExpected
) {
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME));
assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty());
assertFalse(manifest.getIndicesRouting().isEmpty());
assertEquals(1, manifest.getIndicesRouting().size());
assertTrue(manifest.getIndicesRouting().get(0).getUploadedFilename().contains(indexRoutingPath.buildAsString()));

assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated());
assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted());
assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size());
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) {
assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString()));
}
assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null);
}

private List<RoutingTable> getRoutingTableFromAllNodes() throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.Diff;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Represents a difference between {@link RoutingTable} objects that can be serialized and deserialized.
*/
public class RoutingTableIncrementalDiff implements Diff<RoutingTable> {

private final Map<String, Diff<IndexRoutingTable>> diffs;

/**
* Constructs a new RoutingTableIncrementalDiff with the given differences.
*
* @param diffs a map containing the differences of {@link IndexRoutingTable}.
*/
public RoutingTableIncrementalDiff(Map<String, Diff<IndexRoutingTable>> diffs) {
this.diffs = diffs;
}

/**
* Gets the map of differences of {@link IndexRoutingTable}.
*
* @return a map containing the differences.
*/
public Map<String, Diff<IndexRoutingTable>> getDiffs() {
return diffs;
}

/**
* Reads a {@link RoutingTableIncrementalDiff} from the given {@link StreamInput}.
*
* @param in the input stream to read from.
* @return the deserialized RoutingTableIncrementalDiff.
* @throws IOException if an I/O exception occurs while reading from the stream.
*/
public static RoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
Map<String, Diff<IndexRoutingTable>> diffs = new HashMap<>();

for (int i = 0; i < size; i++) {
String key = in.readString();
Diff<IndexRoutingTable> diff = IndexRoutingTableIncrementalDiff.readFrom(in);
diffs.put(key, diff);
}
return new RoutingTableIncrementalDiff(diffs);
}

/**
* Applies the differences to the provided {@link RoutingTable}.
*
* @param part the original RoutingTable to which the differences will be applied.
* @return the updated RoutingTable with the applied differences.
*/
@Override
public RoutingTable apply(RoutingTable part) {
RoutingTable.Builder builder = new RoutingTable.Builder();
for (IndexRoutingTable indexRoutingTable : part) {
builder.add(indexRoutingTable); // Add existing index routing tables to builder
}

// Apply the diffs
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
builder.add(entry.getValue().apply(part.index(entry.getKey())));
}

return builder.build();
}

/**
* Writes the differences to the given {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing to the stream.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(diffs.size());
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
}

/**
* Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized.
*/
public static class IndexRoutingTableIncrementalDiff implements Diff<IndexRoutingTable> {

private final List<IndexShardRoutingTable> indexShardRoutingTables;

/**
* Constructs a new IndexShardRoutingTableDiff with the given shard routing tables.
*
* @param indexShardRoutingTables a list of IndexShardRoutingTable representing the differences.
*/
public IndexRoutingTableIncrementalDiff(List<IndexShardRoutingTable> indexShardRoutingTables) {
this.indexShardRoutingTables = indexShardRoutingTables;
}

/**
* Applies the differences to the provided {@link IndexRoutingTable}.
*
* @param part the original IndexRoutingTable to which the differences will be applied.
* @return the updated IndexRoutingTable with the applied differences.
*/
@Override
public IndexRoutingTable apply(IndexRoutingTable part) {
IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex());
for (IndexShardRoutingTable shardRoutingTable : part) {
builder.addIndexShard(shardRoutingTable); // Add existing shards to builder
}

// Apply the diff: update or add the new shard routing tables
for (IndexShardRoutingTable diffShard : indexShardRoutingTables) {
builder.addIndexShard(diffShard);
}
return builder.build();
}

/**
* Writes the differences to the given {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing to the stream.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexShardRoutingTables.size());
for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) {
IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out);
}
}

/**
* Reads a {@link IndexRoutingTableIncrementalDiff} from the given {@link StreamInput}.
*
* @param in the input stream to read from.
* @return the deserialized IndexShardRoutingTableDiff.
* @throws IOException if an I/O exception occurs while reading from the stream.
*/
public static IndexRoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
indexShardRoutingTables.add(shardRoutingTable);
}
return new IndexRoutingTableIncrementalDiff(indexShardRoutingTables);
}
}
}
Loading

0 comments on commit f85a58f

Please sign in to comment.