diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java index 49f87c9243b05..dbaf378d61b7e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java @@ -128,10 +128,7 @@ private static String indexOrAlias() { @Override public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) - .build(); + return Settings.builder().put(super.indexSettings()).put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true).build(); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index ada58fed0d12a..92edb147d66d9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -269,8 +269,7 @@ public GroupShardsIterator searchShards( } if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG) - && IndexModule.DataLocalityType.PARTIAL.name() - .equals(indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())) + && indexMetadataForShard.getSettings().getAsBoolean(IndexModule.IS_WARM_INDEX_SETTING.getKey(), false) && (preference == null || preference.isEmpty())) { preference = Preference.PRIMARY_FIRST.type(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java index 493d23b57d271..00b0a51272b64 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -91,12 +91,13 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { RoutingPool targetPool = RoutingPool.getShardPool(shardRouting, allocation); RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shardRouting.currentNodeId())); + if (RoutingPool.REMOTE_CAPABLE.equals(targetPool) && targetPool != currentNodePool) { logger.debug( "Shard: [{}] has current pool: [{}], target pool: [{}]. Cannot remain on node: [{}]", shardRouting.shortSummary(), currentNodePool.name(), - RoutingPool.REMOTE_CAPABLE.name(), + targetPool.name(), node.node() ); return allocation.decision( @@ -107,7 +108,25 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl currentNodePool, targetPool ); - } + } else if (RoutingPool.LOCAL_ONLY.equals(targetPool) + && targetPool != currentNodePool + && !node.node().getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) { + logger.debug( + "Shard: [{}] has target pool: [{}]. Cannot remain on node: [{}] without the [{}] node role", + shardRouting, + targetPool, + node.node(), + DiscoveryNodeRole.DATA_ROLE.roleName() + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Shard pool: [%s], node pool: [%s] without [%s] role", + targetPool, + currentNodePool, + DiscoveryNodeRole.DATA_ROLE.roleName() + ); + } return allocation.decision( Decision.YES, NAME, diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index f6c179373793a..1254c1a84e573 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -1088,7 +1088,7 @@ public void testPartialIndexPrimaryDefault() throws Exception { .settings( Settings.builder() .put(state.metadata().index(indexName).getSettings()) - .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL) + .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true) .build() ) .build(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index 523a5f59a81c5..b70a0b49366ed 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -102,10 +102,17 @@ public RoutingAllocation getRoutingAllocation(ClusterState clusterState, Routing } public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, int localIndices, int remoteIndices) { - return createInitialCluster(localOnlyNodes, remoteNodes, false, localIndices, remoteIndices); + return createInitialCluster(localOnlyNodes, remoteNodes, false, localIndices, remoteIndices, false); } - public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, boolean remoteOnly, int localIndices, int remoteIndices) { + public ClusterState createInitialCluster( + int localOnlyNodes, + int remoteNodes, + boolean remoteRoleOnly, + int localIndices, + int remoteIndices, + boolean remoteIndexIsWarm + ) { Metadata.Builder mb = Metadata.builder(); for (int i = 0; i < localIndices; i++) { mb.put( @@ -117,15 +124,27 @@ public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, bo } for (int i = 0; i < remoteIndices; i++) { - mb.put( - IndexMetadata.builder(getIndexName(i, true)) - .settings( - settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0") - .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) - ) - .numberOfShards(PRIMARIES) - .numberOfReplicas(REPLICAS) - ); + if (remoteIndexIsWarm == false) { + mb.put( + IndexMetadata.builder(getIndexName(i, true)) + .settings( + settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + ) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } else { + mb.put( + IndexMetadata.builder(getIndexName(i, true)) + .settings( + settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0") + .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true) + ) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } } Metadata metadata = mb.build(); @@ -143,7 +162,7 @@ public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, bo String name = getNodeId(i, false); nb.add(newNode(name, name, MANAGER_DATA_ROLES)); } - if (remoteOnly) { + if (remoteRoleOnly) { for (int i = 0; i < remoteNodes; i++) { String name = getNodeId(i, true); nb.add(newNode(name, name, WARM_ONLY_ROLE)); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java index a1d5cb3932aa7..36e9b5a275b9a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java @@ -21,7 +21,7 @@ import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; import static org.opensearch.cluster.routing.RoutingPool.getIndexPool; import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG; -import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.IS_WARM_INDEX_SETTING; public class ShardsTieringAllocationTests extends TieringAllocationBaseTestCase { @@ -81,8 +81,9 @@ public void testShardsWithTiering() { clusterState = updateIndexMetadataForTiering( clusterState, localIndices, + remoteIndices, IndexModule.TieringState.HOT_TO_WARM.name(), - IndexModule.DataLocalityType.PARTIAL.name() + true ); // trigger shard relocation clusterState = allocateShardsAndBalance(clusterState, service); @@ -100,14 +101,61 @@ public void testShardsWithTiering() { } } + @LockFeatureFlag(WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG) + public void testShardsWithWarmToHotTiering() throws Exception { + int localOnlyNodes = 60; + int remoteCapableNodes = 13; + int localIndices = 0; + int remoteIndices = 10; // 5 primary, 1 replica + + // Create a cluster with warm only roles (dedicated setup) and remote index is of type warm only. + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, true, localIndices, remoteIndices, true); + AllocationService service = this.createRemoteCapableAllocationService(); + + // assign shards to respective nodes + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertTrue(shard.relocating() || shard.started()); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + assertEquals(REMOTE_CAPABLE, shardPool); + assertEquals(REMOTE_CAPABLE, nodePool); + } + + // put indices in the hot to warm tiering state + clusterState = updateIndexMetadataForTiering( + clusterState, + localIndices, + remoteIndices, + IndexModule.TieringState.WARM_TO_HOT.name(), + false + ); + // trigger shard relocation + clusterState = allocateShardsAndBalance(clusterState, service); + routingNodes = clusterState.getRoutingNodes(); + allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertBusy(() -> { assertFalse(shard.unassigned()); }); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + assertEquals(LOCAL_ONLY, shardPool); + assertEquals(nodePool, shardPool); + } + } + @LockFeatureFlag(WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG) public void testShardPoolForPartialIndices() { String index = "test-index"; IndexMetadata indexMetadata = IndexMetadata.builder(index) - .settings( - settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) - .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true) - ) + .settings(settings(Version.CURRENT).put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true)) .numberOfShards(PRIMARIES) .numberOfReplicas(REPLICAS) .build(); @@ -119,7 +167,7 @@ public void testShardPoolForPartialIndices() { public void testShardPoolForFullIndices() { String index = "test-index"; IndexMetadata indexMetadata = IndexMetadata.builder(index) - .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())) + .settings(settings(Version.CURRENT).put(IS_WARM_INDEX_SETTING.getKey(), false)) .numberOfShards(PRIMARIES) .numberOfReplicas(REPLICAS) .build(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java index e90c959dc0b18..3ce8df5e88d02 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java @@ -15,7 +15,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexModule; -import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; @SuppressForbidden(reason = "feature flag overrides") @@ -24,8 +23,9 @@ public abstract class TieringAllocationBaseTestCase extends RemoteShardsBalancer public ClusterState updateIndexMetadataForTiering( ClusterState clusterState, int localIndices, + int remoteIndices, String tieringState, - String dataLocality + boolean isWarmIndex ) { Metadata.Builder mb = Metadata.builder(clusterState.metadata()); for (int i = 0; i < localIndices; i++) { @@ -38,8 +38,21 @@ public ClusterState updateIndexMetadataForTiering( .put(settings) .put(settings) .put(INDEX_TIERING_STATE.getKey(), tieringState) - .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true) - .put(INDEX_STORE_LOCALITY_SETTING.getKey(), dataLocality) + .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), isWarmIndex) + ) + ); + } + for (int i = 0; i < remoteIndices; i++) { + IndexMetadata indexMetadata = clusterState.metadata().index(getIndexName(i, true)); + Settings settings = indexMetadata.getSettings(); + mb.put( + IndexMetadata.builder(indexMetadata) + .settings( + Settings.builder() + .put(settings) + .put(settings) + .put(INDEX_TIERING_STATE.getKey(), tieringState) + .put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), isWarmIndex) ) ); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java index 9e25e86ec0797..51b59306ebb2c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java @@ -113,7 +113,7 @@ public void testTargetPoolHybridAllocationDecisions() { } public void testTargetPoolDedicatedSearchNodeAllocationDecisions() { - ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2); + ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2, false); AllocationService service = this.createRemoteCapableAllocationService(); clusterState = allocateShardsAndBalance(clusterState, service); @@ -202,7 +202,7 @@ public void testTargetPoolDedicatedSearchNodeAllocationDecisions() { } public void testDebugMessage() { - ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2); + ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2, false); AllocationService service = this.createRemoteCapableAllocationService(); clusterState = allocateShardsAndBalance(clusterState, service);