5 changes: 5 additions & 0 deletions docs/changelog/88641.yaml
@@ -0,0 +1,5 @@
pr: 88641
summary: Replace health request with a state observer
area: Allocation
type: bug
issues: []
@@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -70,9 +71,7 @@ public void testGetGlobalCheckpoints() throws Exception {
);
final GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).get();
long[] expected = new long[shards];
for (int i = 0; i < shards; ++i) {
expected[i] = -1;
}
Arrays.fill(expected, -1);
assertArrayEquals(expected, response.globalCheckpoints());

final int totalDocuments = shards * 3;
@@ -149,7 +148,7 @@ public void testPollGlobalCheckpointAdvancement() throws Exception {

}

public void testPollGlobalCheckpointAdvancementTimeout() throws Exception {
public void testPollGlobalCheckpointAdvancementTimeout() {
String indexName = "test_index";
client().admin()
.indices()
@@ -182,7 +181,7 @@ public void testPollGlobalCheckpointAdvancementTimeout() throws Exception {
assertEquals(29L, response.globalCheckpoints()[0]);
}

public void testMustProvideCorrectNumberOfShards() throws Exception {
public void testMustProvideCorrectNumberOfShards() {
String indexName = "test_index";
client().admin()
.indices()
@@ -214,7 +213,7 @@ public void testMustProvideCorrectNumberOfShards() throws Exception {
);
}

public void testWaitForAdvanceOnlySupportsOneShard() throws Exception {
public void testWaitForAdvanceOnlySupportsOneShard() {
String indexName = "test_index";
client().admin()
.indices()
@@ -305,42 +304,63 @@ public void testWaitOnIndexCreated() throws Exception {
assertFalse(response.timedOut());
}

public void testPrimaryShardsNotReadyNoWait() throws Exception {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
"not-assigned",
false,
false,
EMPTY_ARRAY,
TEN_SECONDS
);
/**
* Cluster remains yellow when initial primary is THROTTLED (and unavailable) during creation.
* This test verifies that the implementation can handle this scenario.
*/
public void testWaitOnIndexCreatedWithThrottling() {

client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 0).build()
Contributor
Hmm it seems like a bug that we even permit 0 here. I don't see a good reason to do this in production and it would be pretty harmful to do this accidentally. Ok for now, but if we fixed this bug we'd need to find some other way to delay allocation.
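For illustration, a minimal sketch of what disallowing 0 could look like, assuming the setting is declared via Setting.intSetting (the default value and properties below are illustrative, not the actual declaration):

public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING = Setting.intSetting(
    "cluster.routing.allocation.node_initial_primaries_recoveries",
    4,  // illustrative default
    1,  // hypothetical minimum: rejecting 0 would prevent initial primary recoveries from being disabled accidentally
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);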

)
.get();

client().admin()
.indices()
.prepareCreate("not-assigned")
.prepareCreate("throttled-during-creation")
.setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(
Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", "none")
)
.get();

UnavailableShardsException exception = expectThrows(
UnavailableShardsException.class,
() -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
);
assertEquals("Primary shards were not active [shards=1, active=0]", exception.getMessage());
try {
TimeValue timeout = TimeValue.timeValueMillis(between(10, 100));
UnavailableShardsException exception = expectThrows(
UnavailableShardsException.class,
() -> client().execute(
GetGlobalCheckpointsAction.INSTANCE,
new GetGlobalCheckpointsAction.Request("throttled-during-creation", true, true, EMPTY_ARRAY, timeout)
).actionGet()
);
assertEquals(
"Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]",
exception.getMessage()
);
} finally {
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build()
)
.get();
}
Contributor Author
The inline diff is totally confusing; consider reviewing in split mode.

In short, this test:

  • sets cluster.routing.allocation.node_initial_primaries_recoveries=0 so that every created index is throttled
  • creates an index
  • executes GetGlobalCheckpointsAction with a timeout
  • asserts that it complains about unavailable shards rather than anything else
  • reverts the setting to its default value

Contributor Author
I think I could also rewrite it to revert the setting after executing GetGlobalCheckpointsAction and assert that it gets a result back, but I am not sure it would make the test better.
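Roughly, that variant could look like the following sketch (reusing constants already defined in this test class, such as TEN_SECONDS and EMPTY_ARRAY; the final assertions are just one example of what could be checked):

// Start the action while initial primary recoveries are still throttled.
ActionFuture<GetGlobalCheckpointsAction.Response> future = client().execute(
    GetGlobalCheckpointsAction.INSTANCE,
    new GetGlobalCheckpointsAction.Request("throttled-during-creation", true, true, EMPTY_ARRAY, TEN_SECONDS)
);

// Revert the throttling setting so the primary can be allocated and become active.
client().admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(
        Settings.builder().putNull(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey()).build()
    )
    .get();

// The action should now complete successfully instead of timing out.
GetGlobalCheckpointsAction.Response response = future.actionGet();
assertFalse(response.timedOut());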

Contributor
Yes, I think we want that. As it stands, the test suite still passes on master today (i.e. with the production-code changes in this PR reverted). I think we have to check the success case here to see the value in this change.

}

public void testWaitOnPrimaryShardsReadyTimeout() throws Exception {
TimeValue timeout = TimeValue.timeValueMillis(between(1, 100));
public void testPrimaryShardsNotReadyNoWait() {
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
"not-assigned",
true,
true,
false,
false,
EMPTY_ARRAY,
timeout
TEN_SECONDS
);
client().admin()
.indices()
@@ -359,21 +379,21 @@ public void testWaitOnPrimaryShardsReadyTimeout() throws Exception {
UnavailableShardsException.class,
() -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
);
assertEquals("Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", exception.getMessage());
assertEquals("Primary shards were not active [shards=1, active=0]", exception.getMessage());
}

public void testWaitOnPrimaryShardsReady() throws Exception {
Contributor
I think we don't want to remove this test. It's still valid, isn't it?

I was expecting us to strengthen this test to verify that creating the index concurrently with running the action still works. It turns out we already have that test, so we can leave this one as-is IMO.

String indexName = "not-assigned";
public void testWaitOnPrimaryShardsReadyTimeout() {
TimeValue timeout = TimeValue.timeValueMillis(between(1, 100));
final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
indexName,
"not-assigned",
true,
true,
EMPTY_ARRAY,
TEN_SECONDS
timeout
);
client().admin()
.indices()
.prepareCreate(indexName)
.prepareCreate("not-assigned")
.setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(
Settings.builder()
@@ -384,20 +404,10 @@ public void testWaitOnPrimaryShardsReady() throws Exception {
)
.get();

long start = System.nanoTime();
ActionFuture<GetGlobalCheckpointsAction.Response> future = client().execute(GetGlobalCheckpointsAction.INSTANCE, request);
Thread.sleep(randomIntBetween(10, 100));
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "node", ""))
.get();
client().prepareIndex(indexName).setId(Integer.toString(0)).setSource("{}", XContentType.JSON).get();

GetGlobalCheckpointsAction.Response response = future.actionGet();
long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds();
assertThat(elapsed, lessThanOrEqualTo(TEN_SECONDS.seconds()));
assertThat(response.globalCheckpoints()[0], equalTo(0L));
assertFalse(response.timedOut());
UnavailableShardsException exception = expectThrows(
UnavailableShardsException.class,
() -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
);
assertEquals("Primary shards were not active within timeout [timeout=" + timeout + ", shards=1, active=0]", exception.getMessage());
}
}
@@ -16,11 +16,11 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -35,8 +35,10 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
@@ -156,19 +158,22 @@ public static class TransportAction extends org.elasticsearch.action.support.Tra
private final ClusterService clusterService;
private final NodeClient client;
private final IndexNameExpressionResolver resolver;
private final ThreadPool threadPool;

@Inject
public TransportAction(
final ActionFilters actionFilters,
final TransportService transportService,
final ClusterService clusterService,
final NodeClient client,
final IndexNameExpressionResolver resolver
final IndexNameExpressionResolver resolver,
final ThreadPool threadPool
) {
super(NAME, actionFilters, transportService.getTaskManager());
this.clusterService = clusterService;
this.client = client;
this.resolver = resolver;
this.threadPool = threadPool;
}

@Override
@@ -180,7 +185,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
index = resolver.concreteSingleIndex(state, request);
} catch (IndexNotFoundException e) {
if (request.waitForIndex()) {
handleIndexNotReady(request, listener);
handleIndexNotReady(state, request, listener);
} else {
listener.onFailure(e);
}
@@ -194,7 +199,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new CheckpointFetcher(client, request, listener, indexMetadata, request.timeout()).run();
} else {
if (request.waitForIndex()) {
handleIndexNotReady(request, listener);
handleIndexNotReady(state, request, listener);
} else {
int active = routingTable.primaryShardsActive();
int total = indexMetadata.getNumberOfShards();
@@ -205,60 +210,61 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
}
}

private void handleIndexNotReady(final Request request, final ActionListener<Response> responseListener) {
private void handleIndexNotReady(ClusterState initialState, Request request, ActionListener<Response> listener) {
long startNanos = System.nanoTime();
client.admin()
.cluster()
.prepareHealth(request.index)
.setLocal(true)
.setTimeout(request.timeout())
.setWaitForYellowStatus()
.setWaitForNoInitializingShards(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(ClusterHealthResponse healthResponse) {
final long elapsedNanos = System.nanoTime() - startNanos;
final ClusterState state = clusterService.state();
final Index index;
try {
index = resolver.concreteSingleIndex(state, request);
} catch (Exception e) {
responseListener.onFailure(e);
return;
}

final IndexMetadata indexMetadata = state.getMetadata().index(index);
final IndexRoutingTable routingTable = state.routingTable().index(index);

var observer = new ClusterStateObserver(initialState, clusterService, request.timeout(), logger, threadPool.getThreadContext());

observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
var index = resolver.concreteSingleIndex(state, request);
long elapsedNanos = System.nanoTime() - startNanos;
long remainingNanos = request.timeout().nanos() - elapsedNanos;
if (routingTable.allPrimaryShardsActive() && remainingNanos > 0) {
new CheckpointFetcher(
client,
request,
responseListener,
indexMetadata,
TimeValue.timeValueNanos(remainingNanos)
).run();
} else {
int active = routingTable.primaryShardsActive();
int total = indexMetadata.getNumberOfShards();
responseListener.onFailure(
new UnavailableShardsException(
null,
"Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
request.timeout(),
total,
active
)
);
}
new CheckpointFetcher(
client,
request,
listener,
state.getMetadata().index(index),
TimeValue.timeValueNanos(remainingNanos)
).run();
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
responseListener.onFailure(e);
@Override
public void onTimeout(TimeValue timeout) {
final ClusterState state = clusterService.state();
final Index index;
try {
index = resolver.concreteSingleIndex(state, request);
listener.onFailure(
new UnavailableShardsException(
null,
"Primary shards were not active within timeout [timeout={}, shards={}, active={}]",
request.timeout(),
state.getMetadata().index(index).getNumberOfShards(),
state.routingTable().index(index).primaryShardsActive()
)
);
} catch (Exception e) {
listener.onFailure(e);
}
});
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
}, state -> {
try {
var index = resolver.concreteSingleIndex(state, request);
return state.routingTable().index(index).allPrimaryShardsActive();
} catch (Exception e) {
return false;
}
}, request.timeout());
}

private static class CheckpointFetcher extends ActionRunnable<Response> {