Skip to content

Commit

Permalink
Make Remote Publication a dynamic setting (opensearch-project#15937)
Browse files Browse the repository at this point in the history
* Make Remote Publication a dynamic setting

Signed-off-by: Shivansh Arora <[email protected]>
Co-authored-by: Sooraj Sinha <[email protected]>
  • Loading branch information
2 people authored and dk2k committed Oct 17, 2024
1 parent 675faf3 commit 2e59425
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 55 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -81,7 +80,6 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private boolean isRemotePublicationEnabled;

public CoordinationState(
DiscoveryNode localNode,
Expand All @@ -105,14 +103,6 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
}

public boolean isRemotePublicationEnabled() {
return isRemotePublicationEnabled;
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteClusterStateService remoteClusterStateService;
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final RemoteClusterStateService remoteClusterStateService;
private final ClusterSettings clusterSettings;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -315,6 +316,7 @@ public Coordinator(
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
this.clusterSettings = clusterSettings;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -1364,7 +1366,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled(),
this.isRemotePublicationEnabled(),
persistedStateRegistry
);
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());
Expand Down Expand Up @@ -1893,8 +1895,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) {
}

public boolean isRemotePublicationEnabled() {
if (coordinationState.get() != null) {
return coordinationState.get().isRemotePublicationEnabled();
if (remoteClusterStateService != null) {
return remoteClusterStateService.isRemotePublicationEnabled();
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;

/**
* A Service which provides APIs to upload and download routing table from remote store.
Expand All @@ -76,7 +76,7 @@ public InternalRemoteRoutingTableService(
ThreadPool threadpool,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
assert isRemoteRoutingTableConfigured(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadpool;
Expand Down Expand Up @@ -234,7 +234,7 @@ protected void doClose() throws IOException {

@Override
protected void doStart() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;

/**
* Factory to provide impl for RemoteRoutingTableService based on settings.
Expand All @@ -37,7 +37,7 @@ public static RemoteRoutingTableService getService(
ThreadPool threadPool,
String clusterName
) {
if (isRemoteRoutingTableEnabled(settings)) {
if (isRemoteRoutingTableConfigured(settings)) {
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName);
}
return new NoopRemoteRoutingTableService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -112,6 +113,8 @@
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand All @@ -132,7 +135,7 @@ public class RemoteClusterStateService implements Closeable {
REMOTE_PUBLICATION_SETTING_KEY,
false,
Property.NodeScope,
Property.Final
Property.Dynamic
);

/**
Expand Down Expand Up @@ -232,7 +235,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;
private volatile AtomicBoolean isPublicationEnabled;
private final String remotePathPrefix;

private final RemoteClusterStateCache remoteClusterStateCache;
Expand Down Expand Up @@ -273,9 +276,12 @@ public RemoteClusterStateService(
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.isPublicationEnabled = new AtomicBoolean(
clusterSettings.get(REMOTE_PUBLICATION_SETTING)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured(settings)
);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
Expand Down Expand Up @@ -303,19 +309,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
return null;
}

boolean publicationEnabled = isPublicationEnabled.get();
UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
emptyMap(),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled),
true,
true,
true,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
isPublicationEnabled,
publicationEnabled,
publicationEnabled,
publicationEnabled,
publicationEnabled ? clusterState.customs() : Collections.emptyMap(),
publicationEnabled,
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()),
null
);
Expand Down Expand Up @@ -394,9 +401,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get());
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
previousManifest.getClusterStateCustomMap()
Expand Down Expand Up @@ -461,10 +468,10 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

final boolean updateDiscoveryNodes = isPublicationEnabled
final boolean updateDiscoveryNodes = isPublicationEnabled.get()
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get()
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
Expand Down Expand Up @@ -1115,6 +1122,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isPublicationEnabled.set(false);
} else {
this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableConfigured(settings));
}
}

// Package private for unit test
RemoteRoutingTableService getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
Expand Down Expand Up @@ -1830,7 +1845,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
}

public boolean isRemotePublicationEnabled() {
return this.isPublicationEnabled;
return this.isPublicationEnabled.get();
}

public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class RemotePersistenceStats {
RemoteDownloadStats remoteDiffDownloadStats;
RemoteDownloadStats remoteFullDownloadStats;

final String FULL_DOWNLOAD_STATS = "remote_full_download";
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
public static final String FULL_DOWNLOAD_STATS = "remote_full_download";
public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download";

public RemotePersistenceStats() {
remoteUploadStats = new RemoteUploadStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
.isEmpty() == false;
}

public static boolean isRemoteRoutingTableEnabled(Settings settings) {
public static boolean isRemoteRoutingTableConfigured(Settings settings) {
return isRemoteRoutingTableAttributePresent(settings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
Expand Down Expand Up @@ -1268,16 +1267,6 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe
verifyNoInteractions(remoteClusterStateService);
}

public void testIsRemotePublicationEnabled_WithInconsistentSettings() {
// create settings with remote state disabled but publication enabled
Settings settings = Settings.builder()
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false)
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
.build();
CoordinationState coordinationState = createCoordinationState(psr1, node1, settings);
assertFalse(coordinationState.isRemotePublicationEnabled());
}

public static CoordinationState createCoordinationState(
PersistedStateRegistry persistedStateRegistry,
DiscoveryNode localNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
// TODO Make the publication flag parameterized
publicationEnabled = true;
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
Expand All @@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
),
writableRegistry()
);
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager())
.customs(
Map.of(
Expand Down Expand Up @@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
publicationEnabled = true;
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
Expand All @@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
),
writableRegistry()
);
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down Expand Up @@ -2749,7 +2755,7 @@ public void testRemoteStateUploadStats() throws IOException {
}

public void testRemoteRoutingTableNotInitializedWhenDisabled() {
if (isRemoteRoutingTableEnabled(settings)) {
if (isRemoteRoutingTableConfigured(settings)) {
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService);
} else {
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService);
Expand Down

0 comments on commit 2e59425

Please sign in to comment.