From 10718f92dd18b320ffaea7a13f399731b71aea4c Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 20 Jul 2022 13:04:16 +0200 Subject: [PATCH 1/6] Replace health request with a state observer. This was using health request assuming yellow index is one with all primaries initialized. This is not true as newly created index remain yellow when primaries allocation is trottled. This was discovered while working on a new shards allocator. --- .../action/GetGlobalCheckpointsAction.java | 112 +++++++++--------- 1 file changed, 59 insertions(+), 53 deletions(-) diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java index 945951653f0c8..c906175086e50 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java @@ -16,11 +16,11 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -35,8 +35,10 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; @@ -156,6 +158,7 @@ public static class TransportAction extends org.elasticsearch.action.support.Tra private final ClusterService clusterService; private final NodeClient client; private final IndexNameExpressionResolver resolver; + private final ThreadPool threadPool; @Inject public TransportAction( @@ -163,12 +166,14 @@ public TransportAction( final TransportService transportService, final ClusterService clusterService, final NodeClient client, - final IndexNameExpressionResolver resolver + final IndexNameExpressionResolver resolver, + final ThreadPool threadPool ) { super(NAME, actionFilters, transportService.getTaskManager()); this.clusterService = clusterService; this.client = client; this.resolver = resolver; + this.threadPool = threadPool; } @Override @@ -180,7 +185,7 @@ protected void doExecute(Task task, Request request, ActionListener li index = resolver.concreteSingleIndex(state, request); } catch (IndexNotFoundException e) { if (request.waitForIndex()) { - handleIndexNotReady(request, listener); + handleIndexNotReady(state, request, listener); } else { listener.onFailure(e); } @@ -194,7 +199,7 @@ protected void doExecute(Task task, Request request, ActionListener li new CheckpointFetcher(client, request, listener, indexMetadata, request.timeout()).run(); } else { if (request.waitForIndex()) { - handleIndexNotReady(request, listener); + handleIndexNotReady(state, request, listener); } else { int active = routingTable.primaryShardsActive(); int total = indexMetadata.getNumberOfShards(); @@ -205,60 +210,61 @@ protected void doExecute(Task task, Request request, ActionListener li } } - private void handleIndexNotReady(final Request request, final ActionListener responseListener) { + private void handleIndexNotReady(ClusterState initialState, Request request, ActionListener listener) { long startNanos = System.nanoTime(); - client.admin() - .cluster() - .prepareHealth(request.index) - .setLocal(true) - .setTimeout(request.timeout()) - .setWaitForYellowStatus() - .setWaitForNoInitializingShards(true) - .execute(new ActionListener<>() { - @Override - public void onResponse(ClusterHealthResponse healthResponse) { - final long elapsedNanos = System.nanoTime() - startNanos; - final ClusterState state = clusterService.state(); - final Index index; - try { - index = resolver.concreteSingleIndex(state, request); - } catch (Exception e) { - responseListener.onFailure(e); - return; - } - - final IndexMetadata indexMetadata = state.getMetadata().index(index); - final IndexRoutingTable routingTable = state.routingTable().index(index); - + var observer = new ClusterStateObserver(initialState, clusterService, request.timeout(), logger, threadPool.getThreadContext()); + + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + try { + var index = resolver.concreteSingleIndex(state, request); + long elapsedNanos = System.nanoTime() - startNanos; long remainingNanos = request.timeout().nanos() - elapsedNanos; - if (routingTable.allPrimaryShardsActive() && remainingNanos > 0) { - new CheckpointFetcher( - client, - request, - responseListener, - indexMetadata, - TimeValue.timeValueNanos(remainingNanos) - ).run(); - } else { - int active = routingTable.primaryShardsActive(); - int total = indexMetadata.getNumberOfShards(); - responseListener.onFailure( - new UnavailableShardsException( - null, - "Primary shards were not active within timeout [timeout={}, shards={}, active={}]", - request.timeout(), - total, - active - ) - ); - } + new CheckpointFetcher( + client, + request, + listener, + state.getMetadata().index(index), + TimeValue.timeValueNanos(remainingNanos) + ).run(); + } catch (Exception e) { + listener.onFailure(e); } + } - @Override - public void onFailure(Exception e) { - responseListener.onFailure(e); + @Override + public void onTimeout(TimeValue timeout) { + final ClusterState state = clusterService.state(); + final Index index; + try { + index = resolver.concreteSingleIndex(state, request); + listener.onFailure( + new UnavailableShardsException( + null, + "Primary shards were not active within timeout [timeout={}, shards={}, active={}]", + request.timeout(), + state.getMetadata().index(index).getNumberOfShards(), + state.routingTable().index(index).primaryShardsActive() + ) + ); + } catch (Exception e) { + listener.onFailure(e); } - }); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + }, state -> { + try { + var index = resolver.concreteSingleIndex(state, request); + return state.routingTable().index(index).allPrimaryShardsActive(); + } catch (Exception e) { + return false; + } + }, request.timeout()); } private static class CheckpointFetcher extends ActionRunnable { From 48174929d5cbc97c8af91e70c86e7228e52e67ac Mon Sep 17 00:00:00 2001 From: Ievgen Degtiarenko Date: Wed, 20 Jul 2022 13:05:59 +0200 Subject: [PATCH 2/6] Update docs/changelog/88641.yaml --- docs/changelog/88641.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/88641.yaml diff --git a/docs/changelog/88641.yaml b/docs/changelog/88641.yaml new file mode 100644 index 0000000000000..1d7a784cd41ce --- /dev/null +++ b/docs/changelog/88641.yaml @@ -0,0 +1,5 @@ +pr: 88641 +summary: Replace health request with a state observer +area: Allocation +type: bug +issues: [] From 3360044da258cefc929f25d6cdddd0a0d5f23c80 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 21 Jul 2022 09:04:19 +0200 Subject: [PATCH 3/6] Add a test case --- .../action/GetGlobalCheckpointsActionIT.java | 104 ++++++++++-------- 1 file changed, 57 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java b/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java index 4380029a331ff..478d0ca99d657 100644 --- a/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java +++ b/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -70,9 +71,7 @@ public void testGetGlobalCheckpoints() throws Exception { ); final GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).get(); long[] expected = new long[shards]; - for (int i = 0; i < shards; ++i) { - expected[i] = -1; - } + Arrays.fill(expected, -1); assertArrayEquals(expected, response.globalCheckpoints()); final int totalDocuments = shards * 3; @@ -149,7 +148,7 @@ public void testPollGlobalCheckpointAdvancement() throws Exception { } - public void testPollGlobalCheckpointAdvancementTimeout() throws Exception { + public void testPollGlobalCheckpointAdvancementTimeout() { String indexName = "test_index"; client().admin() .indices() @@ -182,7 +181,7 @@ public void testPollGlobalCheckpointAdvancementTimeout() throws Exception { assertEquals(29L, response.globalCheckpoints()[0]); } - public void testMustProvideCorrectNumberOfShards() throws Exception { + public void testMustProvideCorrectNumberOfShards() { String indexName = "test_index"; client().admin() .indices() @@ -214,7 +213,7 @@ public void testMustProvideCorrectNumberOfShards() throws Exception { ); } - public void testWaitForAdvanceOnlySupportsOneShard() throws Exception { + public void testWaitForAdvanceOnlySupportsOneShard() { String indexName = "test_index"; client().admin() .indices() @@ -305,42 +304,63 @@ public void testWaitOnIndexCreated() throws Exception { assertFalse(response.timedOut()); } - public void testPrimaryShardsNotReadyNoWait() throws Exception { - final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request( - "not-assigned", - false, - false, - EMPTY_ARRAY, - TEN_SECONDS - ); + /** + * Cluster remains yellow when initial primary is THROTTLED (and unavailable) during creation. + * This test verifies that implementation can handle this scenario. + */ + public void testWaitOnIndexCreatedWithThrottling() { + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 0).build() + ) + .get(); + client().admin() .indices() - .prepareCreate("not-assigned") + .prepareCreate("throttled-during-creation") .setWaitForActiveShards(ActiveShardCount.NONE) .setSettings( Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST) .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) - .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "none") ) .get(); - UnavailableShardsException exception = expectThrows( - UnavailableShardsException.class, - () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet() - ); - assertEquals("Primary shards were not active [shards=1, active=0]", exception.getMessage()); + try { + TimeValue timeout = TimeValue.timeValueMillis(between(10, 100)); + UnavailableShardsException exception = expectThrows( + UnavailableShardsException.class, + () -> client().execute( + GetGlobalCheckpointsAction.INSTANCE, + new GetGlobalCheckpointsAction.Request("throttled-during-creation", true, true, EMPTY_ARRAY, timeout) + ).actionGet() + ); + assertEquals( + "Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", + exception.getMessage() + ); + } finally { + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build() + ) + .get(); + } } - public void testWaitOnPrimaryShardsReadyTimeout() throws Exception { - TimeValue timeout = TimeValue.timeValueMillis(between(1, 100)); + public void testPrimaryShardsNotReadyNoWait() { final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request( "not-assigned", - true, - true, + false, + false, EMPTY_ARRAY, - timeout + TEN_SECONDS ); client().admin() .indices() @@ -359,21 +379,21 @@ public void testWaitOnPrimaryShardsReadyTimeout() throws Exception { UnavailableShardsException.class, () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet() ); - assertEquals("Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", exception.getMessage()); + assertEquals("Primary shards were not active [shards=1, active=0]", exception.getMessage()); } - public void testWaitOnPrimaryShardsReady() throws Exception { - String indexName = "not-assigned"; + public void testWaitOnPrimaryShardsReadyTimeout() { + TimeValue timeout = TimeValue.timeValueMillis(between(1, 100)); final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request( - indexName, + "not-assigned", true, true, EMPTY_ARRAY, - TEN_SECONDS + timeout ); client().admin() .indices() - .prepareCreate(indexName) + .prepareCreate("not-assigned") .setWaitForActiveShards(ActiveShardCount.NONE) .setSettings( Settings.builder() @@ -384,20 +404,10 @@ public void testWaitOnPrimaryShardsReady() throws Exception { ) .get(); - long start = System.nanoTime(); - ActionFuture future = client().execute(GetGlobalCheckpointsAction.INSTANCE, request); - Thread.sleep(randomIntBetween(10, 100)); - client().admin() - .indices() - .prepareUpdateSettings(indexName) - .setSettings(Settings.builder().put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "")) - .get(); - client().prepareIndex(indexName).setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get(); - - GetGlobalCheckpointsAction.Response response = future.actionGet(); - long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds(); - assertThat(elapsed, lessThanOrEqualTo(TEN_SECONDS.seconds())); - assertThat(response.globalCheckpoints()[0], equalTo(0L)); - assertFalse(response.timedOut()); + UnavailableShardsException exception = expectThrows( + UnavailableShardsException.class, + () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet() + ); + assertEquals("Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", exception.getMessage()); } } From b3b56384710ba8e70609842af2fb187b8c33f9af Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 21 Jul 2022 10:53:13 +0200 Subject: [PATCH 4/6] bring back the test --- .../action/GetGlobalCheckpointsActionIT.java | 39 +++++++++++++++++++ .../action/GetGlobalCheckpointsAction.java | 35 +++++++++++------ 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java b/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java index 478d0ca99d657..a1174fbdbc376 100644 --- a/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java +++ b/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java @@ -410,4 +410,43 @@ public void testWaitOnPrimaryShardsReadyTimeout() { ); assertEquals("Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", exception.getMessage()); } + + public void testWaitOnPrimaryShardsReady() throws Exception { + String indexName = "not-assigned"; + final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request( + indexName, + true, + true, + EMPTY_ARRAY, + TEN_SECONDS + ); + client().admin() + .indices() + .prepareCreate(indexName) + .setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings( + Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "none") + ) + .get(); + + long start = System.nanoTime(); + ActionFuture future = client().execute(GetGlobalCheckpointsAction.INSTANCE, request); + Thread.sleep(randomIntBetween(10, 100)); + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "")) + .get(); + client().prepareIndex(indexName).setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get(); + + GetGlobalCheckpointsAction.Response response = future.actionGet(); + long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds(); + assertThat(elapsed, lessThanOrEqualTo(TEN_SECONDS.seconds())); + assertThat(response.globalCheckpoints()[0], equalTo(0L)); + assertFalse(response.timedOut()); + } } diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java index c906175086e50..3a98339ef0918 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java @@ -221,13 +221,25 @@ public void onNewClusterState(ClusterState state) { var index = resolver.concreteSingleIndex(state, request); long elapsedNanos = System.nanoTime() - startNanos; long remainingNanos = request.timeout().nanos() - elapsedNanos; - new CheckpointFetcher( - client, - request, - listener, - state.getMetadata().index(index), - TimeValue.timeValueNanos(remainingNanos) - ).run(); + if (remainingNanos > 0) { + new CheckpointFetcher( + client, + request, + listener, + state.getMetadata().index(index), + TimeValue.timeValueNanos(remainingNanos) + ).run(); + } else { + listener.onFailure( + new UnavailableShardsException( + null, + "Primary shards were not active within timeout [timeout={}, shards={}, active={}]", + request.timeout(), + state.getMetadata().index(index).getNumberOfShards(), + state.routingTable().index(index).primaryShardsActive() + ) + ); + } } catch (Exception e) { listener.onFailure(e); } @@ -235,10 +247,9 @@ public void onNewClusterState(ClusterState state) { @Override public void onTimeout(TimeValue timeout) { - final ClusterState state = clusterService.state(); - final Index index; try { - index = resolver.concreteSingleIndex(state, request); + var state = clusterService.state(); + var index = resolver.concreteSingleIndex(state, request); listener.onFailure( new UnavailableShardsException( null, @@ -260,7 +271,9 @@ public void onClusterServiceClose() { }, state -> { try { var index = resolver.concreteSingleIndex(state, request); - return state.routingTable().index(index).allPrimaryShardsActive(); + boolean b = state.routingTable().index(index).allPrimaryShardsActive(); + logger.info("---> {}", b); + return b; } catch (Exception e) { return false; } From 56609fcb160ee92d031104a7cb729a5883edce89 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 21 Jul 2022 11:05:47 +0200 Subject: [PATCH 5/6] rework the test --- .../action/GetGlobalCheckpointsActionIT.java | 96 +++++++++---------- 1 file changed, 46 insertions(+), 50 deletions(-) diff --git a/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java b/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java index a1174fbdbc376..989d4d10dc386 100644 --- a/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java +++ b/x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java @@ -304,56 +304,6 @@ public void testWaitOnIndexCreated() throws Exception { assertFalse(response.timedOut()); } - /** - * Cluster remains yellow when initial primary is THROTTLED (and unavailable) during creation. - * This test verifies that implementation can handle this scenario. - */ - public void testWaitOnIndexCreatedWithThrottling() { - - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 0).build() - ) - .get(); - - client().admin() - .indices() - .prepareCreate("throttled-during-creation") - .setWaitForActiveShards(ActiveShardCount.NONE) - .setSettings( - Settings.builder() - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - ) - .get(); - - try { - TimeValue timeout = TimeValue.timeValueMillis(between(10, 100)); - UnavailableShardsException exception = expectThrows( - UnavailableShardsException.class, - () -> client().execute( - GetGlobalCheckpointsAction.INSTANCE, - new GetGlobalCheckpointsAction.Request("throttled-during-creation", true, true, EMPTY_ARRAY, timeout) - ).actionGet() - ); - assertEquals( - "Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", - exception.getMessage() - ); - } finally { - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build() - ) - .get(); - } - } - public void testPrimaryShardsNotReadyNoWait() { final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request( "not-assigned", @@ -449,4 +399,50 @@ public void testWaitOnPrimaryShardsReady() throws Exception { assertThat(response.globalCheckpoints()[0], equalTo(0L)); assertFalse(response.timedOut()); } + + public void testWaitOnPrimaryShardThrottled() throws Exception { + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 0).build() + ) + .get(); + + String indexName = "throttled"; + client().admin() + .indices() + .prepareCreate(indexName) + .setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings( + Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + ) + .get(); + + long start = System.nanoTime(); + var future = client().execute( + GetGlobalCheckpointsAction.INSTANCE, + new GetGlobalCheckpointsAction.Request(indexName, true, true, EMPTY_ARRAY, TEN_SECONDS) + ); + Thread.sleep(randomIntBetween(10, 100)); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build() + ) + .get(); + client().prepareIndex(indexName).setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get(); + + var response = future.actionGet(); + long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds(); + assertThat(elapsed, lessThanOrEqualTo(TEN_SECONDS.seconds())); + assertThat(response.globalCheckpoints()[0], equalTo(0L)); + assertFalse(response.timedOut()); + } } From f0441c2eb9cbe96626de0cd08c29eddba64ee391 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 21 Jul 2022 11:07:26 +0200 Subject: [PATCH 6/6] remove debug info --- .../xpack/fleet/action/GetGlobalCheckpointsAction.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java index 3a98339ef0918..b7856daa8d842 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java @@ -271,9 +271,7 @@ public void onClusterServiceClose() { }, state -> { try { var index = resolver.concreteSingleIndex(state, request); - boolean b = state.routingTable().index(index).allPrimaryShardsActive(); - logger.info("---> {}", b); - return b; + return state.routingTable().index(index).allPrimaryShardsActive(); } catch (Exception e) { return false; }