Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -507,16 +507,6 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.SearchAnomalyResultTransportAction*',
'org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction*',

// TODO: hc caused coverage to drop
'org.opensearch.ad.indices.AnomalyDetectionIndices',
'org.opensearch.ad.transport.handler.MultiEntityResultHandler',
'org.opensearch.ad.util.ThrowingSupplierWrapper',
'org.opensearch.ad.transport.ProfileNodeResponse',
'org.opensearch.ad.transport.ADResultBulkResponse',
'org.opensearch.ad.transport.AggregationType',
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.NodeStateManager',
'org.opensearch.ad.util.BulkUtil',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.AnomalyDetectorJobRequest',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ public void maintenance() {
*/
@Override
public void clear(String detectorId) {
if (detectorId == null) {
if (Strings.isEmpty(detectorId)) {
return;
}
CacheBuffer buffer = activeEnities.remove(detectorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
// The index name pattern to query all AD result, history and current AD result
public static final String ALL_AD_RESULTS_INDEX_PATTERN = ".opendistro-anomaly-results*";

private static final String META = "_meta";
// package private for testing
static final String META = "_meta";
private static final String SCHEMA_VERSION = "schema_version";

private ClusterService clusterService;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/ad/ml/EntityColdStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,8 @@ public void trainModelFromExistingSamples(ModelState<EntityModel> modelState) {
try {
double[][] trainData = featureManager.batchShingle(samples.toArray(new double[0][0]), this.shingleSize);
trainModelFromDataSegments(Collections.singletonList(trainData), model.getEntity().orElse(null), modelState);
// clear samples after using
samples.clear();
} catch (Exception e) {
// e.g., exception from rcf. We can do nothing except logging the error
// We won't retry training for the same entity in the cooldown period
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.ml.ModelManager.ModelType;
import org.opensearch.ad.ml.rcf.CombinedRcfResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
Expand Down Expand Up @@ -288,7 +287,6 @@ private void getRcfResult(ModelState<RandomCutForest> modelState, double[] point
int forestSize = rcf.getNumberOfTrees();
double[] attribution = getAnomalyAttribution(rcf, point);
rcf.update(point);
long totalUpdates = rcf.getTotalUpdates();
listener.onResponse(new RcfResult(score, confidence, forestSize, attribution, rcf.getTotalUpdates()));
}

Expand Down Expand Up @@ -797,7 +795,6 @@ public void getTotalUpdates(String modelId, String detectorId, ActionListener<Lo
* @param datapoint Data point
* @param modelState the state associated with the entity
* @param modelId the model Id
* @param detector Detector accessor
* @param entity entity accessor
*
* @return anomaly result, confidence, and the corresponding RCF score.
Expand All @@ -806,7 +803,6 @@ public ThresholdingResult getAnomalyResultForEntity(
double[] datapoint,
ModelState<EntityModel> modelState,
String modelId,
AnomalyDetector detector,
Entity entity
) {
if (modelState != null) {
Expand All @@ -816,12 +812,10 @@ public ThresholdingResult getAnomalyResultForEntity(
entityModel = new EntityModel(entity, new ArrayDeque<>(), null, null);
modelState.setModel(entityModel);
}

// trainModelFromExistingSamples may be able to make models not null
if (entityModel.getRcf() == null || entityModel.getThreshold() == null) {
entityColdStarter.trainModelFromExistingSamples(modelState);
}

if (entityModel.getRcf() != null && entityModel.getThreshold() != null) {
return score(datapoint, modelId, modelState);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ private AnomalyDetectorSettings() {}

// max concurrent preview to limit resource usage
public static final Setting<Integer> MAX_CONCURRENT_PREVIEW = Setting
.intSetting("plugins.anomaly_detection.max_concurrent_preview", 5, 1, 20, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("plugins.anomaly_detection.max_concurrent_preview", 2, 1, 20, Setting.Property.NodeScope, Setting.Property.Dynamic);

// preview timeout in terms of milliseconds
public static final long PREVIEW_TIMEOUT_IN_MILLIS = 60_000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ private ActionListener<Optional<AnomalyDetector>> onGetDetector(
cacheMissEntities.put(categoricalValues, datapoint);
continue;
}
ThresholdingResult result = modelManager
.getAnomalyResultForEntity(datapoint, entityModel, modelId, detector, categoricalValues);
ThresholdingResult result = modelManager.getAnomalyResultForEntity(datapoint, entityModel, modelId, categoricalValues);
// result.getRcfScore() = 0 means the model is not initialized
// result.getGrade() = 0 means it is not an anomaly
// So many OpenSearchRejectedExecutionException if we write no matter what
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
*/
public class MultiEntityResultHandler extends AnomalyIndexHandler<AnomalyResult> {
private static final Logger LOG = LogManager.getLogger(MultiEntityResultHandler.class);
private static final String SUCCESS_SAVING_RESULT_MSG = "Result saved successfully.";
private static final String CANNOT_SAVE_RESULT_ERR_MSG = "Cannot save results due to write block.";
// package private for testing
static final String SUCCESS_SAVING_RESULT_MSG = "Result saved successfully.";
static final String CANNOT_SAVE_RESULT_ERR_MSG = "Cannot save results due to write block.";

@Inject
public MultiEntityResultHandler(
Expand Down
24 changes: 0 additions & 24 deletions src/main/java/org/opensearch/ad/util/BulkUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,6 @@
public class BulkUtil {
private static final Logger logger = LogManager.getLogger(BulkUtil.class);

public static List<IndexRequest> getIndexRequestToRetry(BulkRequest bulkRequest, BulkResponse bulkResponse) {
List<IndexRequest> res = new ArrayList<>();

Set<String> failedId = new HashSet<>();
for (BulkItemResponse response : bulkResponse.getItems()) {
if (response.isFailed() && ExceptionUtil.isRetryAble(response.getFailure().getStatus())) {
failedId.add(response.getId());
}
}

for (DocWriteRequest<?> request : bulkRequest.requests()) {
try {
if (failedId.contains(request.id())) {
res.add(cloneIndexRequest((IndexRequest) request));
}
} catch (ClassCastException e) {
logger.error("We only support IndexRequest", e);
throw e;
}

}
return res;
}

public static List<IndexRequest> getFailedIndexRequest(BulkRequest bulkRequest, BulkResponse bulkResponse) {
List<IndexRequest> res = new ArrayList<>();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/util/MathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class MathUtil {
/*
* Private constructor to avoid Jacoco complain about public constructor
* Private constructor to avoid Jacoco complaining about public constructor
* not covered: https://tinyurl.com/yetc7tra
*/
private MathUtil() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
import java.util.function.Supplier;

public class ThrowingSupplierWrapper {
/*
* Private constructor to avoid Jacoco complaining about public constructor
* not covered: https://tinyurl.com/yetc7tra
*/
private ThrowingSupplierWrapper() {}

/**
* Utility method to use a method throwing checked exception inside a place
* that does not allow throwing the corresponding checked exception (e.g.,
Expand Down
150 changes: 150 additions & 0 deletions src/test/java/org/opensearch/AnomalyResultResponse1_0.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.action.ActionResponse;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

/**
 * Replica of the anomaly result response as serialized in the 1.0 release,
 * kept so backward-compatibility tests can round-trip the old wire format.
 * NOTE(review): the field order in {@link #writeTo} must stay exactly
 * grade, confidence, score, features, optional error — it mirrors the 1.0
 * layout read back by the {@link StreamInput} constructor; do not reorder.
 */
public class AnomalyResultResponse1_0 extends ActionResponse implements ToXContentObject {
    public static final String ANOMALY_GRADE_JSON_KEY = "anomalyGrade";
    public static final String CONFIDENCE_JSON_KEY = "confidence";
    public static final String ANOMALY_SCORE_JSON_KEY = "anomalyScore";
    public static final String ERROR_JSON_KEY = "error";
    public static final String FEATURES_JSON_KEY = "features";
    public static final String FEATURE_VALUE_JSON_KEY = "value";

    // Assigned only in constructors; final to make the response immutable.
    private final double anomalyGrade;
    private final double confidence;
    private final double anomalyScore;
    private final String error;
    private final List<FeatureData> features;

    /**
     * Creates a response without an error message.
     *
     * @param anomalyGrade anomaly grade
     * @param confidence model confidence
     * @param anomalyScore raw RCF score
     * @param features feature attribution data
     */
    public AnomalyResultResponse1_0(double anomalyGrade, double confidence, double anomalyScore, List<FeatureData> features) {
        this(anomalyGrade, confidence, anomalyScore, features, null);
    }

    /**
     * Creates a response.
     *
     * @param anomalyGrade anomaly grade
     * @param confidence model confidence
     * @param anomalyScore raw RCF score
     * @param features feature attribution data
     * @param error error message, or null if none
     */
    public AnomalyResultResponse1_0(double anomalyGrade, double confidence, double anomalyScore, List<FeatureData> features, String error) {
        this.anomalyGrade = anomalyGrade;
        this.confidence = confidence;
        this.anomalyScore = anomalyScore;
        this.features = features;
        this.error = error;
    }

    /**
     * Deserializes a response from the 1.0 wire format.
     *
     * @param in stream positioned at the start of a serialized response
     * @throws IOException if reading from the stream fails
     */
    public AnomalyResultResponse1_0(StreamInput in) throws IOException {
        super(in);
        anomalyGrade = in.readDouble();
        confidence = in.readDouble();
        anomalyScore = in.readDouble();
        int size = in.readVInt();
        features = new ArrayList<FeatureData>();
        for (int i = 0; i < size; i++) {
            features.add(new FeatureData(in));
        }
        error = in.readOptionalString();
    }

    public double getAnomalyGrade() {
        return anomalyGrade;
    }

    public List<FeatureData> getFeatures() {
        return features;
    }

    public double getConfidence() {
        return confidence;
    }

    public double getAnomalyScore() {
        return anomalyScore;
    }

    public String getError() {
        return error;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeDouble(anomalyGrade);
        out.writeDouble(confidence);
        out.writeDouble(anomalyScore);
        out.writeVInt(features.size());
        for (FeatureData feature : features) {
            feature.writeTo(out);
        }
        // writeOptionalString emits the same presence-flag-then-value layout
        // as the previous manual writeBoolean/writeString pair, and is the
        // symmetric counterpart of readOptionalString in the stream constructor.
        out.writeOptionalString(error);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(ANOMALY_GRADE_JSON_KEY, anomalyGrade);
        builder.field(CONFIDENCE_JSON_KEY, confidence);
        builder.field(ANOMALY_SCORE_JSON_KEY, anomalyScore);
        builder.field(ERROR_JSON_KEY, error);
        builder.startArray(FEATURES_JSON_KEY);
        for (FeatureData feature : features) {
            feature.toXContent(builder, params);
        }
        builder.endArray();
        builder.endObject();
        return builder;
    }

    /**
     * Converts an arbitrary {@link ActionResponse} into this type, either by a
     * direct cast or by round-tripping it through the wire format.
     *
     * @param actionResponse the response to convert
     * @return the response viewed as an {@code AnomalyResultResponse1_0}
     * @throws IllegalArgumentException if serialization of the response fails
     */
    public static AnomalyResultResponse1_0 fromActionResponse(final ActionResponse actionResponse) {
        if (actionResponse instanceof AnomalyResultResponse1_0) {
            return (AnomalyResultResponse1_0) actionResponse;
        }

        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
            actionResponse.writeTo(osso);
            try (InputStreamStreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
                return new AnomalyResultResponse1_0(input);
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("failed to parse ActionResponse into AnomalyResultResponse", e);
        }
    }
}
Loading