Skip to content

Commit 46b508d

Browse files
authored
Add wait_for_no_initializing_shards to cluster health API (#27489)
This adds a new option to the cluster health request allowing to wait until there is no initializing shards. Closes #25623
1 parent 93a988c commit 46b508d

File tree

9 files changed

+252
-17
lines changed

9 files changed

+252
-17
lines changed

core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.cluster.health;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionRequestValidationException;
2324
import org.elasticsearch.action.IndicesRequest;
2425
import org.elasticsearch.action.support.ActiveShardCount;
@@ -40,6 +41,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
4041
private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS);
4142
private ClusterHealthStatus waitForStatus;
4243
private boolean waitForNoRelocatingShards = false;
44+
private boolean waitForNoInitializingShards = false;
4345
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
4446
private String waitForNodes = "";
4547
private Priority waitForEvents = null;
@@ -72,6 +74,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
7274
if (in.readBoolean()) {
7375
waitForEvents = Priority.readFrom(in);
7476
}
77+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
78+
waitForNoInitializingShards = in.readBoolean();
79+
}
7580
}
7681

7782
@Override
@@ -101,6 +106,9 @@ public void writeTo(StreamOutput out) throws IOException {
101106
out.writeBoolean(true);
102107
Priority.writeTo(waitForEvents, out);
103108
}
109+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
110+
out.writeBoolean(waitForNoInitializingShards);
111+
}
104112
}
105113

106114
@Override
@@ -167,6 +175,21 @@ public ClusterHealthRequest waitForNoRelocatingShards(boolean waitForNoRelocatin
167175
return this;
168176
}
169177

178+
public boolean waitForNoInitializingShards() {
179+
return waitForNoInitializingShards;
180+
}
181+
182+
/**
183+
* Sets whether the request should wait for there to be no initializing shards before
184+
* retrieving the cluster health status. Defaults to {@code false}, meaning the
185+
* operation does not wait on there being no more initializing shards. Set to <code>true</code>
186+
* to wait until the number of initializing shards in the cluster is 0.
187+
*/
188+
public ClusterHealthRequest waitForNoInitializingShards(boolean waitForNoInitializingShards) {
189+
this.waitForNoInitializingShards = waitForNoInitializingShards;
190+
return this;
191+
}
192+
170193
public ActiveShardCount waitForActiveShards() {
171194
return waitForActiveShards;
172195
}

core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ public ClusterHealthRequestBuilder setWaitForNoRelocatingShards(boolean waitForR
7373
return this;
7474
}
7575

76+
/**
77+
* Sets whether the request should wait for there to be no initializing shards before
78+
* retrieving the cluster health status. Defaults to <code>false</code>, meaning the
79+
* operation does not wait on there being no more initializing shards. Set to <code>true</code>
80+
* to wait until the number of initializing shards in the cluster is 0.
81+
*/
82+
public ClusterHealthRequestBuilder setWaitForNoInitializingShards(boolean waitForNoInitializingShards) {
83+
request.waitForNoInitializingShards(waitForNoInitializingShards);
84+
return this;
85+
}
86+
7687
/**
7788
* Sets the number of shard copies that must be active before getting the health status.
7889
* Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.

core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -142,24 +142,26 @@ public void onFailure(String source, Exception e) {
142142
}
143143

144144
private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
145-
int waitFor = 5;
146-
if (request.waitForStatus() == null) {
147-
waitFor--;
145+
int waitFor = 0;
146+
if (request.waitForStatus() != null) {
147+
waitFor++;
148148
}
149-
if (request.waitForNoRelocatingShards() == false) {
150-
waitFor--;
149+
if (request.waitForNoRelocatingShards()) {
150+
waitFor++;
151151
}
152-
if (request.waitForActiveShards().equals(ActiveShardCount.NONE)) {
153-
waitFor--;
152+
if (request.waitForNoInitializingShards()) {
153+
waitFor++;
154154
}
155-
if (request.waitForNodes().isEmpty()) {
156-
waitFor--;
155+
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
156+
waitFor++;
157+
}
158+
if (request.waitForNodes().isEmpty() == false) {
159+
waitFor++;
157160
}
158-
if (request.indices() == null || request.indices().length == 0) { // check that they actually exists in the meta data
159-
waitFor--;
161+
if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data
162+
waitFor++;
160163
}
161164

162-
assert waitFor >= 0;
163165
final ClusterState state = clusterService.state();
164166
final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
165167
if (request.timeout().millis() == 0) {
@@ -196,13 +198,15 @@ public void onTimeout(TimeValue timeout) {
196198
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
197199
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
198200
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
199-
return prepareResponse(request, response, clusterState, waitFor);
201+
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
202+
return readyCounter == waitFor;
200203
}
201204

202205
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
203206
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
204207
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
205-
boolean valid = prepareResponse(request, response, clusterState, waitFor);
208+
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
209+
boolean valid = (readyCounter == waitFor);
206210
assert valid || timedOut;
207211
// we check for a timeout here since this method might be called from the wait_for_events
208212
// response handler which might have timed out already.
@@ -213,14 +217,18 @@ private ClusterHealthResponse getResponse(final ClusterHealthRequest request, Cl
213217
return response;
214218
}
215219

216-
private boolean prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, ClusterState clusterState, final int waitFor) {
220+
static int prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response,
221+
final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
217222
int waitForCounter = 0;
218223
if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
219224
waitForCounter++;
220225
}
221226
if (request.waitForNoRelocatingShards() && response.getRelocatingShards() == 0) {
222227
waitForCounter++;
223228
}
229+
if (request.waitForNoInitializingShards() && response.getInitializingShards() == 0) {
230+
waitForCounter++;
231+
}
224232
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
225233
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
226234
assert waitForActiveShards.equals(ActiveShardCount.DEFAULT) == false :
@@ -292,7 +300,7 @@ private boolean prepareResponse(final ClusterHealthRequest request, final Cluste
292300
}
293301
}
294302
}
295-
return waitForCounter == waitFor;
303+
return waitForCounter;
296304
}
297305

298306

core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterHealthAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
6262
clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
6363
}
6464
clusterHealthRequest.waitForNoRelocatingShards(
65-
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
65+
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
66+
clusterHealthRequest.waitForNoInitializingShards(
67+
request.paramAsBoolean("wait_for_no_initializing_shards", clusterHealthRequest.waitForNoRelocatingShards()));
6668
if (request.hasParam("wait_for_relocating_shards")) {
6769
// wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
6870
throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.cluster.health;
21+
22+
import org.elasticsearch.cluster.health.ClusterHealthStatus;
23+
import org.elasticsearch.common.Priority;
24+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
25+
import org.elasticsearch.common.io.stream.StreamInput;
26+
import org.elasticsearch.test.ESTestCase;
27+
28+
import static org.hamcrest.core.IsEqual.equalTo;
29+
30+
public class ClusterHealthRequestTests extends ESTestCase {
31+
public void testSerialize() throws Exception {
32+
final ClusterHealthRequest originalRequest = randomRequest();
33+
final ClusterHealthRequest cloneRequest;
34+
try (BytesStreamOutput out = new BytesStreamOutput()) {
35+
originalRequest.writeTo(out);
36+
try (StreamInput in = out.bytes().streamInput()) {
37+
cloneRequest = new ClusterHealthRequest(in);
38+
}
39+
}
40+
assertThat(cloneRequest.waitForStatus(), equalTo(originalRequest.waitForStatus()));
41+
assertThat(cloneRequest.waitForNodes(), equalTo(originalRequest.waitForNodes()));
42+
assertThat(cloneRequest.waitForNoInitializingShards(), equalTo(originalRequest.waitForNoInitializingShards()));
43+
assertThat(cloneRequest.waitForNoRelocatingShards(), equalTo(originalRequest.waitForNoRelocatingShards()));
44+
assertThat(cloneRequest.waitForActiveShards(), equalTo(originalRequest.waitForActiveShards()));
45+
assertThat(cloneRequest.waitForEvents(), equalTo(originalRequest.waitForEvents()));
46+
}
47+
48+
ClusterHealthRequest randomRequest() {
49+
ClusterHealthRequest request = new ClusterHealthRequest();
50+
request.waitForStatus(randomFrom(ClusterHealthStatus.values()));
51+
request.waitForNodes(randomFrom("", "<", "<=", ">", ">=") + between(0, 1000));
52+
request.waitForNoInitializingShards(randomBoolean());
53+
request.waitForNoRelocatingShards(randomBoolean());
54+
request.waitForActiveShards(randomIntBetween(0, 10));
55+
request.waitForEvents(randomFrom(Priority.values()));
56+
return request;
57+
}
58+
59+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.cluster.health;
21+
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.ClusterName;
24+
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.metadata.IndexMetaData;
26+
import org.elasticsearch.cluster.metadata.MetaData;
27+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
28+
import org.elasticsearch.cluster.routing.RoutingTable;
29+
import org.elasticsearch.cluster.routing.ShardRoutingState;
30+
import org.elasticsearch.cluster.routing.TestShardRouting;
31+
import org.elasticsearch.common.Randomness;
32+
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.index.Index;
34+
import org.elasticsearch.index.shard.ShardId;
35+
import org.elasticsearch.test.ESTestCase;
36+
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.stream.IntStream;
40+
41+
import static org.hamcrest.core.IsEqual.equalTo;
42+
43+
public class TransportClusterHealthActionTests extends ESTestCase {
44+
45+
public void testWaitForInitializingShards() throws Exception {
46+
final String[] indices = {"test"};
47+
final ClusterHealthRequest request = new ClusterHealthRequest();
48+
request.waitForNoInitializingShards(true);
49+
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
50+
ClusterHealthResponse response = new ClusterHealthResponse("", indices, clusterState);
51+
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));
52+
53+
request.waitForNoInitializingShards(true);
54+
clusterState = randomClusterStateWithInitializingShards("test", between(1, 10));
55+
response = new ClusterHealthResponse("", indices, clusterState);
56+
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
57+
58+
request.waitForNoInitializingShards(false);
59+
clusterState = randomClusterStateWithInitializingShards("test", randomInt(20));
60+
response = new ClusterHealthResponse("", indices, clusterState);
61+
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
62+
}
63+
64+
ClusterState randomClusterStateWithInitializingShards(String index, final int initializingShards) {
65+
final IndexMetaData indexMetaData = IndexMetaData
66+
.builder(index)
67+
.settings(settings(Version.CURRENT))
68+
.numberOfShards(between(1, 10))
69+
.numberOfReplicas(randomInt(20))
70+
.build();
71+
72+
final List<ShardRoutingState> shardRoutingStates = new ArrayList<>();
73+
IntStream.range(0, between(1, 30)).forEach(i -> shardRoutingStates.add(randomFrom(
74+
ShardRoutingState.STARTED, ShardRoutingState.UNASSIGNED, ShardRoutingState.RELOCATING)));
75+
IntStream.range(0, initializingShards).forEach(i -> shardRoutingStates.add(ShardRoutingState.INITIALIZING));
76+
Randomness.shuffle(shardRoutingStates);
77+
78+
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
79+
final IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(indexMetaData.getIndex());
80+
81+
// Primary
82+
{
83+
ShardRoutingState state = shardRoutingStates.remove(0);
84+
String node = state == ShardRoutingState.UNASSIGNED ? null : "node";
85+
routingTable.addShard(
86+
TestShardRouting.newShardRouting(shardId, node, "relocating", true, state)
87+
);
88+
}
89+
90+
// Replicas
91+
for (int i = 0; i < shardRoutingStates.size(); i++) {
92+
ShardRoutingState state = shardRoutingStates.get(i);
93+
String node = state == ShardRoutingState.UNASSIGNED ? null : "node" + i;
94+
routingTable.addShard(TestShardRouting.newShardRouting(shardId, node, "relocating"+i, randomBoolean(), state));
95+
}
96+
97+
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
98+
.metaData(MetaData.builder().put(indexMetaData, true))
99+
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
100+
.build();
101+
}
102+
}

docs/reference/cluster/health.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ The cluster health API accepts the following request parameters:
8989
for the cluster to have no shard relocations. Defaults to false, which means
9090
it will not wait for relocating shards.
9191

92+
`wait_for_no_initializing_shards`::
93+
A boolean value which controls whether to wait (until the timeout provided)
94+
for the cluster to have no shard initializations. Defaults to false, which means
95+
it will not wait for initializing shards.
96+
9297
`wait_for_active_shards`::
9398
A number controlling to how many active shards to wait for, `all` to wait
9499
for all shards in the cluster to be active, or `0` to not wait. Defaults to `0`.

rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
"type" : "boolean",
4848
"description" : "Whether to wait until there are no relocating shards in the cluster"
4949
},
50+
"wait_for_no_initializing_shards": {
51+
"type" : "boolean",
52+
"description" : "Whether to wait until there are no initializing shards in the cluster"
53+
},
5054
"wait_for_status": {
5155
"type" : "enum",
5256
"options" : ["green","yellow","red"],

rest-api-spec/src/main/resources/rest-api-spec/test/cluster.health/10_basic.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,27 @@
9292
- match: { unassigned_shards: 0 }
9393
- gte: { number_of_pending_tasks: 0 }
9494

95+
---
96+
"cluster health basic test, one index with wait for no initializing shards":
97+
- skip:
98+
version: " - 6.99.99"
99+
reason: "wait_for_no_initializing_shards is introduced in 7.0.0"
100+
101+
- do:
102+
indices.create:
103+
index: test_index
104+
wait_for_active_shards: 0
105+
body:
106+
settings:
107+
index:
108+
number_of_replicas: 0
109+
110+
- do:
111+
cluster.health:
112+
wait_for_no_initializing_shards: true
113+
114+
- match: { initializing_shards: 0 }
115+
95116
---
96117
"cluster health levels":
97118
- do:

0 commit comments

Comments
 (0)