docs/changelog/88641.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
+pr: 88641
+summary: Replace health request with a state observer
+area: Allocation
+type: bug
+issues: []
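
To orient the two diffs that follow (an integration test, then the transport action of GetGlobalCheckpointsAction): the action used to wait for the index by firing a local cluster-health request (wait-for-yellow, no initializing shards) and then re-checking the routing table; it now registers a ClusterStateObserver whose predicate holds until every primary shard of the target index is active. Below is a minimal sketch of that observer pattern, using the same Elasticsearch APIs as the diff further down; the wrapper class, its fields, and the method name are illustrative assumptions, not code from this PR.

```java
// Sketch only: the PR's real code lives in GetGlobalCheckpointsAction.TransportAction.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;

class PrimaryShardWaiter { // illustrative name, not from the PR
    private static final Logger logger = LogManager.getLogger(PrimaryShardWaiter.class);
    private final ClusterService clusterService;
    private final ThreadPool threadPool;

    PrimaryShardWaiter(ClusterService clusterService, ThreadPool threadPool) {
        this.clusterService = clusterService;
        this.threadPool = threadPool;
    }

    void waitForActivePrimaries(ClusterState initial, Index index, TimeValue timeout, ActionListener<ClusterState> listener) {
        var observer = new ClusterStateObserver(initial, clusterService, timeout, logger, threadPool.getThreadContext());
        observer.waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
                // Invoked only with a state that satisfied the predicate below.
                listener.onResponse(state);
            }

            @Override
            public void onTimeout(TimeValue elapsed) {
                listener.onFailure(new IllegalStateException("primaries not active within " + elapsed));
            }

            @Override
            public void onClusterServiceClose() {
                listener.onFailure(new NodeClosedException(clusterService.localNode()));
            }
        }, state -> {
            var routing = state.routingTable().index(index);
            return routing != null && routing.allPrimaryShardsActive();
        }, timeout);
    }
}
```

Because the predicate is evaluated against each published cluster state and onNewClusterState receives the state that matched, the action no longer acts on a view that can lag behind the health response, which appears to be the gap this bug fix closes.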
@@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

+import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -70,9 +71,7 @@ public void testGetGlobalCheckpoints() throws Exception {
);
final GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).get();
long[] expected = new long[shards];
-for (int i = 0; i < shards; ++i) {
-    expected[i] = -1;
-}
+Arrays.fill(expected, -1);
assertArrayEquals(expected, response.globalCheckpoints());

final int totalDocuments = shards * 3;
@@ -149,7 +148,7 @@ public void testPollGlobalCheckpointAdvancement() throws Exception {

}

-public void testPollGlobalCheckpointAdvancementTimeout() throws Exception {
+public void testPollGlobalCheckpointAdvancementTimeout() {
String indexName = "test_index";
client().admin()
.indices()
@@ -182,7 +181,7 @@ public void testPollGlobalCheckpointAdvancementTimeout() throws Exception {
assertEquals(29L, response.globalCheckpoints()[0]);
}

-public void testMustProvideCorrectNumberOfShards() throws Exception {
+public void testMustProvideCorrectNumberOfShards() {
String indexName = "test_index";
client().admin()
.indices()
@@ -214,7 +213,7 @@ public void testMustProvideCorrectNumberOfShards() throws Exception {
);
}

-public void testWaitForAdvanceOnlySupportsOneShard() throws Exception {
+public void testWaitForAdvanceOnlySupportsOneShard() {
String indexName = "test_index";
client().admin()
.indices()
@@ -305,7 +304,7 @@ public void testWaitOnIndexCreated() throws Exception {
assertFalse(response.timedOut());
}

-public void testPrimaryShardsNotReadyNoWait() throws Exception {
+public void testPrimaryShardsNotReadyNoWait() {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
"not-assigned",
false,
@@ -333,7 +332,7 @@ public void testPrimaryShardsNotReadyNoWait() throws Exception {
assertEquals("Primary shards were not active [shards=1, active=0]", exception.getMessage());
}

-public void testWaitOnPrimaryShardsReadyTimeout() throws Exception {
+public void testWaitOnPrimaryShardsReadyTimeout() {
TimeValue timeout = TimeValue.timeValueMillis(between(1, 100));
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
"not-assigned",
@@ -400,4 +399,50 @@ public void testWaitOnPrimaryShardsReady() throws Exception {
assertThat(response.globalCheckpoints()[0], equalTo(0L));
assertFalse(response.timedOut());
}

+public void testWaitOnPrimaryShardThrottled() throws Exception {
+
+    client().admin()
+        .cluster()
+        .prepareUpdateSettings()
+        .setPersistentSettings(
+            Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 0).build()
+        )
+        .get();
+
+    String indexName = "throttled";
+    client().admin()
+        .indices()
+        .prepareCreate(indexName)
+        .setWaitForActiveShards(ActiveShardCount.NONE)
+        .setSettings(
+            Settings.builder()
+                .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+                .put("index.number_of_shards", 1)
+                .put("index.number_of_replicas", 0)
+        )
+        .get();
+
+    long start = System.nanoTime();
+    var future = client().execute(
+        GetGlobalCheckpointsAction.INSTANCE,
+        new GetGlobalCheckpointsAction.Request(indexName, true, true, EMPTY_ARRAY, TEN_SECONDS)
+    );
+    Thread.sleep(randomIntBetween(10, 100));
+
+    client().admin()
+        .cluster()
+        .prepareUpdateSettings()
+        .setPersistentSettings(
+            Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build()
+        )
+        .get();
+    client().prepareIndex(indexName).setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get();
+
+    var response = future.actionGet();
+    long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds();
+    assertThat(elapsed, lessThanOrEqualTo(TEN_SECONDS.seconds()));
+    assertThat(response.globalCheckpoints()[0], equalTo(0L));
+    assertFalse(response.timedOut());
+}
}
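
The new test above exercises exactly the scenario this PR fixes: block primary allocation, issue a request with both wait flags set, unblock allocation, and expect the response within the original timeout. Condensed from the test, the blocking knob looks like this (same setting and builders as the test; a sketch, not additional PR code):

```java
// Block primary allocation cluster-wide: with 0 concurrent initial primary
// recoveries allowed per node, a newly created index's primary stays unassigned.
client().admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(
        Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 0).build()
    )
    .get();

// ... create the index with ActiveShardCount.NONE and fire the checkpoints request ...

// Release the throttle: clearing the setting restores the default and lets the
// primary start, which is the condition the observer's predicate is waiting for.
client().admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(
        Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build()
    )
    .get();
```

Creating the index with setWaitForActiveShards(ActiveShardCount.NONE) is what lets the test proceed while the primary is still unassigned.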
@@ -16,11 +16,11 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -35,8 +35,10 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
@@ -156,19 +158,22 @@ public static class TransportAction extends org.elasticsearch.action.support.TransportAction
private final ClusterService clusterService;
private final NodeClient client;
private final IndexNameExpressionResolver resolver;
+private final ThreadPool threadPool;

@Inject
public TransportAction(
final ActionFilters actionFilters,
final TransportService transportService,
final ClusterService clusterService,
final NodeClient client,
-final IndexNameExpressionResolver resolver
+final IndexNameExpressionResolver resolver,
+final ThreadPool threadPool
) {
super(NAME, actionFilters, transportService.getTaskManager());
this.clusterService = clusterService;
this.client = client;
this.resolver = resolver;
+this.threadPool = threadPool;
}

@Override
@@ -180,7 +185,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
index = resolver.concreteSingleIndex(state, request);
} catch (IndexNotFoundException e) {
if (request.waitForIndex()) {
-handleIndexNotReady(request, listener);
+handleIndexNotReady(state, request, listener);
} else {
listener.onFailure(e);
}
@@ -194,7 +199,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
new CheckpointFetcher(client, request, listener, indexMetadata, request.timeout()).run();
} else {
if (request.waitForIndex()) {
-handleIndexNotReady(request, listener);
+handleIndexNotReady(state, request, listener);
} else {
int active = routingTable.primaryShardsActive();
int total = indexMetadata.getNumberOfShards();
@@ -205,60 +210,72 @@ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
}
}

-private void handleIndexNotReady(final Request request, final ActionListener<Response> responseListener) {
+private void handleIndexNotReady(ClusterState initialState, Request request, ActionListener<Response> listener) {
     long startNanos = System.nanoTime();
-    client.admin()
-        .cluster()
-        .prepareHealth(request.index)
-        .setLocal(true)
-        .setTimeout(request.timeout())
-        .setWaitForYellowStatus()
-        .setWaitForNoInitializingShards(true)
-        .execute(new ActionListener<>() {
-            @Override
-            public void onResponse(ClusterHealthResponse healthResponse) {
-                final long elapsedNanos = System.nanoTime() - startNanos;
-                final ClusterState state = clusterService.state();
-                final Index index;
-                try {
-                    index = resolver.concreteSingleIndex(state, request);
-                } catch (Exception e) {
-                    responseListener.onFailure(e);
-                    return;
-                }
-
-                final IndexMetadata indexMetadata = state.getMetadata().index(index);
-                final IndexRoutingTable routingTable = state.routingTable().index(index);
-
-                long remainingNanos = request.timeout().nanos() - elapsedNanos;
-                if (routingTable.allPrimaryShardsActive() && remainingNanos > 0) {
-                    new CheckpointFetcher(
-                        client,
-                        request,
-                        responseListener,
-                        indexMetadata,
-                        TimeValue.timeValueNanos(remainingNanos)
-                    ).run();
-                } else {
-                    int active = routingTable.primaryShardsActive();
-                    int total = indexMetadata.getNumberOfShards();
-                    responseListener.onFailure(
-                        new UnavailableShardsException(
-                            null,
-                            "Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
-                            request.timeout(),
-                            total,
-                            active
-                        )
-                    );
-                }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                responseListener.onFailure(e);
-            }
-        });
+    var observer = new ClusterStateObserver(initialState, clusterService, request.timeout(), logger, threadPool.getThreadContext());
+
+    observer.waitForNextChange(new ClusterStateObserver.Listener() {
+        @Override
+        public void onNewClusterState(ClusterState state) {
+            try {
+                var index = resolver.concreteSingleIndex(state, request);
+                long elapsedNanos = System.nanoTime() - startNanos;
+                long remainingNanos = request.timeout().nanos() - elapsedNanos;
+                if (remainingNanos > 0) {
+                    new CheckpointFetcher(
+                        client,
+                        request,
+                        listener,
+                        state.getMetadata().index(index),
+                        TimeValue.timeValueNanos(remainingNanos)
+                    ).run();
+                } else {
+                    listener.onFailure(
+                        new UnavailableShardsException(
+                            null,
+                            "Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
+                            request.timeout(),
+                            state.getMetadata().index(index).getNumberOfShards(),
+                            state.routingTable().index(index).primaryShardsActive()
+                        )
+                    );
+                }
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
+        }
+
+        @Override
+        public void onTimeout(TimeValue timeout) {
+            try {
+                var state = clusterService.state();
+                var index = resolver.concreteSingleIndex(state, request);
+                listener.onFailure(
+                    new UnavailableShardsException(
+                        null,
+                        "Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
+                        request.timeout(),
+                        state.getMetadata().index(index).getNumberOfShards(),
+                        state.routingTable().index(index).primaryShardsActive()
+                    )
+                );
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
+        }
+
+        @Override
+        public void onClusterServiceClose() {
+            listener.onFailure(new NodeClosedException(clusterService.localNode()));
+        }
+    }, state -> {
+        try {
+            var index = resolver.concreteSingleIndex(state, request);
+            return state.routingTable().index(index).allPrimaryShardsActive();
+        } catch (Exception e) {
+            return false;
+        }
+    }, request.timeout());
 }

private static class CheckpointFetcher extends ActionRunnable<Response> {
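
One detail worth noting in the new listener and predicate above: the index is re-resolved from each observed cluster state, and resolution failures inside the predicate are treated as "condition not met" rather than errors. A sketch of that predicate shape, with resolver and request standing in for the fields of the transport action above:

```java
// Sketch: a cluster-state predicate that keeps waiting while the index cannot
// be resolved yet (e.g. it is still being created), and matches only once all
// of its primaries are active. Types come from the surrounding file.
Predicate<ClusterState> primariesActive = state -> {
    try {
        Index index = resolver.concreteSingleIndex(state, request);
        IndexRoutingTable routing = state.routingTable().index(index);
        return routing != null && routing.allPrimaryShardsActive();
    } catch (Exception e) {
        return false; // IndexNotFoundException and friends: not ready yet
    }
};
```

This is what lets a single wait path serve both the "index does not exist yet" and the "primaries not yet assigned" cases that doExecute routes into handleIndexNotReady.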