diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf6f73206460..0241d14cf81a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/)) - Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988)) - Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088)) +- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139)) ### Changed - Avoid invalid retries in multiple replicas when querying [#17370](https://github.com/opensearch-project/OpenSearch/pull/17370) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 9d893cb6f33c7..e2db9f85131a9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -67,6 +67,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.service.ClusterService; @@ -1479,9 +1480,14 @@ public void testDoNotInfinitelyWaitForMapping() { } /** Makes sure the new cluster-manager does not repeatedly fetch index metadata from recovering replicas 
*/ - public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { + public void testOngoingRecoveryAndClusterManagerFailOverForASFDisabled() throws Exception { String indexName = "test"; - internalCluster().startNodes(2); + // ASF Disabled + internalCluster().startNodes( + 2, + Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false).build() + ); + String nodeWithPrimary = internalCluster().startDataOnlyNode(); assertAcked( client().admin() @@ -1544,6 +1550,84 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { ensureGreen(indexName); } + public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { + String indexName = "test"; + internalCluster().startNodes(2); + String nodeWithPrimary = internalCluster().startDataOnlyNode(); + assertAcked( + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.include._name", nodeWithPrimary) + ) + ); + MockTransportService transport = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + CountDownLatch phase1ReadyBlocked = new CountDownLatch(1); + CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1); + Semaphore blockRecovery = new Semaphore(1); + transport.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action) && blockRecovery.tryAcquire()) { + phase1ReadyBlocked.countDown(); + try { + allowToCompletePhase1Latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + try { + String nodeWithReplica = internalCluster().startDataOnlyNode(); + assertAcked( + client().admin() + .indices() + 
.prepareUpdateSettings(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica) + ) + ); + phase1ReadyBlocked.await(); + internalCluster().restartNode( + clusterService().state().nodes().getClusterManagerNode().getName(), + new InternalTestCluster.RestartCallback() + ); + internalCluster().ensureAtLeastNumDataNodes(3); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .putNull("index.routing.allocation.include._name") + ) + ); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + assertTrue( + state.routingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size() == 1 + && state.routingTable().index(indexName).shardsWithState(ShardRoutingState.INITIALIZING).size() == 1 + ); + /* + Shard assignment is stuck because recovery is blocked at CLEAN_FILES stage. Once, it times out after 60s the replica shards get assigned. + https://github.com/opensearch-project/OpenSearch/issues/18098. + + Stack trace: + Caused by: org.opensearch.transport.ReceiveTimeoutTransportException: [node_t3][127.0.0.1:56648][internal:index/shard/recovery/clean_files] request_id [20] timed out after [60026ms] + at org.opensearch.transport.TransportService$TimeoutHandler.run(TransportService.java:1399) ~[main/:?] 
+ */ + ensureGreen(TimeValue.timeValueSeconds(62), indexName); + } finally { + allowToCompletePhase1Latch.countDown(); + } + } + public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); List nodes = randomSubsetOf( diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index eb7a1e7209c37..893376abf07c1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -64,21 +64,19 @@ public interface ExistingShardsAllocator { /** * Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk. - * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate + * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate * in one or more go. - * - * Enable this setting if your ExistingShardAllocator is implementing the + *

+ * This setting is enabled by default. Your ExistingShardsAllocator should implement the * {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method. * The default implementation of this method is not optimized and assigns shards one by one. - * - * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e, + *

+ * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be used for it , i.e, * {@link ShardsBatchGatewayAllocator}. - * - * This setting is experimental at this point. */ Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting( "cluster.allocator.existing_shards_allocator.batch_enabled", - false, + true, Setting.Property.NodeScope ); diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 82229f244239f..ce14cb3442f75 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -85,6 +85,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue replicaShardsBatchGatewayAllocatorTimeout; private volatile Priority followUpRerouteTaskPriority; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); + public static final TimeValue DEFAULT_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); private final ClusterManagerMetrics clusterManagerMetrics; /** @@ -105,7 +106,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { */ public static final Setting PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, - TimeValue.MINUS_ONE, + DEFAULT_ALLOCATOR_TIMEOUT, TimeValue.MINUS_ONE, new Setting.Validator<>() { @Override @@ -129,7 +130,7 @@ public void validate(TimeValue timeValue) { */ public static final Setting REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, - TimeValue.MINUS_ONE, + DEFAULT_ALLOCATOR_TIMEOUT, TimeValue.MINUS_ONE, new Setting.Validator<>() { @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index b1f4b45bb2441..d3ac8b3c198f1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -59,12 +59,14 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.gateway.TestGatewayAllocator; +import org.opensearch.test.gateway.TestShardBatchGatewayAllocator; import java.util.Arrays; import java.util.Collections; @@ -192,8 +194,10 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing final String unrealisticAllocatorName = "unrealistic"; final Map allocatorMap = new HashMap<>(); final TestGatewayAllocator testGatewayAllocator = new TestGatewayAllocator(); + final TestShardBatchGatewayAllocator testShardBatchGatewayAllocator = new TestShardBatchGatewayAllocator(); allocatorMap.put(GatewayAllocator.ALLOCATOR_NAME, testGatewayAllocator); allocatorMap.put(unrealisticAllocatorName, new UnrealisticAllocator()); + allocatorMap.put(ShardsBatchGatewayAllocator.ALLOCATOR_NAME, testShardBatchGatewayAllocator); allocationService.setExistingShardsAllocators(allocatorMap); final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); diff --git a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java index 0c0dec29c9dbf..c681fb077efe0 100644 --- 
a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java @@ -211,7 +211,10 @@ public void testTwoLoggersDifferentLevel() { public void testMultipleSlowLoggersUseSingleLog4jLogger() { LoggerContext context = (LoggerContext) LogManager.getContext(false); - SearchContext ctx1 = searchContextWithSourceAndTask(createIndex("index-1")); + IndexService index1 = createIndex("index-1"); + IndexService index2 = createIndex("index-2"); + + SearchContext ctx1 = searchContextWithSourceAndTask(index1); IndexSettings settings1 = new IndexSettings( createIndexMetadata(SlowLogLevel.WARN, "index-1", UUIDs.randomBase64UUID()), Settings.EMPTY @@ -219,7 +222,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { SearchSlowLog log1 = new SearchSlowLog(settings1); int numberOfLoggersBefore = context.getLoggers().size(); - SearchContext ctx2 = searchContextWithSourceAndTask(createIndex("index-2")); + SearchContext ctx2 = searchContextWithSourceAndTask(index2); IndexSettings settings2 = new IndexSettings( createIndexMetadata(SlowLogLevel.TRACE, "index-2", UUIDs.randomBase64UUID()), Settings.EMPTY