diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 3507653909b82..6b969a5ed1763 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; @@ -18,6 +20,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -69,13 +72,27 @@ private MlIndexAndAlias() {} * Adds an {@code alias} to that index if it was created, * or to the index with the highest suffix if the index did not have to be created. * The listener is notified with a {@code boolean} that informs whether the index or the alias were created. + * If the index is created, the listener is not called until the index is ready to use via the supplied alias, + * so that a method that receives a success response from this method can safely use the index immediately. */ public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver, String indexPatternPrefix, String alias, - ActionListener listener) { + ActionListener finalListener) { + + // If both the index and alias were successfully created then wait for the shards of the index that the alias points to be ready + ActionListener indexCreatedListener = ActionListener.wrap( + created -> { + if (created) { + waitForShardsReady(client, alias, finalListener); + } else { + finalListener.onResponse(false); + } + }, + finalListener::onFailure + ); String legacyIndexWithoutSuffix = indexPatternPrefix; String indexPattern = indexPatternPrefix + "*"; @@ -89,7 +106,7 @@ public static void createIndexAndAliasIfNecessary(Client client, if (concreteIndexNames.length == 0) { if (indexPointedByCurrentWriteAlias.isEmpty()) { - createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener); + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener); return; } logger.error( @@ -97,7 +114,7 @@ public static void createIndexAndAliasIfNecessary(Client client, indexPattern, alias, indexPointedByCurrentWriteAlias.get()); } else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) { if (indexPointedByCurrentWriteAlias.isEmpty()) { - createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener); + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener); return; } if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) { @@ -107,8 +124,8 @@ public static void createIndexAndAliasIfNecessary(Client client, alias, false, ActionListener.wrap( - unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener), - listener::onFailure) + unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, indexCreatedListener), + finalListener::onFailure) ); return; } @@ -119,12 +136,28 @@ public static void createIndexAndAliasIfNecessary(Client client, if (indexPointedByCurrentWriteAlias.isEmpty()) { assert concreteIndexNames.length > 0; String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); - updateWriteAlias(client, alias, null, latestConcreteIndexName, listener); + updateWriteAlias(client, alias, null, latestConcreteIndexName, finalListener); return; } } // If the alias is set, there is nothing more to do. - listener.onResponse(false); + finalListener.onResponse(false); + } + + private static void waitForShardsReady(Client client, String index, ActionListener listener) { + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(index) + .waitForYellowStatus() + .waitForNoRelocatingShards(true) + .waitForNoInitializingShards(true); + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + ML_ORIGIN, + healthRequest, + ActionListener.wrap( + response -> listener.onResponse(response.isTimedOut() == false), + listener::onFailure), + client.admin().cluster()::health + ); } private static void createFirstConcreteIndex(Client client, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 34ede15f54340..605704cc2ad9a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -7,6 +7,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -19,6 +21,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -66,6 +69,7 @@ public class MlIndexAndAliasTests extends ESTestCase { private ThreadPool threadPool; private IndicesAdminClient indicesAdminClient; + private ClusterAdminClient clusterAdminClient; private AdminClient adminClient; private Client client; private ActionListener listener; @@ -85,8 +89,17 @@ public void setUpMocks() { when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE)); doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any()); + clusterAdminClient = mock(ClusterAdminClient.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new ClusterHealthResponse()); + return null; + }).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class)); + adminClient = mock(AdminClient.class); when(adminClient.indices()).thenReturn(indicesAdminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index e57f89f92235a..06ee0e48b6fd3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/57102") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 84d6f39df4369..f2c79e4e42445 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -297,7 +297,6 @@ public void testStopAndRestart() throws Exception { assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807") public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception { String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source"; indexData(sourceIndex, 100, 0);