Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,12 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
}, exception -> { log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception); }));
} else {
log.info("AD Job was disabled for " + detectorId);
// function.execute();
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
}
} else {
log.info("AD Job was not found for " + detectorId);
// function.execute();
}
}, exception -> log.error("JobRunner failed to get detector job " + detectorId, exception));

Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,10 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adTaskManager,
nodeFilter,
threadPool,
client
client,
stateManager,
adTaskCacheManager,
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);

// return objects used by Guice to inject dependencies for e.g.,
Expand Down
54 changes: 9 additions & 45 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
Expand All @@ -64,8 +63,6 @@
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilder;
Expand Down Expand Up @@ -451,14 +448,15 @@ private void profileMultiEntityDetectorStateRelated(
if (profileResponse.getTotalUpdates() < requiredSamples) {
// need to double check since what ProfileResponse returns is the highest priority entity currently in memory, but
// another entity might have already been initialized and sit somewhere else (in memory or on disk).
confirmMultiEntityDetectorInitStatus(
detector,
job.getEnabledTime().toEpochMilli(),
profileBuilder,
profilesToCollect,
profileResponse.getTotalUpdates(),
listener
);
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
.confirmDetectorRealtimeInitStatus(
detector,
enabledTime,
client,
onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener)
);
} else {
createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
listener.onResponse(profileBuilder.build());
Expand All @@ -471,18 +469,6 @@ private void profileMultiEntityDetectorStateRelated(
}
}

/**
 * Double-checks via a result-index search whether a multi-entity detector has ever
 * produced a real anomaly result since the job was enabled. The profile response
 * alone is insufficient: it reflects only the highest-priority entity currently in
 * memory, while an initialized entity may live elsewhere (see caller comment).
 *
 * @param detector the detector being profiled
 * @param enabledTime AD job enabled time in epoch milliseconds
 * @param profile builder holding the partially assembled detector profile
 * @param profilesToCollect profile fields requested by the caller
 * @param totalUpdates total RCF updates reported by the profile response
 * @param listener aggregating listener that receives the finished profile
 */
private void confirmMultiEntityDetectorInitStatus(
    AnomalyDetector detector,
    long enabledTime,
    DetectorProfile.Builder profile,
    Set<DetectorProfileName> profilesToCollect,
    long totalUpdates,
    MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
    // Search for any result with anomaly score > 0 after the enabled time;
    // the response is consumed by onInittedEver to finalize init status.
    SearchRequest searchLatestResult = createInittedEverRequest(detector.getDetectorId(), enabledTime, detector.getResultIndex());
    client.search(searchLatestResult, onInittedEver(enabledTime, profile, profilesToCollect, detector, totalUpdates, listener));
}

private ActionListener<SearchResponse> onInittedEver(
long lastUpdateTimeMs,
DetectorProfile.Builder profileBuilder,
Expand Down Expand Up @@ -602,26 +588,4 @@ private void processInitResponse(

listener.onResponse(builder.build());
}

/**
 * Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time
 * @param detectorId detector id
 * @param enabledTime the time when AD job is enabled in milliseconds
 * @param resultIndex custom result index of the detector; when non-null it overrides the default alias
 * @return the search request
 */
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
    // Filter: this detector's results, finished after job enable time, with a positive score.
    BoolQueryBuilder filterQuery = new BoolQueryBuilder();
    filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
    filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
    filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

    // size(1): existence check only, no need to fetch more hits.
    SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

    SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
    request.source(source);
    if (resultIndex != null) {
        request.indices(resultIndex);
    }
    return request;
}
}
134 changes: 118 additions & 16 deletions src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
Expand All @@ -32,6 +34,7 @@
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.ProfileAction;
Expand All @@ -40,10 +43,12 @@
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;

public class ExecuteADResultResponseRecorder {
Expand All @@ -55,21 +60,30 @@ public class ExecuteADResultResponseRecorder {
private DiscoveryNodeFilterer nodeFilter;
private ThreadPool threadPool;
private Client client;
private NodeStateManager nodeStateManager;
private ADTaskCacheManager adTaskCacheManager;
private int rcfMinSamples;

/**
 * Constructor.
 *
 * @param anomalyDetectionIndices index management for AD indices
 * @param anomalyResultHandler handler used to persist anomaly results
 * @param adTaskManager manager for realtime/historical AD task state
 * @param nodeFilter filter to select eligible data nodes
 * @param threadPool thread pool for scheduling async work
 * @param client OpenSearch client for search/index calls
 * @param nodeStateManager provides cached detector and job definitions
 * @param adTaskCacheManager tracks per-detector realtime task cache state
 * @param rcfMinSamples minimum RCF samples required to consider init complete
 */
public ExecuteADResultResponseRecorder(
    AnomalyDetectionIndices anomalyDetectionIndices,
    AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
    ADTaskManager adTaskManager,
    DiscoveryNodeFilterer nodeFilter,
    ThreadPool threadPool,
    Client client,
    NodeStateManager nodeStateManager,
    ADTaskCacheManager adTaskCacheManager,
    int rcfMinSamples
) {
    this.anomalyDetectionIndices = anomalyDetectionIndices;
    this.anomalyResultHandler = anomalyResultHandler;
    this.adTaskManager = adTaskManager;
    this.nodeFilter = nodeFilter;
    this.threadPool = threadPool;
    this.client = client;
    this.nodeStateManager = nodeStateManager;
    this.adTaskCacheManager = adTaskCacheManager;
    this.rcfMinSamples = rcfMinSamples;
}

public void indexAnomalyResult(
Expand Down Expand Up @@ -185,27 +199,66 @@ private void updateLatestRealtimeTask(
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
ActionListener<UpdateResponse> listener = ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
});

// rcfTotalUpdates is null when we save exception messages
if (!adTaskCacheManager.hasQueriedResultIndex(detectorId) && rcfTotalUpdates != null && rcfTotalUpdates < rcfMinSamples) {
// confirm the total updates number since it is possible that we have already had results after job enabling time
// If yes, total updates should be at least rcfMinSamples so that the init progress reaches 100%.
confirmTotalRCFUpdatesFound(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
})
ActionListener
.wrap(
r -> adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
r,
detectorIntervalInMinutes,
error,
listener
),
e -> {
log.error("Fail to confirm rcf update", e);
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
)
);
} else {
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
listener
);
}
}

/**
Expand Down Expand Up @@ -285,4 +338,53 @@ public void indexAnomalyResultException(
}
}

/**
 * Verifies the reported RCF total-updates count against the result index. If the
 * detector has already produced realtime results after the job enabled time, the
 * count is raised to {@code rcfMinSamples} so init progress reports 100% instead of
 * the detector appearing stuck in initialization.
 *
 * Flow (three chained async calls): fetch detector -> fetch job -> search result
 * index via ProfileUtil. Any missing detector/job or search failure is routed to
 * {@code listener.onFailure}.
 *
 * @param detectorId detector id
 * @param taskState current realtime task state (unused here; kept for call symmetry)
 * @param rcfTotalUpdates RCF update count reported by the model node
 * @param detectorIntervalInMinutes detector interval (unused here; kept for call symmetry)
 * @param error error message associated with the run, may be null
 * @param listener receives the possibly corrected total-updates value
 */
private void confirmTotalRCFUpdatesFound(
    String detectorId,
    String taskState,
    Long rcfTotalUpdates,
    Long detectorIntervalInMinutes,
    String error,
    ActionListener<Long> listener
) {
    nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
        if (!detectorOptional.isPresent()) {
            listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"));
            return;
        }
        nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
            if (!jobOptional.isPresent()) {
                listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"));
                return;
            }

            ProfileUtil
                .confirmDetectorRealtimeInitStatus(
                    detectorOptional.get(),
                    jobOptional.get().getEnabledTime().toEpochMilli(),
                    client,
                    ActionListener.wrap(searchResponse -> {
                        // completeWith: any exception thrown below is forwarded to listener.onFailure
                        ActionListener.completeWith(listener, () -> {
                            SearchHits hits = searchResponse.getHits();
                            Long correctedTotalUpdates = rcfTotalUpdates;
                            if (hits.getTotalHits().value > 0L) {
                                // correct the number if we have already had results after job enabling time
                                // so that the detector won't stay initialized
                                correctedTotalUpdates = Long.valueOf(rcfMinSamples);
                            }
                            // Remember we already queried the result index so future runs skip this check.
                            adTaskCacheManager.markResultIndexQueried(detectorId);
                            return correctedTotalUpdates;
                        });
                    }, exception -> {
                        if (ExceptionUtil.isIndexNotAvailable(exception)) {
                            // anomaly result index is not created yet: no results exist,
                            // mark queried anyway to avoid retrying every interval
                            adTaskCacheManager.markResultIndexQueried(detectorId);
                            listener.onResponse(0L);
                        } else {
                            listener.onFailure(exception);
                        }
                    })
                );
        }, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get job"))));
    }, e -> listener.onFailure(new AnomalyDetectionException(detectorId, "fail to get detector"))));
}
}
68 changes: 68 additions & 0 deletions src/main/java/org/opensearch/ad/ProfileUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.client.Client;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ExistsQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

public class ProfileUtil {
/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time.
* Note this function is only meant to check for status of real time analysis.
*
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private static SearchRequest createRealtimeInittedEverRequest(String detectorId, long enabledTime, String resultIndex) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));
// Historical analysis result also stored in result index, which has non-null task_id.
// For realtime detection result, we should filter task_id == null
ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(AnomalyResult.TASK_ID_FIELD);
filterQuery.mustNot(taskIdExistsFilter);

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
request.source(source);
if (resultIndex != null) {
request.indices(resultIndex);
}
return request;
}

public static void confirmDetectorRealtimeInitStatus(
AnomalyDetector detector,
long enabledTime,
Client client,
ActionListener<SearchResponse> listener
) {
SearchRequest searchLatestResult = createRealtimeInittedEverRequest(
detector.getDetectorId(),
enabledTime,
detector.getResultIndex()
);
client.search(searchLatestResult, listener);
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/ad/constant/CommonName.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class CommonName {

// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";
// index pattern matching to all ad result indices
public static final String ANOMALY_RESULT_INDEX_ALL = ".opendistro-anomaly-results-history*";

// ======================================
// Format name
Expand Down
Loading