diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java index 9d2b931013952..3f1782911b060 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java @@ -12,8 +12,8 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -68,10 +68,13 @@ public static void createAnnotationsIndexIfNecessaryAndWaitForYellow(Client clie final ClusterHealthRequest request = Requests.clusterHealthRequest(READ_ALIAS_NAME) .waitForYellowStatus() .masterNodeTimeout(masterNodeTimeout); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, - ActionListener.wrap( - r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure), - client.admin().cluster()::health); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + ClusterHealthAction.INSTANCE, + request, + ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure) + ); }, finalListener::onFailure); createAnnotationsIndexIfNecessary(client, state, masterNodeTimeout, annotationsIndexCreatedListener); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 6dad58439e7e4..30919dbfcdbdd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -8,13 +8,19 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import org.elasticsearch.xpack.core.template.TemplateUtils; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + /** * Methods for handling index naming related functions */ @@ -83,6 +89,35 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta finalListener); } + public static void createStateIndexAndAliasIfNecessaryAndWaitForYellow(Client client, + ClusterState state, + IndexNameExpressionResolver resolver, + TimeValue masterNodeTimeout, + final ActionListener finalListener) { + final ActionListener stateIndexAndAliasCreated = ActionListener.wrap(success -> { + final ClusterHealthRequest request = Requests.clusterHealthRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .waitForYellowStatus() + .masterNodeTimeout(masterNodeTimeout); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + ClusterHealthAction.INSTANCE, + request, + ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure) + ); + }, finalListener::onFailure); + + MlIndexAndAlias.createIndexAndAliasIfNecessary( + client, + state, + resolver, + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, + AnomalyDetectorsIndex.jobStateIndexWriteAlias(), + masterNodeTimeout, + stateIndexAndAliasCreated + ); + } + public static String wrappedResultsMapping() { return "{\n\"_doc\" : " + resultsMapping() + "\n}"; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 5ac6beac166ce..c6e0697a39982 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -130,8 +130,13 @@ public void execute(DataFrameAnalyticsTask task, ClusterState clusterState, Time ); // Make sure the state index and alias exist - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(new ParentTaskAssigningClient(client, task.getParentTaskId()), - clusterState, expressionResolver, masterNodeTimeout, stateAliasListener); + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow( + new ParentTaskAssigningClient(client, task.getParentTaskId()), + clusterState, + expressionResolver, + masterNodeTimeout, + stateAliasListener + ); } private void createStatsIndexAndUpdateMappingsIfNecessary(Client client, ClusterState clusterState, TimeValue masterNodeTimeout, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index fd7cd115eb177..cdd8f6820d459 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -525,11 +525,19 @@ public void openJob(JobTask jobTask, ClusterState clusterState, TimeValue master } ); - // Make sure the state index and alias exist + // Make sure the state index and alias exist and are writeable ActionListener resultsMappingUpdateHandler = ActionListener.wrap( - ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, masterNodeTimeout, - stateAliasHandler), - e -> closeHandler.accept(e, true) + ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow( + client, + clusterState, + expressionResolver, + masterNodeTimeout, + stateAliasHandler + ), + e -> { + logger.error(new ParameterizedMessage("[{}] ML state index alias could not be updated", jobId), e); + closeHandler.accept(e, true); + } ); // Try adding the results doc mapping - this updates to the latest version if an old mapping is present diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 0fd0fdaeede77..d08bb28ac4ab1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -9,6 +9,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; @@ -22,6 +25,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.env.Environment; @@ -72,6 +76,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -146,13 +152,36 @@ public class AutodetectProcessManagerTests extends ESTestCase { private Quantiles quantiles = new Quantiles("foo", new Date(), "state"); @Before + @SuppressWarnings("unchecked") public void setup() throws Exception { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); client = mock(Client.class); - threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(client.threadPool()).thenReturn(threadPool); + doAnswer(invocationOnMock -> { + if (invocationOnMock.getArguments()[0] instanceof ActionType) { + ActionType v = (ActionType) invocationOnMock.getArguments()[0]; + ActionListener l = (ActionListener) invocationOnMock.getArguments()[2]; + ParameterizedType parameterizedType = (ParameterizedType) v.getClass().getGenericSuperclass(); + Type t = parameterizedType.getActualTypeArguments()[0]; + if (t.getTypeName().contains("AcknowledgedResponse")) { + ActionListener listener = (ActionListener) l; + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + } + if (t.getTypeName().contains("ClusterHealthResponse")) { + ActionListener listener = (ActionListener) l; + listener.onResponse( + new ClusterHealthResponse("test", new String[0], ClusterState.EMPTY_STATE, 0, 0, 0, TimeValue.ZERO, false) + ); + return null; + } + fail("Mock not configured to handle generic type " + t.getTypeName()); + } + return null; + }).when(client).execute(any(), any(), any()); analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings)); jobManager = mock(JobManager.class);