Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817)))
- Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -583,6 +586,44 @@ public void testRemotePublicationSettingChangePersistedAfterFullRestart() throws
});
}

public void testPublicationIndexAlias() throws Exception {
// create cluster with multi node (3 master + 2 data)
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);

createIndex("index-1");
createIndex("index-2");
createIndex("index-3");

IndicesAliasesRequest request = new IndicesAliasesRequest(); // <1>
IndicesAliasesRequest.AliasActions remoteIndexAction = new IndicesAliasesRequest.AliasActions(
IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX
).index("index-1");
IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
.index("index-2")
.alias("index-1");
request.addAliasAction(remoteIndexAction);
request.addAliasAction(aliasAction);

assertAcked(client().admin().indices().aliases(request).actionGet());
// assert here that NodeStats.discovery.remote_diff_download.failed_count is 0 for any/all node
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.nodesStats(new NodesStatsRequest().addMetric(DISCOVERY.metricName()))
.actionGet();
for (NodeStats node : nodesStatsResponse.getNodes()) {
List<PersistedStateStats> persistenceStats = node.getDiscoveryStats().getClusterStateStats().getPersistenceStats();
for (PersistedStateStats persistedStateStats : persistenceStats) {
String statsName = persistedStateStats.getStatsName();
if (FULL_DOWNLOAD_STATS.equals(statsName) || DIFF_DOWNLOAD_STATS.equals(statsName)) {
assertEquals(0, persistedStateStats.getFailedCount());
}
}
}
ensureGreen(INDEX_NAME);
}

private void assertDataNodeDownloadStats(NodeStats nodeStats) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodeStats.getDiscoveryStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -1202,6 +1203,51 @@
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead,
boolean readIndexRoutingTableDiff,
boolean includeEphemeral
) {
return readClusterStateInParallel(
previousState,
manifest,
clusterUUID,
localNodeId,
indicesToRead,
customToRead,
readCoordinationMetadata,
readSettingsMetadata,
readTransientSettingsMetadata,
readTemplatesMetadata,
readDiscoveryNodes,
readClusterBlocks,
indicesRoutingToRead,
readHashesOfConsistentSettings,
clusterStateCustomToRead,
readIndexRoutingTableDiff,
includeEphemeral,
(metadataBuilder) -> {},
(routingTable) -> {}
);
}

// package private for testing
ClusterState readClusterStateInParallel(
ClusterState previousState,
ClusterMetadataManifest manifest,
String clusterUUID,
String localNodeId,
List<UploadedIndexMetadata> indicesToRead,
Map<String, UploadedMetadataAttribute> customToRead,
boolean readCoordinationMetadata,
boolean readSettingsMetadata,
boolean readTransientSettingsMetadata,
boolean readTemplatesMetadata,
boolean readDiscoveryNodes,
boolean readClusterBlocks,
List<UploadedIndexMetadata> indicesRoutingToRead,
boolean readHashesOfConsistentSettings,
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead,
boolean readIndexRoutingTableDiff,
boolean includeEphemeral,
Consumer<Metadata.Builder> metadataTransformer,
Consumer<RoutingTable> routingTableTransformer
) {
int totalReadTasks = indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata
? 1
Expand Down Expand Up @@ -1467,12 +1513,11 @@
});

metadataBuilder.indices(indexMetadataMap);
metadataTransformer.accept(metadataBuilder);
if (readDiscoveryNodes) {
clusterStateBuilder.nodes(discoveryNodesBuilder.get().localNodeId(localNodeId));
}

clusterStateBuilder.metadata(metadataBuilder).version(manifest.getStateVersion()).stateUUID(manifest.getStateUUID());

readIndexRoutingTableResults.forEach(
indexRoutingTable -> indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable)
);
Expand All @@ -1481,8 +1526,12 @@
if (routingTableDiff != null) {
newRoutingTable = routingTableDiff.apply(previousState.getRoutingTable());
}
clusterStateBuilder.routingTable(newRoutingTable);
routingTableTransformer.accept(newRoutingTable);

clusterStateBuilder.metadata(metadataBuilder)
.routingTable(newRoutingTable)
.version(manifest.getStateVersion())
.stateUUID(manifest.getStateUUID());
return clusterStateBuilder.build();
}

Expand Down Expand Up @@ -1638,41 +1687,40 @@
manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
includeEphemeral
includeEphemeral,
(metadataBuilder) -> {
// remove the deleted indices from the metadata
for (String index : diff.getIndicesDeleted()) {
metadataBuilder.remove(index);
}

Check warning on line 1695 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1694-L1695

Added lines #L1694 - L1695 were not covered by tests
// remove the deleted metadata customs from the metadata
if (diff.getCustomMetadataDeleted() != null) {
for (String customType : diff.getCustomMetadataDeleted()) {
metadataBuilder.removeCustom(customType);
}
}
},
(routingTable) -> {
Map<String, IndexRoutingTable> indexRoutingTables = routingTable.getIndicesRouting();
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
for (String indexName : diff.getIndicesRoutingDeleted()) {
indexRoutingTables.remove(indexName);
}

Check warning on line 1708 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1707-L1708

Added lines #L1707 - L1708 were not covered by tests
}
}
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
// remove the deleted indices from the metadata
for (String index : diff.getIndicesDeleted()) {
metadataBuilder.remove(index);
}
// remove the deleted metadata customs from the metadata
if (diff.getCustomMetadataDeleted() != null) {
for (String customType : diff.getCustomMetadataDeleted()) {
metadataBuilder.removeCustom(customType);
}
}

// remove the deleted cluster state customs from the metadata
if (diff.getClusterStateCustomDeleted() != null) {
for (String customType : diff.getClusterStateCustomDeleted()) {
clusterStateBuilder.removeCustom(customType);
}
}

HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(
updatedClusterState.getRoutingTable().getIndicesRouting()
);
if (manifest.getCodecVersion() == CODEC_V2 || manifest.getCodecVersion() == CODEC_V3) {
for (String indexName : diff.getIndicesRoutingDeleted()) {
indexRoutingTables.remove(indexName);
}
}

ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.metadata(updatedClusterState.metadata())
.routingTable(updatedClusterState.routingTable())
.build();
if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
&& manifest.getClusterStateChecksum() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3512,7 +3512,9 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException {
anyBoolean(),
eq(emptyMap()),
anyBoolean(),
anyBoolean()
anyBoolean(),
any(),
any()
);
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);

Expand Down Expand Up @@ -3554,7 +3556,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio
anyBoolean(),
eq(emptyMap()),
anyBoolean(),
anyBoolean()
anyBoolean(),
any(),
any()
);
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);

Expand Down Expand Up @@ -3596,7 +3600,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I
anyBoolean(),
eq(emptyMap()),
anyBoolean(),
anyBoolean()
anyBoolean(),
any(),
any()
);
mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);
verify(mockService, times(1)).validateClusterStateFromChecksum(
Expand Down Expand Up @@ -3637,7 +3643,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I
anyBoolean(),
eq(emptyMap()),
anyBoolean(),
anyBoolean()
anyBoolean(),
any(),
any()
);
doReturn(clusterState).when(mockService)
.readClusterStateInParallel(
Expand All @@ -3657,7 +3665,9 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I
eq(true),
eq(manifest.getClusterStateCustomMap()),
eq(false),
eq(true)
eq(true),
any(),
any()
);

mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID);
Expand Down Expand Up @@ -3699,7 +3709,9 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio
anyBoolean(),
eq(emptyMap()),
anyBoolean(),
anyBoolean()
anyBoolean(),
any(),
any()
);
doReturn(clusterState).when(mockService)
.readClusterStateInParallel(
Expand All @@ -3719,7 +3731,9 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio
eq(true),
eq(manifest.getClusterStateCustomMap()),
eq(false),
eq(true)
eq(true),
any(),
any()
);

expectThrows(IllegalStateException.class, () -> mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID));
Expand Down
Loading