diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 35f4cf336d886..3507653909b82 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -132,6 +132,7 @@ private static void createFirstConcreteIndex(Client client, String alias, boolean addAlias, ActionListener listener) { + logger.info("About to create first concrete index [{}] with alias [{}]", index, alias); CreateIndexRequestBuilder requestBuilder = client.admin() .indices() .prepareCreate(index); @@ -163,6 +164,7 @@ private static void updateWriteAlias(Client client, @Nullable String currentIndex, String newIndex, ActionListener listener) { + logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex); IndicesAliasesRequestBuilder requestBuilder = client.admin() .indices() .prepareAliases() diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index 0b5d799e87e8f..3ff2090b381cf 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -501,7 +503,6 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55221") public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception { initialize("classification_set_upgrade_mode"); indexData(sourceIndex, 300, 0, KEYWORD_FIELD); @@ -519,18 +520,33 @@ public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception { assertThat(analyticsTaskList(), hasSize(1)); assertThat(analyticsAssignedTaskList(), is(empty())); - GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId); - assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation()))); - assertThat(analyticsStats.getNode(), is(nullValue())); + assertBusy(() -> { + try { + GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId); + assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation()))); + assertThat(analyticsStats.getNode(), is(nullValue())); + } catch (ElasticsearchException e) { + logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e); + fail(e.getDetailedMessage()); + } + }); setUpgradeModeTo(false); assertThat(analyticsTaskList(), hasSize(1)); assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1))); - analyticsStats = getAnalyticsStats(jobId); - assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation())))); + assertBusy(() -> { + try { + GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId); + assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation())))); + } catch (ElasticsearchException e) { + logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e); + fail(e.getDetailedMessage()); + } + }); waitUntilAnalyticsIsStopped(jobId); + assertProgress(jobId, 100, 100, 100, 100); } public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 6893ecf9053da..02e7f576cc868 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -212,11 +212,15 @@ private void searchStats(String configId, ActionListener listener) { executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap( multiSearchResponse -> { - for (MultiSearchResponse.Item itemResponse : multiSearchResponse.getResponses()) { + MultiSearchResponse.Item[] itemResponses = multiSearchResponse.getResponses(); + for (int i = 0; i < itemResponses.length; ++i) { + MultiSearchResponse.Item itemResponse = itemResponses[i]; if (itemResponse.isFailure()) { + SearchRequest itemRequest = multiSearchRequest.requests().get(i); logger.error( new ParameterizedMessage( - "[{}] Item failure encountered during multi search: {}", configId, itemResponse.getFailureMessage()), + "[{}] Item failure encountered during multi search for request [indices={}, source={}]: {}", + configId, itemRequest.indices(), itemRequest.source(), itemResponse.getFailureMessage()), itemResponse.getFailure()); listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure())); return; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 4ce08e998545a..c210b3cfb02a6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -643,9 +643,9 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(StartDataFrameAnal @Override protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyticsAction.TaskParams params, PersistentTaskState state) { - logger.info("[{}] Starting data frame analytics", params.getId()); DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state; DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState(); + logger.info("[{}] Starting data frame analytics from state [{}]", params.getId(), analyticsState); // If we are "stopping" there is nothing to do and we should stop if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index a2bf76b103394..5296ff74164f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -11,7 +11,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -267,10 +266,6 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr process.writeRecord(headerRecord); } - private void indexDataCounts(DataCounts dataCounts) { - IndexRequest indexRequest = new IndexRequest(MlStatsIndex.writeAlias()); - } - private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state, AnalyticsProcess process) { if (config.getAnalysis().persistsState() == false) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index 2213347c62254..7a6cc7974c3e2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -105,6 +105,7 @@ public void queueStats(InferenceStats stats) { } void stop() { + logger.info("About to stop TrainedModelStatsService"); stopped = true; statsQueue.clear(); @@ -115,6 +116,7 @@ void stop() { } void start() { + logger.info("About to start TrainedModelStatsService"); stopped = false; scheduledFuture = threadPool.scheduleWithFixedDelay(this::updateStats, PERSISTENCE_INTERVAL, @@ -126,11 +128,13 @@ void updateStats() { return; } if (verifiedStatsIndexCreated == false) { + logger.info("About to create the stats index as it does not exist yet"); try { PlainActionFuture listener = new PlainActionFuture<>(); MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, indexNameExpressionResolver, listener); listener.actionGet(); verifiedStatsIndexCreated = true; + logger.info("Created stats index"); } catch (Exception e) { logger.error("failure creating ml stats index for storing model stats", e); return;