From d4754985a58c46baebd610b9d21f0231cde9d932 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 5 May 2025 16:31:08 -0700 Subject: [PATCH 01/16] [Experimental] Add a feature flag to start without joining a cluster This is a rework of the extent of core changes from my proof-of-concept for a "clusterless" OpenSearch. Everything else is implemented in a plugin. Essentially, if the flag is set, we avoid creating DiscoveryModule or anything that requires it, including GatewayService. We still create ClusterService, but do not initialize a ClusterManagerService. There are a few actions that rely on an injected Discovery instance, so those also need to be removed when the flag is set. Signed-off-by: Michael Froh --- .../org/opensearch/action/ActionModule.java | 21 ++-- .../cluster/service/ClusterService.java | 27 ++++- .../common/settings/FeatureFlagSettings.java | 4 +- .../opensearch/common/util/FeatureFlags.java | 14 ++- .../cluster/IndicesClusterStateService.java | 37 +++++- .../main/java/org/opensearch/node/Node.java | 111 ++++++++++++------ .../java/org/opensearch/node/NodeService.java | 3 + .../common/util/FeatureFlagTests.java | 34 ++++++ 8 files changed, 193 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 67a86db37e790..f27905de32d8e 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -653,8 +653,10 @@ public void reg actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); - actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class); - actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); + if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false) { + actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class); + actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); + } actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class); actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class); @@ -677,7 +679,9 @@ public void reg actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); - actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); + if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false) { + actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); + } actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); actions.register(ResizeAction.INSTANCE, TransportResizeAction.class); actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class); @@ -790,11 +794,12 @@ public void reg actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class); // Dangling indices - actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class); - actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); - actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); - actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); - + if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false) { + actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class); + actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); + actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); + actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + } // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 05d478bbb9df1..1342bd2801b62 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -51,6 +51,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; @@ -105,7 +106,9 @@ public ClusterService( this( settings, clusterSettings, - new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics), + FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) + ? null + : new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics), new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics) ); } @@ -144,18 +147,24 @@ public RerouteService getRerouteService() { @Override protected synchronized void doStart() { clusterApplierService.start(); - clusterManagerService.start(); + if (clusterManagerService != null) { + clusterManagerService.start(); + } } @Override protected synchronized void doStop() { - clusterManagerService.stop(); + if (clusterManagerService != null) { + clusterManagerService.stop(); + } clusterApplierService.stop(); } @Override protected synchronized void doClose() { - clusterManagerService.close(); + if (clusterManagerService != null) { + clusterManagerService.close(); + } clusterApplierService.close(); } @@ -305,7 +314,10 @@ public final String getNodeName() { * @return throttling task key which needs to be passed while submitting task to cluster manager */ public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { - return clusterManagerService.registerClusterManagerTask(task, throttlingEnabled); + if (clusterManagerService != null) { + return clusterManagerService.registerClusterManagerTask(task, throttlingEnabled); + } + return null; } /** @@ -372,6 +384,11 @@ public void submitStateUpdateTasks( final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor ) { + if (clusterManagerService == null) { + throw new UnsupportedOperationException( + "Cannot submit cluster state update tasks when cluster manager service is not available" + ); + } clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor); } } diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index a53debf564ce4..02f4516176605 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -38,6 +38,8 @@ protected FeatureFlagSettings( FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING, FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING, FeatureFlags.ARROW_STREAMS_SETTING, - FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, + FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_SETTING, + FeatureFlags.CLUSTERLESS_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index e88b99347800e..96f14318af585 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -68,7 +68,7 @@ public class FeatureFlags { * Gates the functionality of warm index having the capability to store data remotely. * Once the feature is ready for release, this feature flag can be removed. */ - public static final String WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG = "opensearch.experimental.feature.writable_warm_index.enabled"; + public static final String WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG = FEATURE_FLAG_PREFIX + "writable_warm_index.enabled"; /** * Gates the functionality of background task execution. @@ -79,7 +79,7 @@ public class FeatureFlags { * Gates the functionality of merged segment warmer in local/remote segment replication. * Once the feature is ready for release, this feature flag can be removed. */ - public static final String MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG = "opensearch.experimental.feature.merged_segment_warmer.enabled"; + public static final String MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG = FEATURE_FLAG_PREFIX + "merged_segment_warmer.enabled"; public static final Setting REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( REMOTE_STORE_MIGRATION_EXPERIMENTAL, @@ -129,6 +129,12 @@ public class FeatureFlags { public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled"; public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope); + /** + * Allows the node to run without being part of a cluster. Eventually, we might move this to a normal cluster setting. + */ + public static final String CLUSTERLESS_FLAG = FEATURE_FLAG_PREFIX + "clusterless.enabled"; + public static final Setting CLUSTERLESS_SETTING = Setting.boolSetting(CLUSTERLESS_FLAG, false, Property.NodeScope); + /** * Underlying implementation for feature flags. * All settable feature flags are tracked here in FeatureFlagsImpl.featureFlags. @@ -139,7 +145,8 @@ static class FeatureFlagsImpl { private static final String TEST_FLAG = "test.flag.enabled"; private static final Setting TEST_FLAG_SETTING = Setting.boolSetting(TEST_FLAG, false, Property.NodeScope); - private final ConcurrentHashMap, Boolean> featureFlags = new ConcurrentHashMap<>() { + // Visible for testing + final ConcurrentHashMap, Boolean> featureFlags = new ConcurrentHashMap<>() { { put(TEST_FLAG_SETTING, TEST_FLAG_SETTING.get(Settings.EMPTY)); put(REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY)); @@ -158,6 +165,7 @@ static class FeatureFlagsImpl { SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_SETTING.getDefault(Settings.EMPTY) ); put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY)); + put(CLUSTERLESS_SETTING, CLUSTERLESS_SETTING.getDefault(Settings.EMPTY)); } }; diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index bba3031d66bac..d2432f32a4dc3 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.routing.RoutingNode; @@ -242,7 +243,7 @@ public IndicesClusterStateService( this.clusterService = clusterService; this.threadPool = threadPool; this.recoveryTargetService = recoveryTargetService; - this.shardStateAction = shardStateAction; + this.shardStateAction = clusterService.getClusterManagerService() == null ? null : shardStateAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; @@ -830,7 +831,39 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState recoveryState = (RecoveryState) state; - shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + if (shardStateAction != null) { + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "after " + recoveryState.getRecoverySource(), + SHARD_STATE_ACTION_LISTENER + ); + } else { + // We're running in "clusterless" mode. Apply the state change directly to the local cluster state. + ClusterState clusterState = clusterService.state(); + RoutingTable routingTable = clusterState.getRoutingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); + + ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index()); + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) { + IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder( + indexShardRoutingTable + ); + indexShardRoutingTableBuilder.removeShard(shardRouting); + indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted()); + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); + } else { + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable); + } + } + routingTableBuilder.add(indexRoutingTableBuilder); + clusterStateBuilder.routingTable(routingTableBuilder.build()); + clusterService.getClusterApplierService() + .onNewClusterState("shard-started " + shardRouting.shardId(), clusterStateBuilder::build, (s, e) -> {}); + } } private void failAndRemoveShard( diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a5e92293c0be1..4155f4575ffd9 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -43,7 +43,6 @@ import org.opensearch.Version; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; -import org.opensearch.action.ActionType; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.admin.indices.view.ViewService; import org.opensearch.action.search.SearchExecutionStatsCollector; @@ -54,7 +53,6 @@ import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTaskRequestOperationsListener; import org.opensearch.action.search.SearchTransportService; -import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; import org.opensearch.arrow.spi.StreamManager; import org.opensearch.bootstrap.BootstrapCheck; @@ -81,6 +79,7 @@ import org.opensearch.cluster.metadata.TemplateUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.BatchedRerouteService; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; @@ -325,6 +324,7 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING; import static org.opensearch.common.util.FeatureFlags.BACKGROUND_TASK_EXECUTION_EXPERIMENTAL; +import static org.opensearch.common.util.FeatureFlags.CLUSTERLESS_FLAG; import static org.opensearch.common.util.FeatureFlags.TELEMETRY; import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; @@ -792,7 +792,9 @@ protected Node(final Environment initialEnvironment, Collection clas plugin.setCircuitBreaker(breaker); }); resourcesToClose.add(circuitBreakerService); - modules.add(new GatewayModule()); + if (!FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { + modules.add(new GatewayModule()); + } PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); @@ -1347,27 +1349,31 @@ protected Node(final Environment initialEnvironment, Collection clas ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); - final DiscoveryModule discoveryModule = new DiscoveryModule( - settings, - threadPool, - transportService, - namedWriteableRegistry, - networkService, - clusterService.getClusterManagerService(), - clusterService.getClusterApplierService(), - clusterService.getClusterSettings(), - pluginsService.filterPlugins(DiscoveryPlugin.class), - clusterModule.getAllocationService(), - environment.configDir(), - gatewayMetaState, - rerouteService, - fsHealthService, - persistedStateRegistry, - remoteStoreNodeService, - clusterManagerMetrics, - remoteClusterStateService - ); - + final DiscoveryModule discoveryModule; + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { + discoveryModule = null; + } else { + discoveryModule = new DiscoveryModule( + settings, + threadPool, + transportService, + namedWriteableRegistry, + networkService, + clusterService.getClusterManagerService(), + clusterService.getClusterApplierService(), + clusterService.getClusterSettings(), + pluginsService.filterPlugins(DiscoveryPlugin.class), + clusterModule.getAllocationService(), + environment.configDir(), + gatewayMetaState, + rerouteService, + fsHealthService, + persistedStateRegistry, + remoteStoreNodeService, + clusterManagerMetrics, + remoteClusterStateService + ); + } final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, threadPool, @@ -1392,7 +1398,7 @@ protected Node(final Environment initialEnvironment, Collection clas settings, threadPool, monitorService, - discoveryModule.getDiscovery(), + discoveryModule == null ? null : discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, @@ -1535,7 +1541,9 @@ protected Node(final Environment initialEnvironment, Collection clas b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); - b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); + if (discoveryModule != null) { + b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); + } b.bind(RemoteStoreSettings.class).toInstance(remoteStoreSettings); { b.bind(PeerRecoverySourceService.class) @@ -1713,10 +1721,15 @@ public Node start() throws NodeValidationException { nodeConnectionsService.start(); clusterService.setNodeConnectionsService(nodeConnectionsService); - injector.getInstance(GatewayService.class).start(); - Discovery discovery = injector.getInstance(Discovery.class); - discovery.setNodeConnectionsService(nodeConnectionsService); - clusterService.getClusterManagerService().setClusterStatePublisher(discovery::publish); + final Discovery discovery; + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { + discovery = null; + } else { + injector.getInstance(GatewayService.class).start(); + discovery = injector.getInstance(Discovery.class); + discovery.setNodeConnectionsService(nodeConnectionsService); + clusterService.getClusterManagerService().setClusterStatePublisher(discovery); + } // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); @@ -1789,14 +1802,26 @@ public Node start() throws NodeValidationException { ); clusterService.addStateApplier(transportService.getTaskManager()); - // start after transport service so the local disco is known - discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService + + if (discovery != null) { + // start after transport service so the local disco is known + discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService + } else { + // We're running in clusterless mode, so we manually set the initial state to only include this node. + DiscoveryNode localNode = localNodeFactory.getNode(); + ClusterState bootstrapClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build()) + .build(); + clusterService.getClusterApplierService().setInitialState(bootstrapClusterState); + } clusterService.start(); this.autoForceMergeManager.start(); assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); - discovery.startInitialJoin(); + if (discovery != null) { + discovery.startInitialJoin(); + } final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings()); configureNodeAndClusterIdStateListener(clusterService); @@ -1873,7 +1898,9 @@ private Node stop() { injector.getInstance(IndicesClusterStateService.class).stop(); // close discovery early to not react to pings anymore. // This can confuse other nodes and delay things - mostly if we're the cluster manager and we're running tests. - injector.getInstance(Discovery.class).stop(); + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { + injector.getInstance(Discovery.class).stop(); + } // we close indices first, so operations won't be allowed on it injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); @@ -1883,7 +1910,9 @@ private Node stop() { injector.getInstance(WorkloadGroupService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); - injector.getInstance(GatewayService.class).stop(); + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { + injector.getInstance(GatewayService.class).stop(); + } injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); nodeService.getTaskCancellationMonitoringService().stop(); @@ -1938,8 +1967,10 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); toClose.add(injector.getInstance(NodeConnectionsService.class)); - toClose.add(() -> stopWatch.stop().start("discovery")); - toClose.add(injector.getInstance(Discovery.class)); + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { + toClose.add(() -> stopWatch.stop().start("discovery")); + toClose.add(injector.getInstance(Discovery.class)); + } toClose.add(() -> stopWatch.stop().start("monitor")); toClose.add(nodeService.getMonitorService()); toClose.add(nodeService.getSearchBackpressureService()); @@ -1949,8 +1980,10 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(NodeResourceUsageTracker.class)); toClose.add(() -> stopWatch.stop().start("resource_usage_collector")); toClose.add(injector.getInstance(ResourceUsageCollectorService.class)); - toClose.add(() -> stopWatch.stop().start("gateway")); - toClose.add(injector.getInstance(GatewayService.class)); + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { + toClose.add(() -> stopWatch.stop().start("gateway")); + toClose.add(injector.getInstance(GatewayService.class)); + } toClose.add(() -> stopWatch.stop().start("search")); toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 9671fda14375d..8c9db76522d45 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -44,6 +44,7 @@ import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.discovery.Discovery; @@ -245,6 +246,8 @@ public NodeStats stats( boolean cacheService, boolean remoteStoreNodeStats ) { + discoveryStats = discoveryStats && FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false; + clusterManagerThrottling = clusterManagerThrottling && FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false; // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats( diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index f3751e98f5b60..21f591c9ecd31 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -9,9 +9,14 @@ package org.opensearch.common.util; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.settings.FeatureFlagSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; +import java.lang.reflect.Field; +import java.util.Set; + import static org.opensearch.common.util.FeatureFlags.FEATURE_FLAG_PREFIX; public class FeatureFlagTests extends OpenSearchTestCase { @@ -42,6 +47,35 @@ public void testFeatureFlagFromSettings() { assertFalse(testFlagsImpl.isEnabled(TEST_FLAG)); } + /** + * Checks that all feature flags declared in the FeatureFlags class are registered in the map in + * FeatureFlagsImpl. (It's so easy to forget to register a new feature flag!) + */ + @SuppressWarnings("unchecked") + public void testFeatureFlagsAreAllRegistered() { + FeatureFlags.FeatureFlagsImpl testFlagsImpl = new FeatureFlags.FeatureFlagsImpl(); + testFlagsImpl.initializeFeatureFlags(Settings.EMPTY); + Set> registeredFeatureFlagKeys = testFlagsImpl.featureFlags.keySet(); + Field[] fields = FeatureFlags.class.getFields(); + for (Field field : fields) { + if (field.getType() == Setting.class) { + try { + Setting setting = (Setting) field.get(null); + assertTrue( + "Feature flag " + setting.getKey() + " is not registered in FeatureFlagsImpl", + registeredFeatureFlagKeys.contains(setting) + ); + assertTrue( + "Feature flag " + setting.getKey() + " is not registered in FeatureFlagSettings", + FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS.contains(setting) + ); + } catch (IllegalAccessException e) { + fail("Failed to access field: " + field.getName()); + } + } + } + } + @SuppressForbidden(reason = "Testing system property functionality") private void setSystemPropertyTrue(String key) { System.setProperty(key, "true"); From 41f9ada9f8cdba67da5fd42a72f96a92fc03280c Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 9 Jun 2025 16:42:56 -0700 Subject: [PATCH 02/16] Add changelog entry Signed-off-by: Michael Froh --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d04c01edc47d9..65775e6eb39f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708)) - Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697)) - Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663)) +- [Experimental] Add a feature flag to start OpenSearch without joining a cluster ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479)) ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570)) From fa30da5815ad2c0466f5b35743393bc260493f2d Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 9 Jun 2025 16:57:56 -0700 Subject: [PATCH 03/16] Add back mistakenly removed imports Signed-off-by: Michael Froh --- server/src/main/java/org/opensearch/node/Node.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4155f4575ffd9..d93afb26bfb2d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -43,6 +43,7 @@ import org.opensearch.Version; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; +import org.opensearch.action.ActionType; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.admin.indices.view.ViewService; import org.opensearch.action.search.SearchExecutionStatsCollector; @@ -53,6 +54,7 @@ import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTaskRequestOperationsListener; import org.opensearch.action.search.SearchTransportService; +import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; import org.opensearch.arrow.spi.StreamManager; import org.opensearch.bootstrap.BootstrapCheck; From 21143b711a4c97a29142aad691d42bbac3616adf Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Jun 2025 10:09:34 -0700 Subject: [PATCH 04/16] Fix IndicesClusterStateServiceRandomUpdatesTests Test was failing because I was assuming that not having ClusterManagerService is sufficient to tell when you're running in clusterless mode. In the unit tests, though, we use a mock ClusterService, which doesn't have a ClusterManagerService. I could have changed the unit test to return a mock ClusterManagerService, but that feels like an extra layer of complexity, when we could just check the feature flag instead. Signed-off-by: Michael Froh --- .../cluster/IndicesClusterStateService.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index d2432f32a4dc3..fa7d1110e8a73 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -57,6 +57,7 @@ import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; @@ -243,7 +244,7 @@ public IndicesClusterStateService( this.clusterService = clusterService; this.threadPool = threadPool; this.recoveryTargetService = recoveryTargetService; - this.shardStateAction = clusterService.getClusterManagerService() == null ? null : shardStateAction; + this.shardStateAction = shardStateAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; @@ -831,14 +832,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState recoveryState = (RecoveryState) state; - if (shardStateAction != null) { - shardStateAction.shardStarted( - shardRouting, - primaryTerm, - "after " + recoveryState.getRecoverySource(), - SHARD_STATE_ACTION_LISTENER - ); - } else { + if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG)) { // We're running in "clusterless" mode. Apply the state change directly to the local cluster state. ClusterState clusterState = clusterService.state(); RoutingTable routingTable = clusterState.getRoutingTable(); @@ -863,6 +857,13 @@ public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting clusterStateBuilder.routingTable(routingTableBuilder.build()); clusterService.getClusterApplierService() .onNewClusterState("shard-started " + shardRouting.shardId(), clusterStateBuilder::build, (s, e) -> {}); + } else { + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "after " + recoveryState.getRecoverySource(), + SHARD_STATE_ACTION_LISTENER + ); } } From fb9f22f9a33796dfc541c9334a008496174912a4 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Jun 2025 11:05:55 -0700 Subject: [PATCH 05/16] Replace a couple of flag checks with field null-checks I'd like to try to isolate use of the clusterless feature flag to the Node class. Removing these checks gets me a little closer. Signed-off-by: Michael Froh --- server/src/main/java/org/opensearch/node/NodeService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 8c9db76522d45..971ccba061b2a 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -44,7 +44,6 @@ import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.discovery.Discovery; @@ -246,8 +245,8 @@ public NodeStats stats( boolean cacheService, boolean remoteStoreNodeStats ) { - discoveryStats = discoveryStats && FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false; - clusterManagerThrottling = clusterManagerThrottling && FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false; + discoveryStats = discoveryStats && discovery != null; + clusterManagerThrottling = clusterManagerThrottling && clusterService.getClusterManagerService() != null; // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats( From 577c389ae8f6731df0856b801d5b855085057406 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Jun 2025 12:12:29 -0700 Subject: [PATCH 06/16] Extend ClusterService and ShardStateAction Instead of peppering ClusterService and IndicesClusterStateService with conditional checks for the clusterless feature flag, we can use polymorphism to get clusterless behavior. Signed-off-by: Michael Froh --- .../org/opensearch/cluster/ClusterModule.java | 5 +- .../action/shard/LocalShardStateAction.java | 78 +++++++++++++++++++ .../action/shard/ShardStateAction.java | 2 +- .../cluster/service/ClusterService.java | 32 +++----- .../cluster/service/LocalClusterService.java | 72 +++++++++++++++++ .../cluster/IndicesClusterStateService.java | 36 +-------- .../main/java/org/opensearch/node/Node.java | 13 ++-- 7 files changed, 173 insertions(+), 65 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 7dbc0d0ee88e3..67952709e92c9 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -474,7 +474,10 @@ protected void configure() { bind(MetadataIndexTemplateService.class).asEagerSingleton(); bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver); bind(DelayedAllocationService.class).asEagerSingleton(); - bind(ShardStateAction.class).asEagerSingleton(); + @SuppressWarnings("unchecked") + Class shardStateActionClass = (Class) clusterService + .getShardStateActionClass(); + bind(ShardStateAction.class).to(shardStateActionClass).asEagerSingleton(); bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(TaskResultsService.class).asEagerSingleton(); diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java new file mode 100644 index 0000000000000..d4c5774a13ee3 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.action.shard; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RerouteService; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +public class LocalShardStateAction extends ShardStateAction { + @Inject + public LocalShardStateAction( + ClusterService clusterService, + TransportService transportService, + AllocationService allocationService, + RerouteService rerouteService, + ThreadPool threadPool + ) { + super(clusterService, transportService, allocationService, rerouteService, threadPool); + } + + @Override + public void shardStarted( + ShardRouting shardRouting, + long primaryTerm, + String message, + ActionListener listener, + ClusterState currentState + ) { + // We're running in clusterless mode. Apply the state change directly to the local cluster state. + ClusterState clusterState = clusterService.state(); + RoutingTable routingTable = clusterState.getRoutingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); + + ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index()); + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) { + IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable); + indexShardRoutingTableBuilder.removeShard(shardRouting); + indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted()); + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); + } else { + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable); + } + } + routingTableBuilder.add(indexRoutingTableBuilder); + clusterStateBuilder.routingTable(routingTableBuilder.build()); + clusterService.getClusterApplierService() + .onNewClusterState("shard-started " + shardRouting.shardId(), clusterStateBuilder::build, (s, e) -> {}); + } + + @Override + public void localShardFailed( + ShardRouting shardRouting, + String message, + Exception failure, + ActionListener listener, + ClusterState currentState + ) { + // Do not send a failure to the cluster manager, as we are running in clusterless mode. + } +} diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java index cb5749a91d448..6850c71532c50 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java @@ -128,7 +128,7 @@ private static Priority parseReroutePriority(String priorityString) { } private final TransportService transportService; - private final ClusterService clusterService; + final ClusterService clusterService; private final ThreadPool threadPool; private volatile Priority followUpRerouteTaskPriority; diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 1342bd2801b62..24da0df194ce8 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.LocalNodeClusterManagerListener; import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.OperationRouting; import org.opensearch.cluster.routing.RerouteService; @@ -51,7 +52,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; @@ -106,9 +106,7 @@ public ClusterService( this( settings, clusterSettings, - FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) - ? null - : new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics), + new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics), new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics) ); } @@ -147,24 +145,18 @@ public RerouteService getRerouteService() { @Override protected synchronized void doStart() { clusterApplierService.start(); - if (clusterManagerService != null) { - clusterManagerService.start(); - } + clusterManagerService.start(); } @Override protected synchronized void doStop() { - if (clusterManagerService != null) { - clusterManagerService.stop(); - } + clusterManagerService.stop(); clusterApplierService.stop(); } @Override protected synchronized void doClose() { - if (clusterManagerService != null) { - clusterManagerService.close(); - } + clusterManagerService.close(); clusterApplierService.close(); } @@ -314,10 +306,7 @@ public final String getNodeName() { * @return throttling task key which needs to be passed while submitting task to cluster manager */ public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { - if (clusterManagerService != null) { - return clusterManagerService.registerClusterManagerTask(task, throttlingEnabled); - } - return null; + return clusterManagerService.registerClusterManagerTask(task, throttlingEnabled); } /** @@ -384,11 +373,10 @@ public void submitStateUpdateTasks( final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor ) { - if (clusterManagerService == null) { - throw new UnsupportedOperationException( - "Cannot submit cluster state update tasks when cluster manager service is not available" - ); - } clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor); } + + public Class getShardStateActionClass() { + return ShardStateAction.class; + } } diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java new file mode 100644 index 0000000000000..fffdd684a579e --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.cluster.ClusterManagerMetrics; +import org.opensearch.cluster.ClusterStateTaskConfig; +import org.opensearch.cluster.ClusterStateTaskExecutor; +import org.opensearch.cluster.ClusterStateTaskListener; +import org.opensearch.cluster.action.shard.LocalShardStateAction; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.Node; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Map; + +public class LocalClusterService extends ClusterService { + public LocalClusterService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { + super( + settings, + clusterSettings, + null, + new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics) + ); + } + + @Override + protected synchronized void doStart() { + getClusterApplierService().doStart(); + } + + @Override + protected synchronized void doStop() { + getClusterApplierService().doStop(); + } + + @Override + protected synchronized void doClose() { + getClusterApplierService().doClose(); + } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { + return null; + } + + @Override + public void submitStateUpdateTasks( + final String source, + final Map tasks, + final ClusterStateTaskConfig config, + final ClusterStateTaskExecutor executor + ) { + throw new UnsupportedOperationException("Cannot submit cluster state update tasks when cluster manager service is not available"); + } + + @Override + public Class getShardStateActionClass() { + return LocalShardStateAction.class; + } +} diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index fa7d1110e8a73..bba3031d66bac 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -44,7 +44,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.routing.RoutingNode; @@ -57,7 +56,6 @@ import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; @@ -832,39 +830,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState recoveryState = (RecoveryState) state; - if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG)) { - // We're running in "clusterless" mode. Apply the state change directly to the local cluster state. - ClusterState clusterState = clusterService.state(); - RoutingTable routingTable = clusterState.getRoutingTable(); - IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); - - ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index()); - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) { - IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder( - indexShardRoutingTable - ); - indexShardRoutingTableBuilder.removeShard(shardRouting); - indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted()); - indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); - } else { - indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable); - } - } - routingTableBuilder.add(indexRoutingTableBuilder); - clusterStateBuilder.routingTable(routingTableBuilder.build()); - clusterService.getClusterApplierService() - .onNewClusterState("shard-started " + shardRouting.shardId(), clusterStateBuilder::build, (s, e) -> {}); - } else { - shardStateAction.shardStarted( - shardRouting, - primaryTerm, - "after " + recoveryState.getRecoverySource(), - SHARD_STATE_ACTION_LISTENER - ); - } + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } private void failAndRemoveShard( diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d93afb26bfb2d..1ae48406df34c 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -87,6 +87,7 @@ import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.cluster.routing.allocation.DiskThresholdMonitor; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.service.LocalClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.StopWatch; import org.opensearch.common.cache.module.CacheModule; @@ -704,12 +705,12 @@ protected Node(final Environment initialEnvironment, Collection clas final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); - final ClusterService clusterService = new ClusterService( - settings, - settingsModule.getClusterSettings(), - threadPool, - clusterManagerMetrics - ); + final ClusterService clusterService; + if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { + clusterService = new LocalClusterService(settings, settingsModule.getClusterSettings(), threadPool, clusterManagerMetrics); + } else { + clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, clusterManagerMetrics); + } clusterService.addStateApplier(scriptService); resourcesToClose.add(clusterService); final Set> consistentSettings = settingsModule.getConsistentSettings(); From 2c125806ed1d6c4bdf8a4a63771f2437beaf5358 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Jun 2025 12:27:23 -0700 Subject: [PATCH 07/16] Add Javadoc Signed-off-by: Michael Froh --- .../cluster/action/shard/LocalShardStateAction.java | 4 ++++ .../java/org/opensearch/cluster/service/ClusterService.java | 3 +++ .../org/opensearch/cluster/service/LocalClusterService.java | 4 ++++ 3 files changed, 11 insertions(+) diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java index d4c5774a13ee3..04abe5112f06c 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java @@ -21,6 +21,10 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +/** + * A local implementation of {@link ShardStateAction} that applies shard state changes directly to the + * local cluster state. This is used in clusterless mode, where there is no cluster manager. + */ public class LocalShardStateAction extends ShardStateAction { @Inject public LocalShardStateAction( diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 24da0df194ce8..c9d1f515063bc 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -376,6 +376,9 @@ public void submitStateUpdateTasks( clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor); } + /** + * @return the class to use to handle shard state actions. Subclasses may provide a different implementation. + */ public Class getShardStateActionClass() { return ShardStateAction.class; } diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java index fffdd684a579e..96524451e71ed 100644 --- a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -20,6 +20,10 @@ import java.util.Map; +/** + * A local implementation of {@link ClusterService} that assumes we have no cluster manager. + * This is used in clusterless mode. + */ public class LocalClusterService extends ClusterService { public LocalClusterService( Settings settings, From d0c678a69b00a2ba88effbdf06959208212fbf69 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Jun 2025 12:47:42 -0700 Subject: [PATCH 08/16] Cannot bind a class to itself Signed-off-by: Michael Froh --- .../src/main/java/org/opensearch/cluster/ClusterModule.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 67952709e92c9..0e4d23f2f2d72 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -477,7 +477,11 @@ protected void configure() { @SuppressWarnings("unchecked") Class shardStateActionClass = (Class) clusterService .getShardStateActionClass(); - bind(ShardStateAction.class).to(shardStateActionClass).asEagerSingleton(); + if (shardStateActionClass == ShardStateAction.class) { + bind(ShardStateAction.class).asEagerSingleton(); + } else { + bind(ShardStateAction.class).to(shardStateActionClass).asEagerSingleton(); + } bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(TaskResultsService.class).asEagerSingleton(); From e2ac28e58f46cb685e94d5864a80be14e72edd04 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 10 Jun 2025 13:46:12 -0700 Subject: [PATCH 09/16] Delegate to public AbstractLifecycleComponent methods Accidentally bypassed AbstractLifecycleComponent logic when delegating from LocalClusterService to ClusterApplierService. Signed-off-by: Michael Froh --- .../org/opensearch/cluster/service/LocalClusterService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java index 96524451e71ed..0e634a62ee15d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -41,17 +41,17 @@ public LocalClusterService( @Override protected synchronized void doStart() { - getClusterApplierService().doStart(); + getClusterApplierService().start(); } @Override protected synchronized void doStop() { - getClusterApplierService().doStop(); + getClusterApplierService().stop(); } @Override protected synchronized void doClose() { - getClusterApplierService().doClose(); + getClusterApplierService().close(); } @Override From 98583a4bed65016043b03863dc7b9499fbeddee1 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 24 Jun 2025 13:48:43 -0700 Subject: [PATCH 10/16] Implement dummy Discovery and ClusterManagerService This reduces the number of components that need to be explicitly disabled when running in clusterless mode. Signed-off-by: Michael Froh --- .../org/opensearch/action/ActionModule.java | 20 ++--- .../cluster/service/LocalClusterService.java | 19 ++++- .../opensearch/discovery/LocalDiscovery.java | 82 +++++++++++++++++++ .../main/java/org/opensearch/node/Node.java | 66 +++++---------- .../java/org/opensearch/node/NodeService.java | 2 - 5 files changed, 129 insertions(+), 60 deletions(-) create mode 100644 server/src/main/java/org/opensearch/discovery/LocalDiscovery.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index f27905de32d8e..491a9e2edc7bb 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -653,10 +653,8 @@ public void reg actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); - if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false) { - actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class); - actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); - } + actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class); + actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class); actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class); @@ -679,9 +677,7 @@ public void reg actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); - if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false) { - actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); - } + actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); actions.register(ResizeAction.INSTANCE, TransportResizeAction.class); actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class); @@ -794,12 +790,10 @@ public void reg actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class); // Dangling indices - if (FeatureFlags.isEnabled(FeatureFlags.CLUSTERLESS_FLAG) == false) { - actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class); - actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); - actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); - actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); - } + actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class); + actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); + actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); + actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java index 0e634a62ee15d..563e722f88224 100644 --- a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.action.shard.LocalShardStateAction; +import org.opensearch.cluster.coordination.ClusterStatePublisher; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.Node; @@ -25,6 +26,22 @@ * This is used in clusterless mode. */ public class LocalClusterService extends ClusterService { + private static class DummyClusterManagerService extends ClusterManagerService { + private static final ClusterManagerThrottlingStats EMPTY_THROTTLING_STATS = new ClusterManagerThrottlingStats(); + + public DummyClusterManagerService(Settings settings, ClusterSettings clusterSettings) { + super(settings, clusterSettings, null, null); + } + + @Override + public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {} + + @Override + public ClusterManagerThrottlingStats getThrottlingStats() { + return EMPTY_THROTTLING_STATS; + } + } + public LocalClusterService( Settings settings, ClusterSettings clusterSettings, @@ -34,7 +51,7 @@ public LocalClusterService( super( settings, clusterSettings, - null, + new DummyClusterManagerService(settings, clusterSettings), new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics) ); } diff --git a/server/src/main/java/org/opensearch/discovery/LocalDiscovery.java b/server/src/main/java/org/opensearch/discovery/LocalDiscovery.java new file mode 100644 index 0000000000000..d0090f5b4e7f4 --- /dev/null +++ b/server/src/main/java/org/opensearch/discovery/LocalDiscovery.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.discovery; + +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.coordination.PendingClusterStateStats; +import org.opensearch.cluster.coordination.PublishClusterStateStats; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterApplier; +import org.opensearch.cluster.service.ClusterStateStats; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.action.ActionListener; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Clusterless implementation of Discovery. This is only able to "discover" the local node. + */ +public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { + private static final DiscoveryStats EMPTY_STATS = new DiscoveryStats( + new PendingClusterStateStats(0, 0, 0), + new PublishClusterStateStats(0, 0, 0), + new ClusterStateStats() + ); + private final TransportService transportService; + private final ClusterApplier clusterApplier; + + public LocalDiscovery(TransportService transportService, ClusterApplier clusterApplier) { + this.transportService = transportService; + this.clusterApplier = clusterApplier; + } + + @Override + public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener publishListener, AckListener ackListener) { + // In clusterless mode, we should never be asked to publish a cluster state. + throw new UnsupportedOperationException("Should not be called in clusterless mode"); + } + + @Override + protected void doStart() { + DiscoveryNode localNode = transportService.getLocalNode(); + ClusterState bootstrapClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build()) + .build(); + clusterApplier.setInitialState(bootstrapClusterState); + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public DiscoveryStats stats() { + return EMPTY_STATS; + } + + @Override + public void startInitialJoin() { + + } + + @Override + public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { + + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1ae48406df34c..7d91f33c87b03 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -81,7 +81,6 @@ import org.opensearch.cluster.metadata.TemplateUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.BatchedRerouteService; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; @@ -132,6 +131,7 @@ import org.opensearch.crypto.CryptoHandlerRegistry; import org.opensearch.discovery.Discovery; import org.opensearch.discovery.DiscoveryModule; +import org.opensearch.discovery.LocalDiscovery; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeMetadata; @@ -795,9 +795,7 @@ protected Node(final Environment initialEnvironment, Collection clas plugin.setCircuitBreaker(breaker); }); resourcesToClose.add(circuitBreakerService); - if (!FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { - modules.add(new GatewayModule()); - } + modules.add(new GatewayModule()); PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); @@ -1397,11 +1395,16 @@ protected Node(final Environment initialEnvironment, Collection clas transportService.getTaskManager(), taskCancellationMonitoringSettings ); + + Discovery discovery = discoveryModule == null + ? new LocalDiscovery(transportService, clusterService.getClusterApplierService()) + : discoveryModule.getDiscovery(); + this.nodeService = new NodeService( settings, threadPool, monitorService, - discoveryModule == null ? null : discoveryModule.getDiscovery(), + discovery, transportService, indicesService, pluginsService, @@ -1544,9 +1547,7 @@ protected Node(final Environment initialEnvironment, Collection clas b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); - if (discoveryModule != null) { - b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); - } + b.bind(Discovery.class).toInstance(discovery); b.bind(RemoteStoreSettings.class).toInstance(remoteStoreSettings); { b.bind(PeerRecoverySourceService.class) @@ -1725,14 +1726,10 @@ public Node start() throws NodeValidationException { clusterService.setNodeConnectionsService(nodeConnectionsService); final Discovery discovery; - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { - discovery = null; - } else { - injector.getInstance(GatewayService.class).start(); - discovery = injector.getInstance(Discovery.class); - discovery.setNodeConnectionsService(nodeConnectionsService); - clusterService.getClusterManagerService().setClusterStatePublisher(discovery); - } + injector.getInstance(GatewayService.class).start(); + discovery = injector.getInstance(Discovery.class); + discovery.setNodeConnectionsService(nodeConnectionsService); + clusterService.getClusterManagerService().setClusterStatePublisher(discovery); // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); @@ -1806,25 +1803,14 @@ public Node start() throws NodeValidationException { clusterService.addStateApplier(transportService.getTaskManager()); - if (discovery != null) { - // start after transport service so the local disco is known - discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService - } else { - // We're running in clusterless mode, so we manually set the initial state to only include this node. - DiscoveryNode localNode = localNodeFactory.getNode(); - ClusterState bootstrapClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build()) - .build(); - clusterService.getClusterApplierService().setInitialState(bootstrapClusterState); - } + // start after transport service so the local disco is known + discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService clusterService.start(); this.autoForceMergeManager.start(); assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); - if (discovery != null) { - discovery.startInitialJoin(); - } + discovery.startInitialJoin(); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings()); configureNodeAndClusterIdStateListener(clusterService); @@ -1901,9 +1887,7 @@ private Node stop() { injector.getInstance(IndicesClusterStateService.class).stop(); // close discovery early to not react to pings anymore. // This can confuse other nodes and delay things - mostly if we're the cluster manager and we're running tests. - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { - injector.getInstance(Discovery.class).stop(); - } + injector.getInstance(Discovery.class).stop(); // we close indices first, so operations won't be allowed on it injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); @@ -1913,9 +1897,7 @@ private Node stop() { injector.getInstance(WorkloadGroupService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { - injector.getInstance(GatewayService.class).stop(); - } + injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); nodeService.getTaskCancellationMonitoringService().stop(); @@ -1970,10 +1952,8 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); toClose.add(injector.getInstance(NodeConnectionsService.class)); - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { - toClose.add(() -> stopWatch.stop().start("discovery")); - toClose.add(injector.getInstance(Discovery.class)); - } + toClose.add(() -> stopWatch.stop().start("discovery")); + toClose.add(injector.getInstance(Discovery.class)); toClose.add(() -> stopWatch.stop().start("monitor")); toClose.add(nodeService.getMonitorService()); toClose.add(nodeService.getSearchBackpressureService()); @@ -1983,10 +1963,8 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(NodeResourceUsageTracker.class)); toClose.add(() -> stopWatch.stop().start("resource_usage_collector")); toClose.add(injector.getInstance(ResourceUsageCollectorService.class)); - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG) == false) { - toClose.add(() -> stopWatch.stop().start("gateway")); - toClose.add(injector.getInstance(GatewayService.class)); - } + toClose.add(() -> stopWatch.stop().start("gateway")); + toClose.add(injector.getInstance(GatewayService.class)); toClose.add(() -> stopWatch.stop().start("search")); toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 971ccba061b2a..9671fda14375d 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -245,8 +245,6 @@ public NodeStats stats( boolean cacheService, boolean remoteStoreNodeStats ) { - discoveryStats = discoveryStats && discovery != null; - clusterManagerThrottling = clusterManagerThrottling && clusterService.getClusterManagerService() != null; // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) return new NodeStats( From a8f30dbe9467211ed695ea5f72eec9a961cac53b Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 24 Jun 2025 16:35:02 -0700 Subject: [PATCH 11/16] Clean up a bit Removed unnecessary casts and another piece of conditional logic. Signed-off-by: Michael Froh --- .../java/org/opensearch/action/ActionModule.java | 1 + .../org/opensearch/cluster/ClusterModule.java | 4 +--- .../cluster/service/ClusterService.java | 2 +- .../cluster/service/LocalClusterService.java | 3 ++- .../src/main/java/org/opensearch/node/Node.java | 16 +++++----------- 5 files changed, 10 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 491a9e2edc7bb..67a86db37e790 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -794,6 +794,7 @@ public void reg actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 0e4d23f2f2d72..cfd5828dfb951 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -474,9 +474,7 @@ protected void configure() { bind(MetadataIndexTemplateService.class).asEagerSingleton(); bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver); bind(DelayedAllocationService.class).asEagerSingleton(); - @SuppressWarnings("unchecked") - Class shardStateActionClass = (Class) clusterService - .getShardStateActionClass(); + Class shardStateActionClass = clusterService.getShardStateActionClass(); if (shardStateActionClass == ShardStateAction.class) { bind(ShardStateAction.class).asEagerSingleton(); } else { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c9d1f515063bc..a3f6a0258d082 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -379,7 +379,7 @@ public void submitStateUpdateTasks( /** * @return the class to use to handle shard state actions. Subclasses may provide a different implementation. */ - public Class getShardStateActionClass() { + public Class getShardStateActionClass() { return ShardStateAction.class; } } diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java index 563e722f88224..c68757702e295 100644 --- a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.action.shard.LocalShardStateAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.ClusterStatePublisher; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -87,7 +88,7 @@ public void submitStateUpdateTasks( } @Override - public Class getShardStateActionClass() { + public Class getShardStateActionClass() { return LocalShardStateAction.class; } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7d91f33c87b03..1bf140bb60ca7 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1350,11 +1350,11 @@ protected Node(final Environment initialEnvironment, Collection clas ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); - final DiscoveryModule discoveryModule; + final Discovery discovery; if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { - discoveryModule = null; + discovery = new LocalDiscovery(transportService, clusterService.getClusterApplierService()); } else { - discoveryModule = new DiscoveryModule( + discovery = new DiscoveryModule( settings, threadPool, transportService, @@ -1373,7 +1373,7 @@ protected Node(final Environment initialEnvironment, Collection clas remoteStoreNodeService, clusterManagerMetrics, remoteClusterStateService - ); + ).getDiscovery(); } final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1396,10 +1396,6 @@ protected Node(final Environment initialEnvironment, Collection clas taskCancellationMonitoringSettings ); - Discovery discovery = discoveryModule == null - ? new LocalDiscovery(transportService, clusterService.getClusterApplierService()) - : discoveryModule.getDiscovery(); - this.nodeService = new NodeService( settings, threadPool, @@ -1725,9 +1721,8 @@ public Node start() throws NodeValidationException { nodeConnectionsService.start(); clusterService.setNodeConnectionsService(nodeConnectionsService); - final Discovery discovery; injector.getInstance(GatewayService.class).start(); - discovery = injector.getInstance(Discovery.class); + Discovery discovery = injector.getInstance(Discovery.class); discovery.setNodeConnectionsService(nodeConnectionsService); clusterService.getClusterManagerService().setClusterStatePublisher(discovery); @@ -1802,7 +1797,6 @@ public Node start() throws NodeValidationException { ); clusterService.addStateApplier(transportService.getTaskManager()); - // start after transport service so the local disco is known discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService clusterService.start(); From 48413ae4856ce5d1cee6d3e217c89175b394d996 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Wed, 25 Jun 2025 12:06:22 -0700 Subject: [PATCH 12/16] Add explicit cast back to ClusterModule I tried to be clever and scope down the type returned from the getShardStateActionClass method, but doing so accidentally added ShardStateAction to our public API surface. While I don't like having to do an explicit cast, I definitely don't want to expand our public API surface if I don't need to. Signed-off-by: Michael Froh --- .../src/main/java/org/opensearch/cluster/ClusterModule.java | 4 +++- .../java/org/opensearch/cluster/service/ClusterService.java | 2 +- .../org/opensearch/cluster/service/LocalClusterService.java | 3 +-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index cfd5828dfb951..0e4d23f2f2d72 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -474,7 +474,9 @@ protected void configure() { bind(MetadataIndexTemplateService.class).asEagerSingleton(); bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver); bind(DelayedAllocationService.class).asEagerSingleton(); - Class shardStateActionClass = clusterService.getShardStateActionClass(); + @SuppressWarnings("unchecked") + Class shardStateActionClass = (Class) clusterService + .getShardStateActionClass(); if (shardStateActionClass == ShardStateAction.class) { bind(ShardStateAction.class).asEagerSingleton(); } else { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index a3f6a0258d082..c9d1f515063bc 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -379,7 +379,7 @@ public void submitStateUpdateTasks( /** * @return the class to use to handle shard state actions. Subclasses may provide a different implementation. */ - public Class getShardStateActionClass() { + public Class getShardStateActionClass() { return ShardStateAction.class; } } diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java index c68757702e295..563e722f88224 100644 --- a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.action.shard.LocalShardStateAction; -import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.ClusterStatePublisher; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -88,7 +87,7 @@ public void submitStateUpdateTasks( } @Override - public Class getShardStateActionClass() { + public Class getShardStateActionClass() { return LocalShardStateAction.class; } } From 3dfcc048da1803c68d46f7c2b3c276b29ed4756b Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Tue, 1 Jul 2025 12:50:52 -0700 Subject: [PATCH 13/16] Incorporate feedback from @rajiv-kv Pass ShardStateAction class to ClusterModule directly instead of passing it via ClusterService, since ClusterService is unfortunately part of the public API. Update ClusterState on shard startup entirely within the cluster applier thread. This ensures that we don't accidentally drop a state update For example, if two shards both start at the same time, they could have both read the current cluster state and compute new cluster state based only on the newly-started shard, then the last update would win. This change fixes that. Signed-off-by: Michael Froh --- .../org/opensearch/cluster/ClusterModule.java | 8 ++-- .../action/shard/LocalShardStateAction.java | 44 +++++++++++-------- .../action/shard/ShardStateAction.java | 2 +- .../service/ClusterApplierService.java | 8 ++++ .../cluster/service/ClusterService.java | 8 ---- .../cluster/service/LocalClusterService.java | 6 --- .../main/java/org/opensearch/node/Node.java | 5 ++- .../cluster/ClusterModuleTests.java | 21 +++++---- 8 files changed, 55 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 0e4d23f2f2d72..517d3abd2a472 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -144,6 +144,7 @@ public class ClusterModule extends AbstractModule { final Collection deciderList; final ShardsAllocator shardsAllocator; private final ClusterManagerMetrics clusterManagerMetrics; + private final Class shardStateActionClass; public ClusterModule( Settings settings, @@ -152,7 +153,8 @@ public ClusterModule( ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, ThreadContext threadContext, - ClusterManagerMetrics clusterManagerMetrics + ClusterManagerMetrics clusterManagerMetrics, + Class shardStateActionClass ) { this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); @@ -169,6 +171,7 @@ public ClusterModule( clusterManagerMetrics ); this.clusterManagerMetrics = clusterManagerMetrics; + this.shardStateActionClass = shardStateActionClass; } public static List getNamedWriteables() { @@ -474,9 +477,6 @@ protected void configure() { bind(MetadataIndexTemplateService.class).asEagerSingleton(); bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver); bind(DelayedAllocationService.class).asEagerSingleton(); - @SuppressWarnings("unchecked") - Class shardStateActionClass = (Class) clusterService - .getShardStateActionClass(); if (shardStateActionClass == ShardStateAction.class) { bind(ShardStateAction.class).asEagerSingleton(); } else { diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java index 04abe5112f06c..335cede540b57 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java @@ -21,6 +21,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.util.function.Function; + /** * A local implementation of {@link ShardStateAction} that applies shard state changes directly to the * local cluster state. This is used in clusterless mode, where there is no cluster manager. @@ -45,28 +47,32 @@ public void shardStarted( ActionListener listener, ClusterState currentState ) { - // We're running in clusterless mode. Apply the state change directly to the local cluster state. - ClusterState clusterState = clusterService.state(); - RoutingTable routingTable = clusterState.getRoutingTable(); - IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); + Function clusterStateUpdater = clusterState -> { + // We're running in clusterless mode. Apply the state change directly to the local cluster state. + RoutingTable routingTable = clusterState.getRoutingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); - ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index()); - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) { - IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable); - indexShardRoutingTableBuilder.removeShard(shardRouting); - indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted()); - indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); - } else { - indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable); + ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index()); + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) { + IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder( + indexShardRoutingTable + ); + indexShardRoutingTableBuilder.removeShard(shardRouting); + indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted()); + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); + } else { + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable); + } } - } - routingTableBuilder.add(indexRoutingTableBuilder); - clusterStateBuilder.routingTable(routingTableBuilder.build()); + routingTableBuilder.add(indexRoutingTableBuilder); + clusterStateBuilder.routingTable(routingTableBuilder.build()); + return clusterStateBuilder.build(); + }; clusterService.getClusterApplierService() - .onNewClusterState("shard-started " + shardRouting.shardId(), clusterStateBuilder::build, (s, e) -> {}); + .updateClusterState("shard-started " + shardRouting.shardId(), clusterStateUpdater, (s, e) -> {}); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java index 6850c71532c50..6a204925ccd04 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java @@ -88,7 +88,7 @@ import java.util.function.Supplier; /** - * Transport action for retrieving the shard state + * Registers transport actions that react to shard state changes, such as shard started or shard failed. * * @opensearch.internal */ diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index 7ab1a082a4620..fc2a121c90e54 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -382,6 +382,14 @@ public void onNewClusterState( submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener); } + public void updateClusterState( + final String source, + final Function updateFunction, + final ClusterApplyListener listener + ) { + submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), updateFunction, listener); + } + private void submitStateUpdateTask( final String source, final ClusterStateTaskConfig config, diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c9d1f515063bc..05d478bbb9df1 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -42,7 +42,6 @@ import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.LocalNodeClusterManagerListener; import org.opensearch.cluster.NodeConnectionsService; -import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.OperationRouting; import org.opensearch.cluster.routing.RerouteService; @@ -375,11 +374,4 @@ public void submitStateUpdateTasks( ) { clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor); } - - /** - * @return the class to use to handle shard state actions. Subclasses may provide a different implementation. - */ - public Class getShardStateActionClass() { - return ShardStateAction.class; - } } diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java index 563e722f88224..4caf37cacfc42 100644 --- a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -12,7 +12,6 @@ import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ClusterStateTaskListener; -import org.opensearch.cluster.action.shard.LocalShardStateAction; import org.opensearch.cluster.coordination.ClusterStatePublisher; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -85,9 +84,4 @@ public void submitStateUpdateTasks( ) { throw new UnsupportedOperationException("Cannot submit cluster state update tasks when cluster manager service is not available"); } - - @Override - public Class getShardStateActionClass() { - return LocalShardStateAction.class; - } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1bf140bb60ca7..fc66571b958a6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -68,6 +68,8 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.action.shard.LocalShardStateAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.applicationtemplates.SystemTemplatesPlugin; import org.opensearch.cluster.applicationtemplates.SystemTemplatesService; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -769,7 +771,8 @@ protected Node(final Environment initialEnvironment, Collection clas clusterInfoService, snapshotsInfoService, threadPool.getThreadContext(), - clusterManagerMetrics + clusterManagerMetrics, + FeatureFlags.isEnabled(CLUSTERLESS_FLAG) ? LocalShardStateAction.class : ShardStateAction.class ); modules.add(clusterModule); final List mapperPlugins = pluginsService.filterPlugins(MapperPlugin.class); diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 82551bc26e224..cb46d57a9d44d 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -32,6 +32,7 @@ package org.opensearch.cluster; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; @@ -91,7 +92,7 @@ import java.util.function.Supplier; public class ClusterModuleTests extends ModuleTestCase { - private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE; + private final ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE; private ClusterService clusterService; private ThreadContext threadContext; @@ -229,7 +230,7 @@ public void testRegisterAllocationDeciderDuplicate() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings)); } - }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)) + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), ShardStateAction.class) ); assertEquals(e.getMessage(), "Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice"); } @@ -240,7 +241,7 @@ public void testRegisterAllocationDecider() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new FakeAllocationDecider()); } - }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), ShardStateAction.class); assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class))); } @@ -250,7 +251,7 @@ private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, Str public Map> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonMap(name, supplier); } - }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), ShardStateAction.class); } public void testRegisterShardsAllocator() { @@ -278,7 +279,8 @@ public void testUnknownShardsAllocator() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ) ); assertEquals("Unknown ShardsAllocator [dne]", e.getMessage()); @@ -366,7 +368,8 @@ public void testRejectsReservedExistingShardsAllocatorName() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ); expectThrows( IllegalArgumentException.class, @@ -382,7 +385,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ); expectThrows( IllegalArgumentException.class, @@ -415,7 +419,8 @@ public void testRerouteServiceSetForBalancedShardsAllocator() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ); clusterModule.setRerouteServiceForAllocator((reason, priority, listener) -> listener.onResponse(clusterService.state())); } From fe6d0bc7234e58922e23354b94359123bcd05b7c Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 14 Jul 2025 10:26:19 -0700 Subject: [PATCH 14/16] Fix ClusterModuleTests Needed to pass ShardStateAction.class Signed-off-by: Michael Froh --- .../org/opensearch/cluster/ClusterModuleTests.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index cb46d57a9d44d..3b20af8446704 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -159,7 +159,8 @@ public Collection getIndexNameCu clusterInfoService, null, threadContext, - null + null, + ShardStateAction.class ); assertTrue(module.getIndexNameExpressionResolver().getExpressionResolvers().contains(customResolver1)); assertTrue(module.getIndexNameExpressionResolver().getExpressionResolvers().contains(customResolver2)); @@ -176,7 +177,16 @@ public Collection getIndexNameCu }); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> new ClusterModule(Settings.EMPTY, clusterService, clusterPlugins, clusterInfoService, null, threadContext, null) + () -> new ClusterModule( + Settings.EMPTY, + clusterService, + clusterPlugins, + clusterInfoService, + null, + threadContext, + null, + ShardStateAction.class + ) ); assertEquals( "Cannot specify expression resolver [org.opensearch.cluster.ClusterModuleTests$FakeExpressionResolver] twice", From b9abedad1595dee585a33d0a5c8516960404b779 Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 14 Jul 2025 15:29:55 -0700 Subject: [PATCH 15/16] Remove feature flag Per suggestion from @andrross, I removed the feature flag and made the presence of a plugin that manages cluster state the criterion to decide whether to start in clusterless mode. Signed-off-by: Michael Froh --- .../common/settings/FeatureFlagSettings.java | 4 +-- .../opensearch/common/util/FeatureFlags.java | 14 ++------ .../main/java/org/opensearch/node/Node.java | 8 ++--- .../org/opensearch/plugins/ClusterPlugin.java | 9 +++++ .../common/util/FeatureFlagTests.java | 34 ------------------- 5 files changed, 17 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 02f4516176605..a53debf564ce4 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -38,8 +38,6 @@ protected FeatureFlagSettings( FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING, FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING, FeatureFlags.ARROW_STREAMS_SETTING, - FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, - FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_SETTING, - FeatureFlags.CLUSTERLESS_SETTING + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 96f14318af585..e88b99347800e 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -68,7 +68,7 @@ public class FeatureFlags { * Gates the functionality of warm index having the capability to store data remotely. * Once the feature is ready for release, this feature flag can be removed. */ - public static final String WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG = FEATURE_FLAG_PREFIX + "writable_warm_index.enabled"; + public static final String WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG = "opensearch.experimental.feature.writable_warm_index.enabled"; /** * Gates the functionality of background task execution. @@ -79,7 +79,7 @@ public class FeatureFlags { * Gates the functionality of merged segment warmer in local/remote segment replication. * Once the feature is ready for release, this feature flag can be removed. */ - public static final String MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG = FEATURE_FLAG_PREFIX + "merged_segment_warmer.enabled"; + public static final String MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG = "opensearch.experimental.feature.merged_segment_warmer.enabled"; public static final Setting REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( REMOTE_STORE_MIGRATION_EXPERIMENTAL, @@ -129,12 +129,6 @@ public class FeatureFlags { public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled"; public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope); - /** - * Allows the node to run without being part of a cluster. Eventually, we might move this to a normal cluster setting. - */ - public static final String CLUSTERLESS_FLAG = FEATURE_FLAG_PREFIX + "clusterless.enabled"; - public static final Setting CLUSTERLESS_SETTING = Setting.boolSetting(CLUSTERLESS_FLAG, false, Property.NodeScope); - /** * Underlying implementation for feature flags. * All settable feature flags are tracked here in FeatureFlagsImpl.featureFlags. @@ -145,8 +139,7 @@ static class FeatureFlagsImpl { private static final String TEST_FLAG = "test.flag.enabled"; private static final Setting TEST_FLAG_SETTING = Setting.boolSetting(TEST_FLAG, false, Property.NodeScope); - // Visible for testing - final ConcurrentHashMap, Boolean> featureFlags = new ConcurrentHashMap<>() { + private final ConcurrentHashMap, Boolean> featureFlags = new ConcurrentHashMap<>() { { put(TEST_FLAG_SETTING, TEST_FLAG_SETTING.get(Settings.EMPTY)); put(REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY)); @@ -165,7 +158,6 @@ static class FeatureFlagsImpl { SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_SETTING.getDefault(Settings.EMPTY) ); put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY)); - put(CLUSTERLESS_SETTING, CLUSTERLESS_SETTING.getDefault(Settings.EMPTY)); } }; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index fc66571b958a6..5b3559b2fde49 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -329,7 +329,6 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING; import static org.opensearch.common.util.FeatureFlags.BACKGROUND_TASK_EXECUTION_EXPERIMENTAL; -import static org.opensearch.common.util.FeatureFlags.CLUSTERLESS_FLAG; import static org.opensearch.common.util.FeatureFlags.TELEMETRY; import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; @@ -707,8 +706,9 @@ protected Node(final Environment initialEnvironment, Collection clas final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); + final boolean clusterless = clusterPlugins.stream().anyMatch(ClusterPlugin::isClusterless); final ClusterService clusterService; - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { + if (clusterless) { clusterService = new LocalClusterService(settings, settingsModule.getClusterSettings(), threadPool, clusterManagerMetrics); } else { clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, clusterManagerMetrics); @@ -772,7 +772,7 @@ protected Node(final Environment initialEnvironment, Collection clas snapshotsInfoService, threadPool.getThreadContext(), clusterManagerMetrics, - FeatureFlags.isEnabled(CLUSTERLESS_FLAG) ? LocalShardStateAction.class : ShardStateAction.class + clusterless ? LocalShardStateAction.class : ShardStateAction.class ); modules.add(clusterModule); final List mapperPlugins = pluginsService.filterPlugins(MapperPlugin.class); @@ -1354,7 +1354,7 @@ protected Node(final Environment initialEnvironment, Collection clas clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); final Discovery discovery; - if (FeatureFlags.isEnabled(CLUSTERLESS_FLAG)) { + if (clusterless) { discovery = new LocalDiscovery(transportService, clusterService.getClusterApplierService()); } else { discovery = new DiscoveryModule( diff --git a/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java b/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java index 9b2ddd0bed41a..cfd26814697e0 100644 --- a/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java @@ -111,4 +111,13 @@ default void onNodeStarted(DiscoveryNode localNode) { onNodeStarted(); } + /** + * @return true if this plugin will handle cluster state management on behalf of the node, so the node does not + * need to discover a cluster manager and be part of a cluster. + * + * Note that if any ClusterPlugin returns true from this method, the node will start in clusterless mode. + */ + default boolean isClusterless() { + return false; + } } diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index 21f591c9ecd31..f3751e98f5b60 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -9,14 +9,9 @@ package org.opensearch.common.util; import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.settings.FeatureFlagSettings; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; -import java.lang.reflect.Field; -import java.util.Set; - import static org.opensearch.common.util.FeatureFlags.FEATURE_FLAG_PREFIX; public class FeatureFlagTests extends OpenSearchTestCase { @@ -47,35 +42,6 @@ public void testFeatureFlagFromSettings() { assertFalse(testFlagsImpl.isEnabled(TEST_FLAG)); } - /** - * Checks that all feature flags declared in the FeatureFlags class are registered in the map in - * FeatureFlagsImpl. (It's so easy to forget to register a new feature flag!) - */ - @SuppressWarnings("unchecked") - public void testFeatureFlagsAreAllRegistered() { - FeatureFlags.FeatureFlagsImpl testFlagsImpl = new FeatureFlags.FeatureFlagsImpl(); - testFlagsImpl.initializeFeatureFlags(Settings.EMPTY); - Set> registeredFeatureFlagKeys = testFlagsImpl.featureFlags.keySet(); - Field[] fields = FeatureFlags.class.getFields(); - for (Field field : fields) { - if (field.getType() == Setting.class) { - try { - Setting setting = (Setting) field.get(null); - assertTrue( - "Feature flag " + setting.getKey() + " is not registered in FeatureFlagsImpl", - registeredFeatureFlagKeys.contains(setting) - ); - assertTrue( - "Feature flag " + setting.getKey() + " is not registered in FeatureFlagSettings", - FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS.contains(setting) - ); - } catch (IllegalAccessException e) { - fail("Failed to access field: " + field.getName()); - } - } - } - } - @SuppressForbidden(reason = "Testing system property functionality") private void setSystemPropertyTrue(String key) { System.setProperty(key, "true"); From 1cbb72b6c8ebad5353f25cdd607cb986a55c226f Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 14 Jul 2025 16:11:31 -0700 Subject: [PATCH 16/16] Update changelog We don't have a feature flag anymore, so the changelog entry needs to change. Signed-off-by: Michael Froh --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65775e6eb39f0..1a7fd77591249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708)) - Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697)) - Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663)) -- [Experimental] Add a feature flag to start OpenSearch without joining a cluster ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479)) +- [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479)) ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))