diff --git a/CHANGELOG.md b/CHANGELOG.md index d04c01edc47d9..1a7fd77591249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708)) - Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697)) - Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663)) +- [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479)) ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570)) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 7dbc0d0ee88e3..517d3abd2a472 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -144,6 +144,7 @@ public class ClusterModule extends AbstractModule { final Collection deciderList; final ShardsAllocator shardsAllocator; private final ClusterManagerMetrics clusterManagerMetrics; + private final Class shardStateActionClass; public ClusterModule( Settings settings, @@ -152,7 +153,8 @@ public ClusterModule( ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, ThreadContext threadContext, - ClusterManagerMetrics clusterManagerMetrics + ClusterManagerMetrics clusterManagerMetrics, + Class shardStateActionClass ) { this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); @@ -169,6 +171,7 @@ public ClusterModule( clusterManagerMetrics ); this.clusterManagerMetrics = clusterManagerMetrics; + this.shardStateActionClass = shardStateActionClass; } public static List getNamedWriteables() { @@ -474,7 +477,11 @@ protected void configure() { bind(MetadataIndexTemplateService.class).asEagerSingleton(); bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver); bind(DelayedAllocationService.class).asEagerSingleton(); - bind(ShardStateAction.class).asEagerSingleton(); + if (shardStateActionClass == ShardStateAction.class) { + bind(ShardStateAction.class).asEagerSingleton(); + } else { + bind(ShardStateAction.class).to(shardStateActionClass).asEagerSingleton(); + } bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(TaskResultsService.class).asEagerSingleton(); diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java new file mode 100644 index 0000000000000..335cede540b57 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/action/shard/LocalShardStateAction.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.action.shard; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RerouteService; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.function.Function; + +/** + * A local implementation of {@link ShardStateAction} that applies shard state changes directly to the + * local cluster state. This is used in clusterless mode, where there is no cluster manager. + */ +public class LocalShardStateAction extends ShardStateAction { + @Inject + public LocalShardStateAction( + ClusterService clusterService, + TransportService transportService, + AllocationService allocationService, + RerouteService rerouteService, + ThreadPool threadPool + ) { + super(clusterService, transportService, allocationService, rerouteService, threadPool); + } + + @Override + public void shardStarted( + ShardRouting shardRouting, + long primaryTerm, + String message, + ActionListener listener, + ClusterState currentState + ) { + Function clusterStateUpdater = clusterState -> { + // We're running in clusterless mode. Apply the state change directly to the local cluster state. + RoutingTable routingTable = clusterState.getRoutingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); + + ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index()); + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) { + IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder( + indexShardRoutingTable + ); + indexShardRoutingTableBuilder.removeShard(shardRouting); + indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted()); + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build()); + } else { + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable); + } + } + routingTableBuilder.add(indexRoutingTableBuilder); + clusterStateBuilder.routingTable(routingTableBuilder.build()); + return clusterStateBuilder.build(); + }; + clusterService.getClusterApplierService() + .updateClusterState("shard-started " + shardRouting.shardId(), clusterStateUpdater, (s, e) -> {}); + } + + @Override + public void localShardFailed( + ShardRouting shardRouting, + String message, + Exception failure, + ActionListener listener, + ClusterState currentState + ) { + // Do not send a failure to the cluster manager, as we are running in clusterless mode. + } +} diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java index cb5749a91d448..6a204925ccd04 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java @@ -88,7 +88,7 @@ import java.util.function.Supplier; /** - * Transport action for retrieving the shard state + * Registers transport actions that react to shard state changes, such as shard started or shard failed. * * @opensearch.internal */ @@ -128,7 +128,7 @@ private static Priority parseReroutePriority(String priorityString) { } private final TransportService transportService; - private final ClusterService clusterService; + final ClusterService clusterService; private final ThreadPool threadPool; private volatile Priority followUpRerouteTaskPriority; diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index 7ab1a082a4620..fc2a121c90e54 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -382,6 +382,14 @@ public void onNewClusterState( submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener); } + public void updateClusterState( + final String source, + final Function updateFunction, + final ClusterApplyListener listener + ) { + submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), updateFunction, listener); + } + private void submitStateUpdateTask( final String source, final ClusterStateTaskConfig config, diff --git a/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java new file mode 100644 index 0000000000000..4caf37cacfc42 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/LocalClusterService.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.cluster.ClusterManagerMetrics; +import org.opensearch.cluster.ClusterStateTaskConfig; +import org.opensearch.cluster.ClusterStateTaskExecutor; +import org.opensearch.cluster.ClusterStateTaskListener; +import org.opensearch.cluster.coordination.ClusterStatePublisher; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.Node; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Map; + +/** + * A local implementation of {@link ClusterService} that assumes we have no cluster manager. + * This is used in clusterless mode. + */ +public class LocalClusterService extends ClusterService { + private static class DummyClusterManagerService extends ClusterManagerService { + private static final ClusterManagerThrottlingStats EMPTY_THROTTLING_STATS = new ClusterManagerThrottlingStats(); + + public DummyClusterManagerService(Settings settings, ClusterSettings clusterSettings) { + super(settings, clusterSettings, null, null); + } + + @Override + public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {} + + @Override + public ClusterManagerThrottlingStats getThrottlingStats() { + return EMPTY_THROTTLING_STATS; + } + } + + public LocalClusterService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + ClusterManagerMetrics clusterManagerMetrics + ) { + super( + settings, + clusterSettings, + new DummyClusterManagerService(settings, clusterSettings), + new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics) + ); + } + + @Override + protected synchronized void doStart() { + getClusterApplierService().start(); + } + + @Override + protected synchronized void doStop() { + getClusterApplierService().stop(); + } + + @Override + protected synchronized void doClose() { + getClusterApplierService().close(); + } + + @Override + public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { + return null; + } + + @Override + public void submitStateUpdateTasks( + final String source, + final Map tasks, + final ClusterStateTaskConfig config, + final ClusterStateTaskExecutor executor + ) { + throw new UnsupportedOperationException("Cannot submit cluster state update tasks when cluster manager service is not available"); + } +} diff --git a/server/src/main/java/org/opensearch/discovery/LocalDiscovery.java b/server/src/main/java/org/opensearch/discovery/LocalDiscovery.java new file mode 100644 index 0000000000000..d0090f5b4e7f4 --- /dev/null +++ b/server/src/main/java/org/opensearch/discovery/LocalDiscovery.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.discovery; + +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.coordination.PendingClusterStateStats; +import org.opensearch.cluster.coordination.PublishClusterStateStats; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterApplier; +import org.opensearch.cluster.service.ClusterStateStats; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.action.ActionListener; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Clusterless implementation of Discovery. This is only able to "discover" the local node. + */ +public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { + private static final DiscoveryStats EMPTY_STATS = new DiscoveryStats( + new PendingClusterStateStats(0, 0, 0), + new PublishClusterStateStats(0, 0, 0), + new ClusterStateStats() + ); + private final TransportService transportService; + private final ClusterApplier clusterApplier; + + public LocalDiscovery(TransportService transportService, ClusterApplier clusterApplier) { + this.transportService = transportService; + this.clusterApplier = clusterApplier; + } + + @Override + public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener publishListener, AckListener ackListener) { + // In clusterless mode, we should never be asked to publish a cluster state. + throw new UnsupportedOperationException("Should not be called in clusterless mode"); + } + + @Override + protected void doStart() { + DiscoveryNode localNode = transportService.getLocalNode(); + ClusterState bootstrapClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build()) + .build(); + clusterApplier.setInitialState(bootstrapClusterState); + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public DiscoveryStats stats() { + return EMPTY_STATS; + } + + @Override + public void startInitialJoin() { + + } + + @Override + public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { + + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a5e92293c0be1..5b3559b2fde49 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -68,6 +68,8 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.action.shard.LocalShardStateAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.applicationtemplates.SystemTemplatesPlugin; import org.opensearch.cluster.applicationtemplates.SystemTemplatesService; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -86,6 +88,7 @@ import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.cluster.routing.allocation.DiskThresholdMonitor; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.service.LocalClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.StopWatch; import org.opensearch.common.cache.module.CacheModule; @@ -130,6 +133,7 @@ import org.opensearch.crypto.CryptoHandlerRegistry; import org.opensearch.discovery.Discovery; import org.opensearch.discovery.DiscoveryModule; +import org.opensearch.discovery.LocalDiscovery; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeMetadata; @@ -702,12 +706,13 @@ protected Node(final Environment initialEnvironment, Collection clas final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); - final ClusterService clusterService = new ClusterService( - settings, - settingsModule.getClusterSettings(), - threadPool, - clusterManagerMetrics - ); + final boolean clusterless = clusterPlugins.stream().anyMatch(ClusterPlugin::isClusterless); + final ClusterService clusterService; + if (clusterless) { + clusterService = new LocalClusterService(settings, settingsModule.getClusterSettings(), threadPool, clusterManagerMetrics); + } else { + clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, clusterManagerMetrics); + } clusterService.addStateApplier(scriptService); resourcesToClose.add(clusterService); final Set> consistentSettings = settingsModule.getConsistentSettings(); @@ -766,7 +771,8 @@ protected Node(final Environment initialEnvironment, Collection clas clusterInfoService, snapshotsInfoService, threadPool.getThreadContext(), - clusterManagerMetrics + clusterManagerMetrics, + clusterless ? LocalShardStateAction.class : ShardStateAction.class ); modules.add(clusterModule); final List mapperPlugins = pluginsService.filterPlugins(MapperPlugin.class); @@ -1347,27 +1353,31 @@ protected Node(final Environment initialEnvironment, Collection clas ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); - final DiscoveryModule discoveryModule = new DiscoveryModule( - settings, - threadPool, - transportService, - namedWriteableRegistry, - networkService, - clusterService.getClusterManagerService(), - clusterService.getClusterApplierService(), - clusterService.getClusterSettings(), - pluginsService.filterPlugins(DiscoveryPlugin.class), - clusterModule.getAllocationService(), - environment.configDir(), - gatewayMetaState, - rerouteService, - fsHealthService, - persistedStateRegistry, - remoteStoreNodeService, - clusterManagerMetrics, - remoteClusterStateService - ); - + final Discovery discovery; + if (clusterless) { + discovery = new LocalDiscovery(transportService, clusterService.getClusterApplierService()); + } else { + discovery = new DiscoveryModule( + settings, + threadPool, + transportService, + namedWriteableRegistry, + networkService, + clusterService.getClusterManagerService(), + clusterService.getClusterApplierService(), + clusterService.getClusterSettings(), + pluginsService.filterPlugins(DiscoveryPlugin.class), + clusterModule.getAllocationService(), + environment.configDir(), + gatewayMetaState, + rerouteService, + fsHealthService, + persistedStateRegistry, + remoteStoreNodeService, + clusterManagerMetrics, + remoteClusterStateService + ).getDiscovery(); + } final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, threadPool, @@ -1388,11 +1398,12 @@ protected Node(final Environment initialEnvironment, Collection clas transportService.getTaskManager(), taskCancellationMonitoringSettings ); + this.nodeService = new NodeService( settings, threadPool, monitorService, - discoveryModule.getDiscovery(), + discovery, transportService, indicesService, pluginsService, @@ -1535,7 +1546,7 @@ protected Node(final Environment initialEnvironment, Collection clas b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); - b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); + b.bind(Discovery.class).toInstance(discovery); b.bind(RemoteStoreSettings.class).toInstance(remoteStoreSettings); { b.bind(PeerRecoverySourceService.class) @@ -1716,7 +1727,7 @@ public Node start() throws NodeValidationException { injector.getInstance(GatewayService.class).start(); Discovery discovery = injector.getInstance(Discovery.class); discovery.setNodeConnectionsService(nodeConnectionsService); - clusterService.getClusterManagerService().setClusterStatePublisher(discovery::publish); + clusterService.getClusterManagerService().setClusterStatePublisher(discovery); // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); diff --git a/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java b/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java index 9b2ddd0bed41a..cfd26814697e0 100644 --- a/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/ClusterPlugin.java @@ -111,4 +111,13 @@ default void onNodeStarted(DiscoveryNode localNode) { onNodeStarted(); } + /** + * @return true if this plugin will handle cluster state management on behalf of the node, so the node does not + * need to discover a cluster manager and be part of a cluster. + * + * Note that if any ClusterPlugin returns true from this method, the node will start in clusterless mode. + */ + default boolean isClusterless() { + return false; + } } diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 82551bc26e224..3b20af8446704 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -32,6 +32,7 @@ package org.opensearch.cluster; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; @@ -91,7 +92,7 @@ import java.util.function.Supplier; public class ClusterModuleTests extends ModuleTestCase { - private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE; + private final ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE; private ClusterService clusterService; private ThreadContext threadContext; @@ -158,7 +159,8 @@ public Collection getIndexNameCu clusterInfoService, null, threadContext, - null + null, + ShardStateAction.class ); assertTrue(module.getIndexNameExpressionResolver().getExpressionResolvers().contains(customResolver1)); assertTrue(module.getIndexNameExpressionResolver().getExpressionResolvers().contains(customResolver2)); @@ -175,7 +177,16 @@ public Collection getIndexNameCu }); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> new ClusterModule(Settings.EMPTY, clusterService, clusterPlugins, clusterInfoService, null, threadContext, null) + () -> new ClusterModule( + Settings.EMPTY, + clusterService, + clusterPlugins, + clusterInfoService, + null, + threadContext, + null, + ShardStateAction.class + ) ); assertEquals( "Cannot specify expression resolver [org.opensearch.cluster.ClusterModuleTests$FakeExpressionResolver] twice", @@ -229,7 +240,7 @@ public void testRegisterAllocationDeciderDuplicate() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings)); } - }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)) + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), ShardStateAction.class) ); assertEquals(e.getMessage(), "Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice"); } @@ -240,7 +251,7 @@ public void testRegisterAllocationDecider() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new FakeAllocationDecider()); } - }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), ShardStateAction.class); assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class))); } @@ -250,7 +261,7 @@ private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, Str public Map> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonMap(name, supplier); } - }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)); + }), clusterInfoService, null, threadContext, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), ShardStateAction.class); } public void testRegisterShardsAllocator() { @@ -278,7 +289,8 @@ public void testUnknownShardsAllocator() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ) ); assertEquals("Unknown ShardsAllocator [dne]", e.getMessage()); @@ -366,7 +378,8 @@ public void testRejectsReservedExistingShardsAllocatorName() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ); expectThrows( IllegalArgumentException.class, @@ -382,7 +395,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ); expectThrows( IllegalArgumentException.class, @@ -415,7 +429,8 @@ public void testRerouteServiceSetForBalancedShardsAllocator() { clusterInfoService, null, threadContext, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + ShardStateAction.class ); clusterModule.setRerouteServiceForAllocator((reason, priority, listener) -> listener.onResponse(clusterService.state())); }