Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708))
- Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697))
- Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663))
- [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class ClusterModule extends AbstractModule {
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;
private final ClusterManagerMetrics clusterManagerMetrics;
private final Class<? extends ShardStateAction> shardStateActionClass;

public ClusterModule(
Settings settings,
Expand All @@ -152,7 +153,8 @@ public ClusterModule(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
Class<? extends ShardStateAction> shardStateActionClass
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand All @@ -169,6 +171,7 @@ public ClusterModule(
clusterManagerMetrics
);
this.clusterManagerMetrics = clusterManagerMetrics;
this.shardStateActionClass = shardStateActionClass;
}

public static List<Entry> getNamedWriteables() {
Expand Down Expand Up @@ -474,7 +477,11 @@ protected void configure() {
bind(MetadataIndexTemplateService.class).asEagerSingleton();
bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
bind(DelayedAllocationService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton();
if (shardStateActionClass == ShardStateAction.class) {
bind(ShardStateAction.class).asEagerSingleton();
} else {
bind(ShardStateAction.class).to(shardStateActionClass).asEagerSingleton();
}
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.action.shard;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.function.Function;

/**
* A local implementation of {@link ShardStateAction} that applies shard state changes directly to the
* local cluster state. This is used in clusterless mode, where there is no cluster manager.
*/
public class LocalShardStateAction extends ShardStateAction {
@Inject
public LocalShardStateAction(
ClusterService clusterService,
TransportService transportService,
AllocationService allocationService,
RerouteService rerouteService,
ThreadPool threadPool
) {
super(clusterService, transportService, allocationService, rerouteService, threadPool);
}

@Override
public void shardStarted(
ShardRouting shardRouting,
long primaryTerm,
String message,
ActionListener<Void> listener,
ClusterState currentState
) {
Function<ClusterState, ClusterState> clusterStateUpdater = clusterState -> {
// We're running in clusterless mode. Apply the state change directly to the local cluster state.
RoutingTable routingTable = clusterState.getRoutingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());

ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable);
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index());
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) {
IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(
indexShardRoutingTable
);
indexShardRoutingTableBuilder.removeShard(shardRouting);
indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted());
indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build());
} else {
indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable);
}
}
routingTableBuilder.add(indexRoutingTableBuilder);
clusterStateBuilder.routingTable(routingTableBuilder.build());
return clusterStateBuilder.build();
};
clusterService.getClusterApplierService()
.updateClusterState("shard-started " + shardRouting.shardId(), clusterStateUpdater, (s, e) -> {});
}

@Override
public void localShardFailed(
ShardRouting shardRouting,
String message,
Exception failure,
ActionListener<Void> listener,
ClusterState currentState
) {
// Do not send a failure to the cluster manager, as we are running in clusterless mode.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
import java.util.function.Supplier;

/**
* Transport action for retrieving the shard state
* Registers transport actions that react to shard state changes, such as shard started or shard failed.
*
* @opensearch.internal
*/
Expand Down Expand Up @@ -128,7 +128,7 @@ private static Priority parseReroutePriority(String priorityString) {
}

private final TransportService transportService;
private final ClusterService clusterService;
final ClusterService clusterService;
private final ThreadPool threadPool;

private volatile Priority followUpRerouteTaskPriority;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ public void onNewClusterState(
submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
}

public void updateClusterState(
final String source,
final Function<ClusterState, ClusterState> updateFunction,
final ClusterApplyListener listener
) {
submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), updateFunction, listener);
}

private void submitStateUpdateTask(
final String source,
final ClusterStateTaskConfig config,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterStateTaskConfig;
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.cluster.coordination.ClusterStatePublisher;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.Node;
import org.opensearch.threadpool.ThreadPool;

import java.util.Map;

/**
* A local implementation of {@link ClusterService} that assumes we have no cluster manager.
* This is used in clusterless mode.
*/
public class LocalClusterService extends ClusterService {
private static class DummyClusterManagerService extends ClusterManagerService {
private static final ClusterManagerThrottlingStats EMPTY_THROTTLING_STATS = new ClusterManagerThrottlingStats();

public DummyClusterManagerService(Settings settings, ClusterSettings clusterSettings) {
super(settings, clusterSettings, null, null);
}

@Override
public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {}

@Override
public ClusterManagerThrottlingStats getThrottlingStats() {
return EMPTY_THROTTLING_STATS;
}
}

public LocalClusterService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
super(
settings,
clusterSettings,
new DummyClusterManagerService(settings, clusterSettings),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics)
);
}

@Override
protected synchronized void doStart() {
getClusterApplierService().start();
}

@Override
protected synchronized void doStop() {
getClusterApplierService().stop();
}

@Override
protected synchronized void doClose() {
getClusterApplierService().close();
}

@Override
public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) {
return null;
}

@Override
public <T> void submitStateUpdateTasks(
final String source,
final Map<T, ClusterStateTaskListener> tasks,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
throw new UnsupportedOperationException("Cannot submit cluster state update tasks when cluster manager service is not available");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.discovery;

import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.action.ActionListener;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Clusterless implementation of Discovery. This is only able to "discover" the local node.
*/
public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery {
private static final DiscoveryStats EMPTY_STATS = new DiscoveryStats(
new PendingClusterStateStats(0, 0, 0),
new PublishClusterStateStats(0, 0, 0),
new ClusterStateStats()
);
private final TransportService transportService;
private final ClusterApplier clusterApplier;

public LocalDiscovery(TransportService transportService, ClusterApplier clusterApplier) {
this.transportService = transportService;
this.clusterApplier = clusterApplier;
}

@Override
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
// In clusterless mode, we should never be asked to publish a cluster state.
throw new UnsupportedOperationException("Should not be called in clusterless mode");
}

@Override
protected void doStart() {
DiscoveryNode localNode = transportService.getLocalNode();
ClusterState bootstrapClusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build())
.build();
clusterApplier.setInitialState(bootstrapClusterState);
}

@Override
protected void doStop() {

}

@Override
protected void doClose() throws IOException {

}

@Override
public DiscoveryStats stats() {
return EMPTY_STATS;
}

@Override
public void startInitialJoin() {

}

@Override
public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {

}
}
Loading
Loading