Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private static void createFirstConcreteIndex(Client client,
String alias,
boolean addAlias,
ActionListener<Boolean> listener) {
logger.info("About to create first concrete index [{}] with alias [{}]", index, alias);
CreateIndexRequestBuilder requestBuilder = client.admin()
.indices()
.prepareCreate(index);
Expand Down Expand Up @@ -163,6 +164,7 @@ private static void updateWriteAlias(Client client,
@Nullable String currentIndex,
String newIndex,
ActionListener<Boolean> listener) {
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
IndicesAliasesRequestBuilder requestBuilder = client.admin()
.indices()
.prepareAliases()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -501,7 +503,6 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55221")
public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception {
initialize("classification_set_upgrade_mode");
indexData(sourceIndex, 300, 0, KEYWORD_FIELD);
Expand All @@ -519,18 +520,33 @@ public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception {
assertThat(analyticsTaskList(), hasSize(1));
assertThat(analyticsAssignedTaskList(), is(empty()));

GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(analyticsStats.getNode(), is(nullValue()));
assertBusy(() -> {
try {
GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(analyticsStats.getNode(), is(nullValue()));
} catch (ElasticsearchException e) {
logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e);
fail(e.getDetailedMessage());
}
});

setUpgradeModeTo(false);
assertThat(analyticsTaskList(), hasSize(1));
assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1)));

analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
assertBusy(() -> {
try {
GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
} catch (ElasticsearchException e) {
logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e);
fail(e.getDetailedMessage());
}
});

waitUntilAnalyticsIsStopped(jobId);
assertProgress(jobId, 100, 100, 100, 100);
}

public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,15 @@ private void searchStats(String configId, ActionListener<Stats> listener) {

executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap(
multiSearchResponse -> {
for (MultiSearchResponse.Item itemResponse : multiSearchResponse.getResponses()) {
MultiSearchResponse.Item[] itemResponses = multiSearchResponse.getResponses();
for (int i = 0; i < itemResponses.length; ++i) {
MultiSearchResponse.Item itemResponse = itemResponses[i];
if (itemResponse.isFailure()) {
SearchRequest itemRequest = multiSearchRequest.requests().get(i);
logger.error(
new ParameterizedMessage(
"[{}] Item failure encountered during multi search: {}", configId, itemResponse.getFailureMessage()),
"[{}] Item failure encountered during multi search for request [indices={}, source={}]: {}",
configId, itemRequest.indices(), itemRequest.source(), itemResponse.getFailureMessage()),
itemResponse.getFailure());
listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure()));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,9 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(StartDataFrameAnal
@Override
protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyticsAction.TaskParams params,
PersistentTaskState state) {
logger.info("[{}] Starting data frame analytics", params.getId());
DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
logger.info("[{}] Starting data frame analytics from state [{}]", params.getId(), analyticsState);

// If we are "stopping" there is nothing to do and we should stop
if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -267,10 +266,6 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr
process.writeRecord(headerRecord);
}

private void indexDataCounts(DataCounts dataCounts) {
IndexRequest indexRequest = new IndexRequest(MlStatsIndex.writeAlias());
}

private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state,
AnalyticsProcess<AnalyticsResult> process) {
if (config.getAnalysis().persistsState() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void queueStats(InferenceStats stats) {
}

void stop() {
logger.info("About to stop TrainedModelStatsService");
stopped = true;
statsQueue.clear();

Expand All @@ -115,6 +116,7 @@ void stop() {
}

void start() {
logger.info("About to start TrainedModelStatsService");
stopped = false;
scheduledFuture = threadPool.scheduleWithFixedDelay(this::updateStats,
PERSISTENCE_INTERVAL,
Expand All @@ -126,11 +128,13 @@ void updateStats() {
return;
}
if (verifiedStatsIndexCreated == false) {
logger.info("About to create the stats index as it does not exist yet");
try {
PlainActionFuture<Boolean> listener = new PlainActionFuture<>();
MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, indexNameExpressionResolver, listener);
listener.actionGet();
verifiedStatsIndexCreated = true;
logger.info("Created stats index");
} catch (Exception e) {
logger.error("failure creating ml stats index for storing model stats", e);
return;
Expand Down