Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88641.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88641
summary: Replace health request with a state observer
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand All @@ -35,8 +35,10 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -156,19 +158,22 @@ public static class TransportAction extends org.elasticsearch.action.support.Tra
private final ClusterService clusterService;
private final NodeClient client;
private final IndexNameExpressionResolver resolver;
private final ThreadPool threadPool;

@Inject
public TransportAction(
final ActionFilters actionFilters,
final TransportService transportService,
final ClusterService clusterService,
final NodeClient client,
final IndexNameExpressionResolver resolver
final IndexNameExpressionResolver resolver,
final ThreadPool threadPool
) {
super(NAME, actionFilters, transportService.getTaskManager());
this.clusterService = clusterService;
this.client = client;
this.resolver = resolver;
this.threadPool = threadPool;
}

@Override
Expand All @@ -180,7 +185,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
index = resolver.concreteSingleIndex(state, request);
} catch (IndexNotFoundException e) {
if (request.waitForIndex()) {
handleIndexNotReady(request, listener);
handleIndexNotReady(state, request, listener);
} else {
listener.onFailure(e);
}
Expand All @@ -194,7 +199,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new CheckpointFetcher(client, request, listener, indexMetadata, request.timeout()).run();
} else {
if (request.waitForIndex()) {
handleIndexNotReady(request, listener);
handleIndexNotReady(state, request, listener);
} else {
int active = routingTable.primaryShardsActive();
int total = indexMetadata.getNumberOfShards();
Expand All @@ -205,60 +210,61 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
}
}

private void handleIndexNotReady(final Request request, final ActionListener<Response> responseListener) {
private void handleIndexNotReady(ClusterState initialState, Request request, ActionListener<Response> listener) {
long startNanos = System.nanoTime();
client.admin()
.cluster()
.prepareHealth(request.index)
.setLocal(true)
.setTimeout(request.timeout())
.setWaitForYellowStatus()
.setWaitForNoInitializingShards(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(ClusterHealthResponse healthResponse) {
final long elapsedNanos = System.nanoTime() - startNanos;
final ClusterState state = clusterService.state();
final Index index;
try {
index = resolver.concreteSingleIndex(state, request);
} catch (Exception e) {
responseListener.onFailure(e);
return;
}

final IndexMetadata indexMetadata = state.getMetadata().index(index);
final IndexRoutingTable routingTable = state.routingTable().index(index);

var observer = new ClusterStateObserver(initialState, clusterService, request.timeout(), logger, threadPool.getThreadContext());

observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
var index = resolver.concreteSingleIndex(state, request);
long elapsedNanos = System.nanoTime() - startNanos;
long remainingNanos = request.timeout().nanos() - elapsedNanos;
if (routingTable.allPrimaryShardsActive() && remainingNanos > 0) {
new CheckpointFetcher(
client,
request,
responseListener,
indexMetadata,
TimeValue.timeValueNanos(remainingNanos)
).run();
} else {
int active = routingTable.primaryShardsActive();
int total = indexMetadata.getNumberOfShards();
responseListener.onFailure(
new UnavailableShardsException(
null,
"Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
request.timeout(),
total,
active
)
);
}
new CheckpointFetcher(
client,
request,
listener,
state.getMetadata().index(index),
TimeValue.timeValueNanos(remainingNanos)
).run();
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
responseListener.onFailure(e);
@Override
public void onTimeout(TimeValue timeout) {
final ClusterState state = clusterService.state();
final Index index;
try {
index = resolver.concreteSingleIndex(state, request);
listener.onFailure(
new UnavailableShardsException(
null,
"Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
request.timeout(),
state.getMetadata().index(index).getNumberOfShards(),
state.routingTable().index(index).primaryShardsActive()
)
);
} catch (Exception e) {
listener.onFailure(e);
}
});
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
}, state -> {
try {
var index = resolver.concreteSingleIndex(state, request);
return state.routingTable().index(index).allPrimaryShardsActive();
} catch (Exception e) {
return false;
}
}, request.timeout());
}

private static class CheckpointFetcher extends ActionRunnable<Response> {
Expand Down