diff --git a/build.gradle b/build.gradle index f716dcdc3..4a16d9a61 100644 --- a/build.gradle +++ b/build.gradle @@ -507,16 +507,6 @@ List jacocoExclusions = [ 'org.opensearch.ad.transport.SearchAnomalyResultTransportAction*', 'org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction*', - // TODO: hc caused coverage to drop - 'org.opensearch.ad.indices.AnomalyDetectionIndices', - 'org.opensearch.ad.transport.handler.MultiEntityResultHandler', - 'org.opensearch.ad.util.ThrowingSupplierWrapper', - 'org.opensearch.ad.transport.ProfileNodeResponse', - 'org.opensearch.ad.transport.ADResultBulkResponse', - 'org.opensearch.ad.transport.AggregationType', - 'org.opensearch.ad.EntityProfileRunner', - 'org.opensearch.ad.NodeStateManager', - 'org.opensearch.ad.util.BulkUtil', // TODO: unified flow caused coverage drop 'org.opensearch.ad.transport.AnomalyDetectorJobRequest', diff --git a/src/main/java/org/opensearch/ad/caching/PriorityCache.java b/src/main/java/org/opensearch/ad/caching/PriorityCache.java index 83a238a5d..09eac4a7f 100644 --- a/src/main/java/org/opensearch/ad/caching/PriorityCache.java +++ b/src/main/java/org/opensearch/ad/caching/PriorityCache.java @@ -649,7 +649,7 @@ public void maintenance() { */ @Override public void clear(String detectorId) { - if (detectorId == null) { + if (Strings.isEmpty(detectorId)) { return; } CacheBuffer buffer = activeEnities.remove(detectorId); diff --git a/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java b/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java index 4ccf02db6..a7d6c3dd1 100644 --- a/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java +++ b/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java @@ -103,7 +103,8 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener { // The index name pattern to query all AD result, history and current AD result public static final String ALL_AD_RESULTS_INDEX_PATTERN = 
".opendistro-anomaly-results*"; - private static final String META = "_meta"; + // package private for testing + static final String META = "_meta"; private static final String SCHEMA_VERSION = "schema_version"; private ClusterService clusterService; diff --git a/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java b/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java index c9556b02f..287cbc8b9 100644 --- a/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java +++ b/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java @@ -600,6 +600,8 @@ public void trainModelFromExistingSamples(ModelState modelState) { try { double[][] trainData = featureManager.batchShingle(samples.toArray(new double[0][0]), this.shingleSize); trainModelFromDataSegments(Collections.singletonList(trainData), model.getEntity().orElse(null), modelState); + // clear samples after using + samples.clear(); } catch (Exception e) { // e.g., exception from rcf. We can do nothing except logging the error // We won't retry training for the same entity in the cooldown period diff --git a/src/main/java/org/opensearch/ad/ml/ModelManager.java b/src/main/java/org/opensearch/ad/ml/ModelManager.java index 26920150e..aac5c8404 100644 --- a/src/main/java/org/opensearch/ad/ml/ModelManager.java +++ b/src/main/java/org/opensearch/ad/ml/ModelManager.java @@ -56,7 +56,6 @@ import org.opensearch.ad.common.exception.ResourceNotFoundException; import org.opensearch.ad.constant.CommonErrorMessages; import org.opensearch.ad.feature.FeatureManager; -import org.opensearch.ad.ml.ModelManager.ModelType; import org.opensearch.ad.ml.rcf.CombinedRcfResult; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Entity; @@ -288,7 +287,6 @@ private void getRcfResult(ModelState modelState, double[] point int forestSize = rcf.getNumberOfTrees(); double[] attribution = getAnomalyAttribution(rcf, point); rcf.update(point); - long totalUpdates = rcf.getTotalUpdates(); listener.onResponse(new 
RcfResult(score, confidence, forestSize, attribution, rcf.getTotalUpdates())); } @@ -797,7 +795,6 @@ public void getTotalUpdates(String modelId, String detectorId, ActionListener modelState, String modelId, - AnomalyDetector detector, Entity entity ) { if (modelState != null) { @@ -816,12 +812,10 @@ public ThresholdingResult getAnomalyResultForEntity( entityModel = new EntityModel(entity, new ArrayDeque<>(), null, null); modelState.setModel(entityModel); } - // trainModelFromExistingSamples may be able to make models not null if (entityModel.getRcf() == null || entityModel.getThreshold() == null) { entityColdStarter.trainModelFromExistingSamples(modelState); } - if (entityModel.getRcf() != null && entityModel.getThreshold() != null) { return score(datapoint, modelId, modelState); } else { diff --git a/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java index 241c2f88e..c0d3ed8e4 100644 --- a/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java @@ -757,7 +757,7 @@ private AnomalyDetectorSettings() {} // max concurrent preview to limit resource usage public static final Setting MAX_CONCURRENT_PREVIEW = Setting - .intSetting("plugins.anomaly_detection.max_concurrent_preview", 5, 1, 20, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("plugins.anomaly_detection.max_concurrent_preview", 2, 1, 20, Setting.Property.NodeScope, Setting.Property.Dynamic); // preview timeout in terms of milliseconds public static final long PREVIEW_TIMEOUT_IN_MILLIS = 60_000; diff --git a/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java index 67e815773..33229526b 100644 --- a/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java +++ 
b/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java @@ -215,8 +215,7 @@ private ActionListener> onGetDetector( cacheMissEntities.put(categoricalValues, datapoint); continue; } - ThresholdingResult result = modelManager - .getAnomalyResultForEntity(datapoint, entityModel, modelId, detector, categoricalValues); + ThresholdingResult result = modelManager.getAnomalyResultForEntity(datapoint, entityModel, modelId, categoricalValues); // result.getRcfScore() = 0 means the model is not initialized // result.getGrade() = 0 means it is not an anomaly // So many OpenSearchRejectedExecutionException if we write no matter what diff --git a/src/main/java/org/opensearch/ad/transport/handler/MultiEntityResultHandler.java b/src/main/java/org/opensearch/ad/transport/handler/MultiEntityResultHandler.java index 7f7981ba8..3d514bcb4 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/MultiEntityResultHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/MultiEntityResultHandler.java @@ -59,8 +59,9 @@ */ public class MultiEntityResultHandler extends AnomalyIndexHandler { private static final Logger LOG = LogManager.getLogger(MultiEntityResultHandler.class); - private static final String SUCCESS_SAVING_RESULT_MSG = "Result saved successfully."; - private static final String CANNOT_SAVE_RESULT_ERR_MSG = "Cannot save results due to write block."; + // package private for testing + static final String SUCCESS_SAVING_RESULT_MSG = "Result saved successfully."; + static final String CANNOT_SAVE_RESULT_ERR_MSG = "Cannot save results due to write block."; @Inject public MultiEntityResultHandler( diff --git a/src/main/java/org/opensearch/ad/util/BulkUtil.java b/src/main/java/org/opensearch/ad/util/BulkUtil.java index 167243250..f2cc6ed68 100644 --- a/src/main/java/org/opensearch/ad/util/BulkUtil.java +++ b/src/main/java/org/opensearch/ad/util/BulkUtil.java @@ -42,30 +42,6 @@ public class BulkUtil { private static final Logger logger = 
LogManager.getLogger(BulkUtil.class); - public static List getIndexRequestToRetry(BulkRequest bulkRequest, BulkResponse bulkResponse) { - List res = new ArrayList<>(); - - Set failedId = new HashSet<>(); - for (BulkItemResponse response : bulkResponse.getItems()) { - if (response.isFailed() && ExceptionUtil.isRetryAble(response.getFailure().getStatus())) { - failedId.add(response.getId()); - } - } - - for (DocWriteRequest request : bulkRequest.requests()) { - try { - if (failedId.contains(request.id())) { - res.add(cloneIndexRequest((IndexRequest) request)); - } - } catch (ClassCastException e) { - logger.error("We only support IndexRequest", e); - throw e; - } - - } - return res; - } - public static List getFailedIndexRequest(BulkRequest bulkRequest, BulkResponse bulkResponse) { List res = new ArrayList<>(); diff --git a/src/main/java/org/opensearch/ad/util/MathUtil.java b/src/main/java/org/opensearch/ad/util/MathUtil.java index 527ea34b9..893047030 100644 --- a/src/main/java/org/opensearch/ad/util/MathUtil.java +++ b/src/main/java/org/opensearch/ad/util/MathUtil.java @@ -13,7 +13,7 @@ public class MathUtil { /* - * Private constructor to avoid Jacoco complain about public constructor + * Private constructor to avoid Jacoco complaining about public constructor * not covered: https://tinyurl.com/yetc7tra */ private MathUtil() {} diff --git a/src/main/java/org/opensearch/ad/util/ThrowingSupplierWrapper.java b/src/main/java/org/opensearch/ad/util/ThrowingSupplierWrapper.java index 703f86a66..2c8b38e48 100644 --- a/src/main/java/org/opensearch/ad/util/ThrowingSupplierWrapper.java +++ b/src/main/java/org/opensearch/ad/util/ThrowingSupplierWrapper.java @@ -29,6 +29,12 @@ import java.util.function.Supplier; public class ThrowingSupplierWrapper { + /* + * Private constructor to avoid Jacoco complaining about public constructor + * not covered: https://tinyurl.com/yetc7tra + */ + private ThrowingSupplierWrapper() {} + /** * Utility method to use a method throwing checked 
exception inside a place * that does not allow throwing the corresponding checked exception (e.g., diff --git a/src/test/java/org/opensearch/AnomalyResultResponse1_0.java b/src/test/java/org/opensearch/AnomalyResultResponse1_0.java new file mode 100644 index 000000000..8c4ba910a --- /dev/null +++ b/src/test/java/org/opensearch/AnomalyResultResponse1_0.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.opensearch; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.opensearch.action.ActionResponse; +import org.opensearch.ad.model.FeatureData; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; + +public class AnomalyResultResponse1_0 extends ActionResponse implements ToXContentObject { + public static final String ANOMALY_GRADE_JSON_KEY = "anomalyGrade"; + public static final String CONFIDENCE_JSON_KEY = "confidence"; + public static final String ANOMALY_SCORE_JSON_KEY = "anomalyScore"; + public static final String ERROR_JSON_KEY = "error"; + public static final String FEATURES_JSON_KEY = "features"; + public static final String FEATURE_VALUE_JSON_KEY = "value"; + + private double anomalyGrade; + private double confidence; + private double anomalyScore; + private String error; + private List features; + + public AnomalyResultResponse1_0(double anomalyGrade, double confidence, double anomalyScore, List features) { + this(anomalyGrade, confidence, anomalyScore, features, null); + } + + public AnomalyResultResponse1_0(double anomalyGrade, double confidence, double anomalyScore, List features, String error) { + this.anomalyGrade = anomalyGrade; + this.confidence = confidence; + this.anomalyScore = anomalyScore; + this.features = features; + this.error = error; + } + + public AnomalyResultResponse1_0(StreamInput in) throws IOException { + super(in); + anomalyGrade = in.readDouble(); + confidence = in.readDouble(); + anomalyScore = in.readDouble(); + int size = in.readVInt(); + features = new ArrayList(); + for (int i = 0; i < size; 
i++) { + features.add(new FeatureData(in)); + } + error = in.readOptionalString(); + } + + public double getAnomalyGrade() { + return anomalyGrade; + } + + public List getFeatures() { + return features; + } + + public double getConfidence() { + return confidence; + } + + public double getAnomalyScore() { + return anomalyScore; + } + + public String getError() { + return error; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(anomalyGrade); + out.writeDouble(confidence); + out.writeDouble(anomalyScore); + out.writeVInt(features.size()); + for (FeatureData feature : features) { + feature.writeTo(out); + } + if (error != null) { + out.writeBoolean(true); + out.writeString(error); + } else { + out.writeBoolean(false); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ANOMALY_GRADE_JSON_KEY, anomalyGrade); + builder.field(CONFIDENCE_JSON_KEY, confidence); + builder.field(ANOMALY_SCORE_JSON_KEY, anomalyScore); + builder.field(ERROR_JSON_KEY, error); + builder.startArray(FEATURES_JSON_KEY); + for (FeatureData feature : features) { + feature.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + public static AnomalyResultResponse1_0 fromActionResponse(final ActionResponse actionResponse) { + if (actionResponse instanceof AnomalyResultResponse1_0) { + return (AnomalyResultResponse1_0) actionResponse; + } + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) { + actionResponse.writeTo(osso); + try (InputStreamStreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) { + return new AnomalyResultResponse1_0(input); + } + } catch (IOException e) { + throw new IllegalArgumentException("failed to parse ActionResponse into AnomalyResultResponse", e); + } + } +} 
diff --git a/src/test/java/org/opensearch/BwcTests.java b/src/test/java/org/opensearch/BwcTests.java index 0e3879c5a..6b5be9fdd 100644 --- a/src/test/java/org/opensearch/BwcTests.java +++ b/src/test/java/org/opensearch/BwcTests.java @@ -16,6 +16,9 @@ import static org.hamcrest.Matchers.equalTo; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -32,8 +35,12 @@ import org.opensearch.ad.constant.CommonName; import org.opensearch.ad.model.Entity; import org.opensearch.ad.model.EntityProfileName; +import org.opensearch.ad.model.FeatureData; import org.opensearch.ad.model.ModelProfile; import org.opensearch.ad.model.ModelProfileOnNode; +import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.ad.transport.CronNodeRequest; +import org.opensearch.ad.transport.CronRequest; import org.opensearch.ad.transport.EntityProfileAction; import org.opensearch.ad.transport.EntityProfileRequest; import org.opensearch.ad.transport.EntityProfileResponse; @@ -80,6 +87,13 @@ public class BwcTests extends AbstractADTest { private ModelProfile1_0[] convertedModelProfile; private RCFResultResponse rcfResultResponse1_1; private RCFResultResponse1_0 rcfResultResponse1_0; + private CronRequest cronRequest1_1; + private CronRequest1_0 cronRequest1_0; + private DiscoveryNode node; + private CronNodeRequest cronNodeRequest1_1; + private CronNodeRequest1_0 cronNodeRequest1_0; + private AnomalyResultResponse adResultResponse1_1; + private AnomalyResultResponse1_0 adResultResponse1_0; private boolean areEqualWithArrayValue(Map first, Map second) { if (first.size() != second.size()) { @@ -534,4 +548,153 @@ public void testSerializeRCFResultResponse1_0() throws IOException { assertThat(readResponse.getForestSize(), equalTo(rcfResultResponse1_0.getForestSize())); assertThat(readResponse.getRCFScore(), 
equalTo(rcfResultResponse1_0.getRCFScore())); } + + private void setUpCronRequest() throws UnknownHostException { + node = new DiscoveryNode( + nodeId, + new TransportAddress(new InetSocketAddress(InetAddress.getByName("1.2.3.4"), 9300)), + Version.V_1_0_0.minimumCompatibilityVersion() + ); + cronRequest1_1 = new CronRequest("foo", node); + cronRequest1_0 = new CronRequest1_0(node); + } + + public void testDeserializeCronRequest1_1() throws IOException { + setUpCronRequest(); + + cronRequest1_1.writeTo(output1_1); + + StreamInput streamInput = output1_1.bytes().streamInput(); + streamInput.setVersion(V_1_1_0); + CronRequest readRequest = new CronRequest(streamInput); + assertEquals(readRequest.getRequestId(), cronRequest1_1.getRequestId()); + assertEquals(1, readRequest.concreteNodes().length); + assertEquals(node, readRequest.concreteNodes()[0]); + } + + public void testDeserializeCronRequest1_0() throws IOException { + setUpCronRequest(); + + cronRequest1_0.writeTo(output1_0); + + StreamInput streamInput = output1_0.bytes().streamInput(); + streamInput.setVersion(Version.V_1_0_0); + CronRequest readRequest = new CronRequest(streamInput); + assertEquals(1, readRequest.concreteNodes().length); + assertEquals(node, readRequest.concreteNodes()[0]); + } + + public void testSerializeCronRequest1_0() throws IOException { + setUpCronRequest(); + + cronRequest1_1.writeTo(output1_0); + + StreamInput streamInput = output1_0.bytes().streamInput(); + streamInput.setVersion(Version.V_1_0_0); + CronRequest1_0 readRequest = new CronRequest1_0(streamInput); + assertEquals(1, readRequest.concreteNodes().length); + assertEquals(node, readRequest.concreteNodes()[0]); + } + + private void setUpCronNodeRequest() { + cronNodeRequest1_1 = new CronNodeRequest("blah"); + cronNodeRequest1_0 = new CronNodeRequest1_0(); + } + + public void testDeserializeCronNodeRequest1_1() throws IOException { + setUpCronNodeRequest(); + + cronNodeRequest1_1.writeTo(output1_1); + + StreamInput streamInput 
= output1_1.bytes().streamInput(); + streamInput.setVersion(V_1_1_0); + CronNodeRequest readRequest = new CronNodeRequest(streamInput); + assertEquals(readRequest.getRequestId(), cronNodeRequest1_1.getRequestId()); + } + + public void testDeserializeCronNodeRequest1_0() throws IOException { + setUpCronNodeRequest(); + + cronNodeRequest1_0.writeTo(output1_0); + + StreamInput streamInput = output1_0.bytes().streamInput(); + streamInput.setVersion(Version.V_1_0_0); + CronNodeRequest readRequest = new CronNodeRequest(streamInput); + assertTrue(readRequest != null); + } + + public void testSerializeCronNodeRequest1_0() throws IOException { + setUpCronNodeRequest(); + + cronNodeRequest1_1.writeTo(output1_0); + + StreamInput streamInput = output1_0.bytes().streamInput(); + streamInput.setVersion(Version.V_1_0_0); + CronNodeRequest1_0 readRequest = new CronNodeRequest1_0(streamInput); + assertTrue(readRequest != null); + } + + private void setUpAnomalyResultResponse() { + adResultResponse1_1 = new AnomalyResultResponse( + 0.5, + 0.993, + 1.01, + Collections.singletonList(new FeatureData("id", "name", 0d)), + null, + 15L, + 10L + ); + adResultResponse1_0 = new AnomalyResultResponse1_0(0.5, 0.993, 1.01, Collections.singletonList(new FeatureData("id", "name", 0d))); + } + + public void testDeserializeAnomalyResultResponse1_1() throws IOException { + setUpAnomalyResultResponse(); + + adResultResponse1_1.writeTo(output1_1); + + StreamInput streamInput = output1_1.bytes().streamInput(); + streamInput.setVersion(V_1_1_0); + AnomalyResultResponse readResponse = new AnomalyResultResponse(streamInput); + assertEquals(readResponse.getAnomalyGrade(), adResultResponse1_1.getAnomalyGrade(), 0.001); + assertEquals(readResponse.getConfidence(), adResultResponse1_1.getConfidence(), 0.001); + assertEquals(readResponse.getAnomalyScore(), adResultResponse1_1.getAnomalyScore(), 0.001); + assertEquals(1, readResponse.getFeatures().size()); + assertEquals(readResponse.getFeatures().get(0), 
adResultResponse1_1.getFeatures().get(0)); + assertEquals(readResponse.getError(), adResultResponse1_1.getError()); + assertEquals(readResponse.getRcfTotalUpdates(), adResultResponse1_1.getRcfTotalUpdates(), 0.001); + assertEquals(readResponse.getDetectorIntervalInMinutes(), adResultResponse1_1.getDetectorIntervalInMinutes()); + assertEquals(readResponse.isHCDetector(), adResultResponse1_1.isHCDetector()); + } + + public void testDeserializeAnomalyResultResponse1_0() throws IOException { + setUpAnomalyResultResponse(); + + adResultResponse1_0.writeTo(output1_0); + + StreamInput streamInput = output1_0.bytes().streamInput(); + streamInput.setVersion(Version.V_1_0_0); + AnomalyResultResponse readResponse = new AnomalyResultResponse(streamInput); + assertEquals(readResponse.getAnomalyGrade(), adResultResponse1_1.getAnomalyGrade(), 0.001); + assertEquals(readResponse.getConfidence(), adResultResponse1_1.getConfidence(), 0.001); + assertEquals(readResponse.getAnomalyScore(), adResultResponse1_1.getAnomalyScore(), 0.001); + assertEquals(1, readResponse.getFeatures().size()); + assertEquals(readResponse.getFeatures().get(0), adResultResponse1_1.getFeatures().get(0)); + assertEquals(readResponse.getError(), adResultResponse1_1.getError()); + } + + public void testSerializeAnomalyResultResponse1_0() throws IOException { + setUpAnomalyResultResponse(); + + adResultResponse1_1.writeTo(output1_0); + + StreamInput streamInput = output1_0.bytes().streamInput(); + streamInput.setVersion(Version.V_1_0_0); + AnomalyResultResponse1_0 readResponse = new AnomalyResultResponse1_0(streamInput); + assertEquals(readResponse.getAnomalyGrade(), adResultResponse1_1.getAnomalyGrade(), 0.001); + assertEquals(readResponse.getConfidence(), adResultResponse1_1.getConfidence(), 0.001); + assertEquals(readResponse.getAnomalyScore(), adResultResponse1_1.getAnomalyScore(), 0.001); + assertEquals(1, readResponse.getFeatures().size()); + assertEquals(readResponse.getFeatures().get(0), 
adResultResponse1_1.getFeatures().get(0)); + assertEquals(readResponse.getError(), adResultResponse1_1.getError()); + } } diff --git a/src/test/java/org/opensearch/CronNodeRequest1_0.java b/src/test/java/org/opensearch/CronNodeRequest1_0.java new file mode 100644 index 000000000..c75e1a3ba --- /dev/null +++ b/src/test/java/org/opensearch/CronNodeRequest1_0.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.opensearch; + +import java.io.IOException; + +import org.opensearch.action.support.nodes.BaseNodeRequest; +import org.opensearch.common.io.stream.StreamInput; + +/** + * Delete model represents the request to an individual node + */ +public class CronNodeRequest1_0 extends BaseNodeRequest { + + public CronNodeRequest1_0() {} + + public CronNodeRequest1_0(StreamInput in) throws IOException { + super(in); + } + +} diff --git a/src/test/java/org/opensearch/CronRequest1_0.java b/src/test/java/org/opensearch/CronRequest1_0.java new file mode 100644 index 000000000..83a7f7025 --- /dev/null +++ b/src/test/java/org/opensearch/CronRequest1_0.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.opensearch; + +import java.io.IOException; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; + +/** + * Request should be sent from the handler logic of transport delete detector API + * + */ +public class CronRequest1_0 extends BaseNodesRequest { + + public CronRequest1_0() { + super((String[]) null); + } + + public CronRequest1_0(StreamInput in) throws IOException { + super(in); + } + + public CronRequest1_0(DiscoveryNode... nodes) { + super(nodes); + } +} diff --git a/src/test/java/org/opensearch/ad/AbstractADTest.java b/src/test/java/org/opensearch/ad/AbstractADTest.java index 8946f0093..0155f9f6f 100644 --- a/src/test/java/org/opensearch/ad/AbstractADTest.java +++ b/src/test/java/org/opensearch/ad/AbstractADTest.java @@ -62,6 +62,7 @@ import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.logging.Loggers; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -177,10 +178,10 @@ private String convertToRegex(String formattedStr) { protected void setUpLog4jForJUnit(Class cls) { String loggerName = toLoggerName(callerClass(cls)); logger = (Logger) LogManager.getLogger(loggerName); + Loggers.setLevel(logger, Level.DEBUG); testAppender = new TestAppender(loggerName); testAppender.start(); logger.addAppender(testAppender); - logger.setLevel(Level.DEBUG); } private static String toLoggerName(final Class cls) { diff --git a/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java b/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java index ff463c48e..7763c9ae7 100644 --- a/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java +++ 
b/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java @@ -33,6 +33,8 @@ import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import java.io.IOException; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -40,6 +42,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.lucene.search.TotalHits; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; @@ -52,18 +55,24 @@ import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; import org.opensearch.ad.model.Entity; +import org.opensearch.ad.model.EntityProfile; import org.opensearch.ad.model.EntityProfileName; import org.opensearch.ad.model.EntityState; +import org.opensearch.ad.model.InitProgressProfile; +import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.model.ModelProfile; import org.opensearch.ad.model.ModelProfileOnNode; import org.opensearch.ad.transport.EntityProfileAction; import org.opensearch.ad.transport.EntityProfileResponse; import org.opensearch.client.Client; +import org.opensearch.common.text.Text; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.metrics.InternalMax; +import org.opensearch.search.internal.InternalSearchResponse; public class EntityProfileRunnerTests extends AbstractADTest { private AnomalyDetector detector; @@ -205,6 +214,57 @@ private void setUpExecuteEntityProfileAction(InittedEverResultStatus initted) { return null; 
}).when(client).execute(any(EntityProfileAction.class), any(), any()); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + SearchRequest request = (SearchRequest) args[0]; + String indexName = request.indices()[0]; + ActionListener listener = (ActionListener) args[1]; + SearchResponse searchResponse = null; + if (indexName.equals(CommonName.ANOMALY_RESULT_INDEX_ALIAS)) { + InternalMax maxAgg = new InternalMax(CommonName.AGG_NAME_MAX_TIME, latestSampleTimestamp, DocValueFormat.RAW, emptyMap()); + InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(maxAgg)); + + SearchHits hits = new SearchHits(new SearchHit[] {}, null, Float.NaN); + SearchResponseSections searchSections = new SearchResponseSections(hits, internalAggregations, null, false, false, null, 1); + + searchResponse = new SearchResponse( + searchSections, + null, + 1, + 1, + 0, + 30, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + } else { + SearchHits collapsedHits = new SearchHits( + new SearchHit[] { + new SearchHit(2, "ID", new Text("type"), Collections.emptyMap(), Collections.emptyMap()), + new SearchHit(3, "ID", new Text("type"), Collections.emptyMap(), Collections.emptyMap()) }, + new TotalHits(1, TotalHits.Relation.EQUAL_TO), + 1.0F + ); + + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits, null, null, null, false, null, 1); + searchResponse = new SearchResponse( + internalSearchResponse, + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + } + + listener.onResponse(searchResponse); + + return null; + + }).when(client).search(any(), any()); } public void stateTestTemplate(InittedEverResultStatus returnedState, EntityState expectedState) throws InterruptedException { @@ -222,6 +282,18 @@ public void stateTestTemplate(InittedEverResultStatus returnedState, EntityState assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); 
} + public void testRunningState() throws InterruptedException { + stateTestTemplate(InittedEverResultStatus.INITTED, EntityState.RUNNING); + } + + public void testUnknownState() throws InterruptedException { + stateTestTemplate(InittedEverResultStatus.UNKNOWN, EntityState.UNKNOWN); + } + + public void testInitState() throws InterruptedException { + stateTestTemplate(InittedEverResultStatus.NOT_INITTED, EntityState.INIT); + } + public void testEmptyProfile() throws InterruptedException { final CountDownLatch inProgressLatch = new CountDownLatch(1); @@ -234,4 +306,116 @@ public void testEmptyProfile() throws InterruptedException { })); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } + + public void testModel() throws InterruptedException { + setUpExecuteEntityProfileAction(InittedEverResultStatus.INITTED); + EntityProfile.Builder expectedProfile = new EntityProfile.Builder(); + ModelProfileOnNode modelProfile = new ModelProfileOnNode(nodeId, new ModelProfile(modelId, entity, modelSize)); + expectedProfile.modelProfile(modelProfile); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + runner.profile(detectorId, entity, model, ActionListener.wrap(response -> { + assertEquals(expectedProfile.build(), response); + inProgressLatch.countDown(); + }, exception -> { + assertTrue("Should not reach here", false); + inProgressLatch.countDown(); + })); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unchecked") + public void testJobIndexNotFound() throws InterruptedException { + setUpExecuteEntityProfileAction(InittedEverResultStatus.INITTED); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + GetRequest request = (GetRequest) args[0]; + ActionListener listener = (ActionListener) args[1]; + + String indexName = request.index(); + if (indexName.equals(ANOMALY_DETECTORS_INDEX)) { + listener + 
.onResponse(TestHelpers.createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX)); + } else if (indexName.equals(ANOMALY_DETECTOR_JOB_INDEX)) { + listener.onFailure(new IndexNotFoundException(ANOMALY_DETECTOR_JOB_INDEX)); + } + + return null; + }).when(client).get(any(), any()); + + EntityProfile expectedProfile = new EntityProfile.Builder().build(); + + runner.profile(detectorId, entity, initNInfo, ActionListener.wrap(response -> { + assertEquals(expectedProfile, response); + inProgressLatch.countDown(); + }, exception -> { + LOG.error("Unexpected error", exception); + assertTrue("Should not reach here", false); + inProgressLatch.countDown(); + })); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unchecked") + public void testNotMultiEntityDetector() throws IOException, InterruptedException { + detector = TestHelpers.randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(detectorIntervalMin, ChronoUnit.MINUTES)); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + GetRequest request = (GetRequest) args[0]; + ActionListener listener = (ActionListener) args[1]; + + String indexName = request.index(); + if (indexName.equals(ANOMALY_DETECTORS_INDEX)) { + listener + .onResponse(TestHelpers.createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX)); + } + + return null; + }).when(client).get(any(), any()); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detectorId, entity, state, ActionListener.wrap(response -> { + assertTrue("Should not reach here", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception.getMessage().contains(EntityProfileRunner.NOT_HC_DETECTOR_ERR_MSG)); + inProgressLatch.countDown(); + })); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testInitNInfo() throws InterruptedException { + 
setUpExecuteEntityProfileAction(InittedEverResultStatus.NOT_INITTED); + latestSampleTimestamp = 1_603_989_830_158L; + + EntityProfile.Builder expectedProfile = new EntityProfile.Builder(); + + // 1 / 128 rounded to 1% + int neededSamples = requiredSamples - smallUpdates; + InitProgressProfile profile = new InitProgressProfile( + "1%", + neededSamples * detector.getDetectorIntervalInSeconds() / 60, + neededSamples + ); + expectedProfile.initProgress(profile); + expectedProfile.isActive(isActive); + expectedProfile.lastActiveTimestampMs(latestActiveTimestamp); + expectedProfile.lastSampleTimestampMs(latestSampleTimestamp); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detectorId, entity, initNInfo, ActionListener.wrap(response -> { + assertEquals(expectedProfile.build(), response); + inProgressLatch.countDown(); + }, exception -> { + LOG.error("Unexpected error", exception); + assertTrue("Should not reach here", false); + inProgressLatch.countDown(); + })); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } } diff --git a/src/test/java/org/opensearch/ad/NodeStateManagerTests.java b/src/test/java/org/opensearch/ad/NodeStateManagerTests.java index b7a745c6b..99fd2ecc5 100644 --- a/src/test/java/org/opensearch/ad/NodeStateManagerTests.java +++ b/src/test/java/org/opensearch/ad/NodeStateManagerTests.java @@ -60,6 +60,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.ad.ml.ModelPartitioner; import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.transport.AnomalyResultTests; import org.opensearch.ad.util.ClientUtil; import org.opensearch.ad.util.Throttler; @@ -91,9 +92,11 @@ public class NodeStateManagerTests extends AbstractADTest { private AnomalyDetector detectorToCheck; private Settings settings; private String adId = "123"; + private String nodeId = "123"; private GetResponse checkpointResponse; private 
ClusterService clusterService; + private ClusterSettings clusterSettings; @Override protected NamedXContentRegistry xContentRegistry() { @@ -132,7 +135,7 @@ public void setUp() throws Exception { Set> nodestateSetting = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); nodestateSetting.add(MAX_RETRY_FOR_UNRESPONSIVE_NODE); nodestateSetting.add(BACKOFF_MINUTES); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, nodestateSetting); + clusterSettings = new ClusterSettings(Settings.EMPTY, nodestateSetting); DiscoveryNode discoveryNode = new DiscoveryNode( "node1", @@ -241,7 +244,6 @@ public void testGetLastError() throws IOException, InterruptedException { } public void testShouldMute() { - String nodeId = "123"; assertTrue(!stateManager.isMuted(nodeId, adId)); when(clock.millis()).thenReturn(10000L); @@ -387,4 +389,40 @@ public void testColdStartRunning() { stateManager.markColdStartRunning(adId); assertTrue(stateManager.isColdStartRunning(adId)); } + + public void testSettingUpdateMaxRetry() { + when(clock.millis()).thenReturn(System.currentTimeMillis()); + stateManager.addPressure(nodeId, adId); + // In setUp method, we mute after 3 tries + assertTrue(!stateManager.isMuted(nodeId, adId)); + + Settings newSettings = Settings.builder().put(AnomalyDetectorSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE.getKey(), "1").build(); + Settings.Builder target = Settings.builder(); + clusterSettings.updateDynamicSettings(newSettings, target, Settings.builder(), "test"); + clusterSettings.applySettings(target.build()); + stateManager.addPressure(nodeId, adId); + // since we have one violation and the max is 1, this is flagged as muted + assertTrue(stateManager.isMuted(nodeId, adId)); + } + + public void testSettingUpdateBackOffMin() { + when(clock.millis()).thenReturn(1000L); + // In setUp method, we mute after 3 tries + for (int i = 0; i < 4; i++) { + stateManager.addPressure(nodeId, adId); + } + + assertTrue(stateManager.isMuted(nodeId, adId)); + + 
Settings newSettings = Settings.builder().put(AnomalyDetectorSettings.BACKOFF_MINUTES.getKey(), "1m").build(); + Settings.Builder target = Settings.builder(); + clusterSettings.updateDynamicSettings(newSettings, target, Settings.builder(), "test"); + clusterSettings.applySettings(target.build()); + stateManager.addPressure(nodeId, adId); + // advance the clock to 62000 milliseconds (61000 ms past the last mute time) + // when evaluating isMuted, 62000 - 1000 (last mute time) > 60000, which + // makes isMuted false since the backoff period has expired + when(clock.millis()).thenReturn(62000L); + assertTrue(!stateManager.isMuted(nodeId, adId)); + } } diff --git a/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java b/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java new file mode 100644 index 000000000..d00096076 --- /dev/null +++ b/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.ad.indices; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.opensearch.ad.AbstractADTest; +import org.opensearch.ad.constant.CommonName; +import org.opensearch.ad.settings.AnomalyDetectorSettings; +import org.opensearch.ad.util.DiscoveryNodeFilterer; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.AliasMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; + +public class UpdateMappingTests extends AbstractADTest { + private static String resultIndexName; + + private AnomalyDetectionIndices adIndices; + private ClusterService clusterService; + private int numberOfNodes; + private AdminClient adminClient; + private ClusterState 
clusterState; + private IndicesAdminClient indicesAdminClient; + + @BeforeClass + public static void setUpBeforeClass() { + resultIndexName = ".opendistro-anomaly-results-history-2020.06.24-000003"; + } + + @SuppressWarnings("unchecked") + @Override + public void setUp() throws Exception { + super.setUp(); + + Client client = mock(Client.class); + adminClient = mock(AdminClient.class); + when(client.admin()).thenReturn(adminClient); + indicesAdminClient = mock(IndicesAdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArgument(1); + + ImmutableOpenMap.Builder> builder = ImmutableOpenMap.builder(); + List aliasMetadata = new ArrayList<>(); + aliasMetadata.add(AliasMetadata.builder(ADIndex.RESULT.name()).build()); + builder.put(resultIndexName, aliasMetadata); + + listener.onResponse(new GetAliasesResponse(builder.build())); + return null; + }).when(indicesAdminClient).getAliases(any(GetAliasesRequest.class), any()); + + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + Collections + .unmodifiableSet( + new HashSet<>( + Arrays + .asList( + AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD, + AnomalyDetectorSettings.MAX_PRIMARY_SHARDS + ) + ) + ) + ); + + clusterState = mock(ClusterState.class); + + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + + IndexMetadata indexMetadata = IndexMetadata + .builder(resultIndexName) + .putAlias(AliasMetadata.builder(ADIndex.RESULT.getIndexName())) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + ImmutableOpenMap.Builder openMapBuilder = ImmutableOpenMap.builder(); + 
openMapBuilder.put(resultIndexName, indexMetadata); + Metadata metadata = Metadata.builder().indices(openMapBuilder.build()).build(); + when(clusterState.getMetadata()).thenReturn(metadata); + when(clusterState.metadata()).thenReturn(metadata); + + RoutingTable routingTable = mock(RoutingTable.class); + when(clusterState.getRoutingTable()).thenReturn(routingTable); + when(routingTable.hasIndex(anyString())).thenReturn(true); + + Settings settings = Settings.EMPTY; + DiscoveryNodeFilterer nodeFilter = mock(DiscoveryNodeFilterer.class); + numberOfNodes = 2; + when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes); + adIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter); + } + + public void testNoIndexToUpdate() { + adIndices.updateMappingIfNecessary(); + verify(indicesAdminClient, never()).putMapping(any(), any()); + // for each index, we check doesAliasExists/doesIndexExists and shouldUpdateConcreteIndex + verify(clusterService, times(10)).state(); + adIndices.updateMappingIfNecessary(); + // we will not trigger new check since we have checked all indices before + verify(clusterService, times(10)).state(); + } + + @SuppressWarnings("serial") + public void testUpdate() throws IOException { + IndexMetadata indexMetadata = IndexMetadata + .builder(resultIndexName) + .putAlias(AliasMetadata.builder(ADIndex.RESULT.getIndexName())) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(new MappingMetadata("type", new HashMap() { + { + put(AnomalyDetectionIndices.META, new HashMap() { + { + // version 1 will cause update + put(CommonName.SCHEMA_VERSION_FIELD, 1); + } + }); + } + })) + .build(); + ImmutableOpenMap.Builder openMapBuilder = ImmutableOpenMap.builder(); + openMapBuilder.put(resultIndexName, indexMetadata); + Metadata metadata = Metadata.builder().indices(openMapBuilder.build()).build(); + when(clusterState.getMetadata()).thenReturn(metadata); + 
when(clusterState.metadata()).thenReturn(metadata); + adIndices.updateMappingIfNecessary(); + verify(indicesAdminClient, times(1)).putMapping(any(), any()); + } +} diff --git a/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java b/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java index 477f5fd36..0630d79ac 100644 --- a/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java +++ b/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java @@ -71,13 +71,19 @@ import org.opensearch.action.ActionListener; import org.opensearch.ad.AnomalyDetectorPlugin; import org.opensearch.ad.MemoryTracker; +import org.opensearch.ad.NodeStateManager; import org.opensearch.ad.breaker.ADCircuitBreakerService; import org.opensearch.ad.caching.EntityCache; import org.opensearch.ad.common.exception.LimitExceededException; import org.opensearch.ad.common.exception.ResourceNotFoundException; +import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator; +import org.opensearch.ad.dataprocessor.LinearUniformInterpolator; +import org.opensearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator; import org.opensearch.ad.feature.FeatureManager; +import org.opensearch.ad.feature.SearchFeatureDao; import org.opensearch.ad.ml.rcf.CombinedRcfResult; import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.ratelimit.CheckpointWriteWorker; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.util.DiscoveryNodeFilterer; import org.opensearch.cluster.node.DiscoveryNode; @@ -91,6 +97,9 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; + import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.returntypes.DiVector; import com.google.common.collect.Sets; @@ -957,4 +966,132 @@ public void 
getPreviewResults_returnAnomalies_forLastAnomaly() { public void getPreviewResults_throwIllegalArgument_forInvalidInput() { modelManager.getPreviewResults(new double[0][0], shingleSize); } + + @Test + public void processEmptyCheckpoint() { + ModelState modelState = modelManager.processEntityCheckpoint(Optional.empty(), null, "", ""); + assertEquals(Instant.MIN, modelState.getLastCheckpointTime()); + } + + @Test + public void processNonEmptyCheckpoint() { + String modelId = "abc"; + String detectorId = "123"; + EntityModel model = MLUtil.createNonEmptyModel(modelId); + Instant checkpointTime = Instant.ofEpochMilli(1000); + ModelState modelState = modelManager + .processEntityCheckpoint(Optional.of(new SimpleImmutableEntry<>(model, checkpointTime)), null, modelId, detectorId); + assertEquals(checkpointTime, modelState.getLastCheckpointTime()); + assertEquals(model.getSamples().size(), modelState.getModel().getSamples().size()); + assertEquals(now, modelState.getLastUsedTime()); + } + + @Test + public void getNullState() { + assertEquals(new ThresholdingResult(0, 0, 0), modelManager.getAnomalyResultForEntity(new double[] {}, null, "", null)); + } + + @Test + public void getEmptyStateFullSamples() { + SearchFeatureDao searchFeatureDao = mock(SearchFeatureDao.class); + + SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator = + new IntegerSensitiveSingleFeatureLinearUniformInterpolator(); + LinearUniformInterpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator); + + NodeStateManager stateManager = mock(NodeStateManager.class); + featureManager = new FeatureManager( + searchFeatureDao, + interpolator, + clock, + AnomalyDetectorSettings.MAX_TRAIN_SAMPLE, + AnomalyDetectorSettings.MAX_SAMPLE_STRIDE, + AnomalyDetectorSettings.TRAIN_SAMPLE_TIME_RANGE_IN_HOURS, + AnomalyDetectorSettings.MIN_TRAIN_SAMPLES, + AnomalyDetectorSettings.MAX_SHINGLE_PROPORTION_MISSING, + 
AnomalyDetectorSettings.MAX_IMPUTATION_NEIGHBOR_DISTANCE, + AnomalyDetectorSettings.PREVIEW_SAMPLE_RATE, + AnomalyDetectorSettings.MAX_PREVIEW_SAMPLES, + AnomalyDetectorSettings.HOURLY_MAINTENANCE, + threadPool, + AnomalyDetectorPlugin.AD_THREAD_POOL_NAME + ); + + CheckpointWriteWorker checkpointWriteQueue = mock(CheckpointWriteWorker.class); + + entityColdStarter = new EntityColdStarter( + clock, + threadPool, + stateManager, + AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE, + AnomalyDetectorSettings.MULTI_ENTITY_NUM_TREES, + AnomalyDetectorSettings.TIME_DECAY, + numMinSamples, + AnomalyDetectorSettings.MAX_SAMPLE_STRIDE, + AnomalyDetectorSettings.MAX_TRAIN_SAMPLE, + interpolator, + searchFeatureDao, + AnomalyDetectorSettings.DEFAULT_MULTI_ENTITY_SHINGLE, + AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE, + AnomalyDetectorSettings.THRESHOLD_MAX_RANK_ERROR, + AnomalyDetectorSettings.THRESHOLD_MAX_SCORE, + AnomalyDetectorSettings.THRESHOLD_NUM_LOGNORMAL_QUANTILES, + AnomalyDetectorSettings.THRESHOLD_DOWNSAMPLES, + AnomalyDetectorSettings.THRESHOLD_MAX_SAMPLES, + featureManager, + settings, + AnomalyDetectorSettings.HOURLY_MAINTENANCE, + checkpointWriteQueue + ); + + modelManager = spy( + new ModelManager( + checkpointDao, + clock, + numTrees, + numSamples, + rcfTimeDecay, + numMinSamples, + thresholdMinPvalue, + thresholdMaxRankError, + thresholdMaxScore, + thresholdNumLogNormalQuantiles, + thresholdDownsamples, + thresholdMaxSamples, + minPreviewSize, + modelTtl, + checkpointInterval, + entityColdStarter, + modelPartitioner, + featureManager, + memoryTracker + ) + ); + + ModelState state = MLUtil + .randomModelState(new RandomModelStateConfig.Builder().fullModel(false).sampleSize(numMinSamples).build()); + EntityModel model = state.getModel(); + assertTrue(model.getRcf() == null || model.getThreshold() == null); + ThresholdingResult result = modelManager.getAnomalyResultForEntity(new double[] { -1 }, state, "", null); + // model outputs scores + 
assertTrue(result.getRcfScore() != 0); + // added the sample to score since our model is empty + assertEquals(0, model.getSamples().size()); + } + + @Test + public void getEmptyStateNotFullSamples() { + ModelState state = MLUtil + .randomModelState(new RandomModelStateConfig.Builder().fullModel(false).sampleSize(numMinSamples - 1).build()); + assertEquals(new ThresholdingResult(0, 0, 0), modelManager.getAnomalyResultForEntity(new double[] { -1 }, state, "", null)); + assertEquals(numMinSamples, state.getModel().getSamples().size()); + } + + @Test + public void scoreSamples() { + ModelState state = MLUtil.randomModelState(new RandomModelStateConfig.Builder().fullModel(true).build()); + modelManager.getAnomalyResultForEntity(new double[] { -1 }, state, "", null); + assertEquals(0, state.getModel().getSamples().size()); + assertEquals(now, state.getLastUsedTime()); + } } diff --git a/src/test/java/org/opensearch/ad/transport/ADResultBulkResponseTests.java b/src/test/java/org/opensearch/ad/transport/ADResultBulkResponseTests.java new file mode 100644 index 000000000..7d9308bb7 --- /dev/null +++ b/src/test/java/org/opensearch/ad/transport/ADResultBulkResponseTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.ad.transport; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.opensearch.action.index.IndexRequest; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +public class ADResultBulkResponseTests extends OpenSearchTestCase { + public void testSerialization() throws IOException { + BytesStreamOutput output = new BytesStreamOutput(); + List retryRequests = new ArrayList<>(); + retryRequests.add(new IndexRequest("index").id("blah").source(Collections.singletonMap("foo", "bar"))); + ADResultBulkResponse response = new ADResultBulkResponse(retryRequests); + response.writeTo(output); + StreamInput streamInput = output.bytes().streamInput(); + ADResultBulkResponse readResponse = new ADResultBulkResponse(streamInput); + assertTrue(readResponse.hasFailures()); + } +} diff --git a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index 9514c97e0..6574b14dd 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -407,7 +407,7 @@ private void setUpEntityResult(int nodeIndex, NodeStateManager nodeStateManager) threadPool ); - when(normalModelManager.getAnomalyResultForEntity(any(), any(), any(), any(), any())).thenReturn(new ThresholdingResult(0, 1, 1)); + when(normalModelManager.getAnomalyResultForEntity(any(), any(), any(), any())).thenReturn(new ThresholdingResult(0, 1, 1)); } private void setUpEntityResult(int nodeIndex) { diff --git a/src/test/java/org/opensearch/ad/transport/handler/AbstractIndexHandlerTest.java b/src/test/java/org/opensearch/ad/transport/handler/AbstractIndexHandlerTest.java new file mode 100644 index 000000000..3c5a80c5b --- /dev/null +++ 
b/src/test/java/org/opensearch/ad/transport/handler/AbstractIndexHandlerTest.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad.transport.handler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; +import static org.opensearch.ad.TestHelpers.createIndexBlockedState; + +import java.io.IOException; +import java.util.Arrays; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.ad.AbstractADTest; +import org.opensearch.ad.TestHelpers; +import org.opensearch.ad.constant.CommonName; +import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.transport.AnomalyResultTests; +import org.opensearch.ad.util.ClientUtil; +import org.opensearch.ad.util.IndexUtils; +import org.opensearch.ad.util.Throttler; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +public abstract class AbstractIndexHandlerTest extends AbstractADTest { + enum IndexCreation { + RUNTIME_EXCEPTION, + RESOURCE_EXISTS_EXCEPTION, + ACKED, + NOT_ACKED + } + + protected static Settings 
settings; + protected ClientUtil clientUtil; + protected ThreadPool context; + protected IndexUtils indexUtil; + protected String detectorId = "123"; + + @Mock + protected Client client; + + @Mock + protected AnomalyDetectionIndices anomalyDetectionIndices; + + @Mock + protected Throttler throttler; + + @Mock + protected ClusterService clusterService; + + @Mock + protected IndexNameExpressionResolver indexNameResolver; + + @BeforeClass + public static void setUpBeforeClass() { + setUpThreadPool(AnomalyResultTests.class.getSimpleName()); + settings = Settings + .builder() + .put("plugins.anomaly_detection.max_retry_for_backoff", 2) + .put("plugins.anomaly_detection.backoff_initial_delay", TimeValue.timeValueMillis(1)) + .build(); + } + + @AfterClass + public static void tearDownAfterClass() { + tearDownThreadPool(); + settings = null; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.initMocks(this); + setWriteBlockAdResultIndex(false); + context = TestHelpers.createThreadPool(); + clientUtil = new ClientUtil(settings, client, throttler, context); + indexUtil = new IndexUtils(client, clientUtil, clusterService, indexNameResolver); + } + + protected void setWriteBlockAdResultIndex(boolean blocked) { + String indexName = randomAlphaOfLength(10); + Settings settings = blocked + ? 
Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build() + : Settings.EMPTY; + ClusterState blockedClusterState = createIndexBlockedState(indexName, settings, CommonName.ANOMALY_RESULT_INDEX_ALIAS); + when(clusterService.state()).thenReturn(blockedClusterState); + when(indexNameResolver.concreteIndexNames(any(), any(), any(String.class))).thenReturn(new String[] { indexName }); + } + + @SuppressWarnings("unchecked") + protected void setUpSavingAnomalyResultIndex(boolean anomalyResultIndexExists, IndexCreation creationResult) throws IOException { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 1); + ActionListener listener = invocation.getArgument(0); + assertTrue(listener != null); + switch (creationResult) { + case RUNTIME_EXCEPTION: + listener.onFailure(new RuntimeException()); + break; + case RESOURCE_EXISTS_EXCEPTION: + listener.onFailure(new ResourceAlreadyExistsException(CommonName.ANOMALY_RESULT_INDEX_ALIAS)); + break; + case ACKED: + listener.onResponse(new CreateIndexResponse(true, true, CommonName.ANOMALY_RESULT_INDEX_ALIAS)); + break; + case NOT_ACKED: + listener.onResponse(new CreateIndexResponse(false, false, CommonName.ANOMALY_RESULT_INDEX_ALIAS)); + break; + default: + assertTrue("should not reach here", false); + break; + } + return null; + }).when(anomalyDetectionIndices).initAnomalyResultIndexDirectly(any()); + when(anomalyDetectionIndices.doesAnomalyResultIndexExist()).thenReturn(anomalyResultIndexExists); + } + + protected void setUpSavingAnomalyResultIndex(boolean anomalyResultIndexExists) throws IOException { + setUpSavingAnomalyResultIndex(anomalyResultIndexExists, IndexCreation.ACKED); + } +} diff --git a/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultHandlerTests.java b/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultHandlerTests.java 
index a2a83a6f4..44117e037 100644 --- a/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultHandlerTests.java +++ b/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultHandlerTests.java @@ -32,8 +32,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.opensearch.ad.TestHelpers.createIndexBlockedState; import java.io.IOException; import java.time.Clock; @@ -41,95 +39,37 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.ArgumentMatchers; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.ad.AbstractADTest; import org.opensearch.ad.NodeStateManager; import org.opensearch.ad.TestHelpers; import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.constant.CommonName; -import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.AnomalyResult; -import org.opensearch.ad.transport.AnomalyResultTests; -import org.opensearch.ad.util.ClientUtil; -import org.opensearch.ad.util.IndexUtils; -import org.opensearch.ad.util.Throttler; import org.opensearch.ad.util.ThrowingConsumerWrapper; -import org.opensearch.client.Client; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import 
org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; -import org.opensearch.threadpool.ThreadPool; - -public class AnomalyResultHandlerTests extends AbstractADTest { - private static Settings settings; - @Mock - private ClusterService clusterService; - - @Mock - private Client client; - - private ClientUtil clientUtil; - - @Mock - private IndexNameExpressionResolver indexNameResolver; - - @Mock - private AnomalyDetectionIndices anomalyDetectionIndices; - - private String detectorId = "123"; - - @Mock - private Throttler throttler; - - private ThreadPool context; - - private IndexUtils indexUtil; +public class AnomalyResultHandlerTests extends AbstractIndexHandlerTest { @Mock private NodeStateManager nodeStateManager; @Mock private Clock clock; - @BeforeClass - public static void setUpBeforeClass() { - setUpThreadPool(AnomalyResultTests.class.getSimpleName()); - settings = Settings.EMPTY; - } - - @AfterClass - public static void tearDownAfterClass() { - tearDownThreadPool(); - settings = null; - } - @Override @Before public void setUp() throws Exception { super.setUp(); super.setUpLog4jForJUnit(AnomalyIndexHandler.class); - MockitoAnnotations.initMocks(this); - setWriteBlockAdResultIndex(false); - context = TestHelpers.createThreadPool(); - clientUtil = new ClientUtil(settings, client, throttler, context); - indexUtil = new IndexUtils(client, clientUtil, clusterService, indexNameResolver); } @Override @@ -208,7 +148,7 @@ public void testIndexWriteBlock() { @Test public void testAdResultIndexExist() throws IOException { - setInitAnomalyResultIndexException(true); + setUpSavingAnomalyResultIndex(false, IndexCreation.RESOURCE_EXISTS_EXCEPTION); AnomalyIndexHandler handler = new AnomalyIndexHandler( client, settings, @@ -229,7 +169,7 @@ public void testAdResultIndexOtherException() throws IOException { 
expectedEx.expect(AnomalyDetectionException.class); expectedEx.expectMessage("Error in saving .opendistro-anomaly-results for detector " + detectorId); - setInitAnomalyResultIndexException(false); + setUpSavingAnomalyResultIndex(false, IndexCreation.RUNTIME_EXCEPTION); AnomalyIndexHandler handler = new AnomalyIndexHandler( client, settings, @@ -245,28 +185,6 @@ public void testAdResultIndexOtherException() throws IOException { verify(client, never()).index(any(), any()); } - private void setInitAnomalyResultIndexException(boolean indexExistException) throws IOException { - Exception e = indexExistException ? mock(ResourceAlreadyExistsException.class) : mock(RuntimeException.class); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 1); - ActionListener listener = invocation.getArgument(0); - assertTrue(listener != null); - listener.onFailure(e); - return null; - }).when(anomalyDetectionIndices).initAnomalyResultIndexDirectly(any()); - } - - private void setWriteBlockAdResultIndex(boolean blocked) { - String indexName = randomAlphaOfLength(10); - Settings settings = blocked - ? Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build() - : Settings.EMPTY; - ClusterState blockedClusterState = createIndexBlockedState(indexName, settings, CommonName.ANOMALY_RESULT_INDEX_ALIAS); - when(clusterService.state()).thenReturn(blockedClusterState); - when(indexNameResolver.concreteIndexNames(any(), any(), any(String.class))).thenReturn(new String[] { indexName }); - } - /** * Template to test exponential backoff retry during saving anomaly result. 
* @@ -324,18 +242,4 @@ private void savingFailureTemplate(boolean throwOpenSearchRejectedExecutionExcep backoffLatch.await(1, TimeUnit.MINUTES); } - - @SuppressWarnings("unchecked") - private void setUpSavingAnomalyResultIndex(boolean anomalyResultIndexExists) throws IOException { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length >= 1); - ActionListener listener = invocation.getArgument(0); - assertTrue(listener != null); - listener.onResponse(new CreateIndexResponse(true, true, CommonName.ANOMALY_RESULT_INDEX_ALIAS) { - }); - return null; - }).when(anomalyDetectionIndices).initAnomalyResultIndexDirectly(any()); - when(anomalyDetectionIndices.doesAnomalyResultIndexExist()).thenReturn(anomalyResultIndexExists); - } } diff --git a/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java b/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java new file mode 100644 index 000000000..c74ab0a4b --- /dev/null +++ b/src/test/java/org/opensearch/ad/transport/handler/MultiEntityResultHandlerTests.java @@ -0,0 +1,194 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.ad.transport.handler;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.transport.ADResultBulkAction;
import org.opensearch.ad.transport.ADResultBulkRequest;
import org.opensearch.ad.transport.ADResultBulkResponse;

/**
 * Unit tests for {@link MultiEntityResultHandler#flush}, covering the happy
 * path, index-creation outcomes (acked / not acked / already exists /
 * runtime failure), a write-blocked result index, and bulk-save failures.
 *
 * <p>Shared mocks ({@code client}, {@code settings}, {@code threadPool},
 * {@code anomalyDetectionIndices}, {@code clientUtil}, {@code indexUtil},
 * {@code clusterService}) and helpers such as
 * {@code setUpSavingAnomalyResultIndex} / {@code setWriteBlockAdResultIndex}
 * come from {@link AbstractIndexHandlerTest}.
 */
public class MultiEntityResultHandlerTests extends AbstractIndexHandlerTest {
    private MultiEntityResultHandler handler;
    private ADResultBulkRequest request;
    private ADResultBulkResponse response;

    @Override
    public void setUp() throws Exception {
        super.setUp();

        handler = new MultiEntityResultHandler(
            client,
            settings,
            threadPool,
            anomalyDetectionIndices,
            clientUtil,
            indexUtil,
            clusterService
        );

        // A one-document bulk request; individual tests may replace it
        // (e.g. testNothingToSave uses an empty request).
        request = new ADResultBulkRequest();
        request.add(TestHelpers.randomAnomalyDetectResult());

        response = new ADResultBulkResponse();

        super.setUpLog4jForJUnit(MultiEntityResultHandler.class);

        // By default, let ADResultBulkAction succeed with the canned response.
        // The listener is the third argument of client.execute(action, request, listener).
        doAnswer(invocation -> {
            ActionListener<ADResultBulkResponse> listener = invocation.getArgument(2);
            listener.onResponse(response);
            return null;
        }).when(client).execute(eq(ADResultBulkAction.INSTANCE), any(), ArgumentMatchers.<ActionListener<ADResultBulkResponse>>any());
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        super.tearDownLog4jForJUnit();
    }

    /**
     * A write block on the result index must surface as an
     * AnomalyDetectionException carrying CANNOT_SAVE_RESULT_ERR_MSG.
     *
     * NOTE(review): assertTrue(..., false) inside the response lambda of
     * ActionListener.wrap is re-routed to the failure lambda (wrap catches
     * onResponse exceptions), so the latch still counts down; the instanceof
     * assertion in the failure lambda is what actually fails the test. The
     * same pattern is used in the other tests below — TODO confirm intended.
     */
    @Test
    public void testIndexWriteBlock() throws InterruptedException {
        setWriteBlockAdResultIndex(true);

        CountDownLatch verified = new CountDownLatch(1);

        handler.flush(request, ActionListener.wrap(response -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }, exception -> {
            assertTrue(exception instanceof AnomalyDetectionException);
            assertTrue(
                "actual: " + exception.getMessage(),
                exception.getMessage().contains(MultiEntityResultHandler.CANNOT_SAVE_RESULT_ERR_MSG)
            );
            verified.countDown();
        }));

        assertTrue(verified.await(100, TimeUnit.SECONDS));
    }

    /** Happy path: index does not exist yet, creation is acked, save succeeds and is logged. */
    @Test
    public void testSavingAdResult() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(false);

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(request, ActionListener.wrap(response -> { verified.countDown(); }, exception -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
        assertEquals(1, testAppender.countMessage(MultiEntityResultHandler.SUCCESS_SAVING_RESULT_MSG, false));
    }

    /** A bulk-save failure from ADResultBulkAction is propagated to the caller unchanged. */
    @Test
    public void testSavingFailure() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(false);
        // Override the default success stub with a failure.
        doAnswer(invocation -> {
            ActionListener<ADResultBulkResponse> listener = invocation.getArgument(2);
            listener.onFailure(new RuntimeException());
            return null;
        }).when(client).execute(eq(ADResultBulkAction.INSTANCE), any(), ArgumentMatchers.<ActionListener<ADResultBulkResponse>>any());

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(request, ActionListener.wrap(response -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }, exception -> {
            assertTrue(exception instanceof RuntimeException);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
    }

    /** When the result index already exists, flush skips creation and saves directly. */
    @Test
    public void testAdResultIndexExists() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(true);

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(request, ActionListener.wrap(response -> { verified.countDown(); }, exception -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
        assertEquals(1, testAppender.countMessage(MultiEntityResultHandler.SUCCESS_SAVING_RESULT_MSG, false));
    }

    /** An empty bulk request is rejected with an AnomalyDetectionException instead of a no-op save. */
    @Test
    public void testNothingToSave() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(false);

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(new ADResultBulkRequest(), ActionListener.wrap(response -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }, exception -> {
            assertTrue(exception instanceof AnomalyDetectionException);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
    }

    /** Index creation completing without acknowledgement fails the flush. */
    @Test
    public void testCreateUnAcked() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(false, IndexCreation.NOT_ACKED);

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(request, ActionListener.wrap(response -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }, exception -> {
            assertTrue(exception instanceof AnomalyDetectionException);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
    }

    /** A runtime exception during index creation is propagated as-is. */
    @Test
    public void testCreateRuntimeException() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(false, IndexCreation.RUNTIME_EXCEPTION);

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(request, ActionListener.wrap(response -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }, exception -> {
            assertTrue(exception instanceof RuntimeException);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
    }

    /** ResourceAlreadyExistsException during creation is benign: the save proceeds and succeeds. */
    @Test
    public void testCreateResourceExistsException() throws IOException, InterruptedException {
        setUpSavingAnomalyResultIndex(false, IndexCreation.RESOURCE_EXISTS_EXCEPTION);

        CountDownLatch verified = new CountDownLatch(1);
        handler.flush(request, ActionListener.wrap(response -> { verified.countDown(); }, exception -> {
            assertTrue("Should not reach here ", false);
            verified.countDown();
        }));
        assertTrue(verified.await(100, TimeUnit.SECONDS));
        assertEquals(1, testAppender.countMessage(MultiEntityResultHandler.SUCCESS_SAVING_RESULT_MSG, false));
    }
}
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.ad.util;

import java.util.List;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.Index;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;

/**
 * Unit test for {@link BulkUtil#getFailedIndexRequest}: given a bulk request
 * and its response, only the requests whose items failed should be returned
 * for retry.
 */
public class BulkUtilTests extends OpenSearchTestCase {
    public void testGetFailedIndexRequest() {
        BulkItemResponse[] itemResponses = new BulkItemResponse[2];
        String indexName = "index";
        String type = "_doc";
        String idPrefix = "id";
        String uuid = "uuid";
        int shardIntId = 0;
        ShardId shardId = new ShardId(new Index(indexName, uuid), shardIntId);
        // Item 0: a failed item (version conflict) — must be selected for retry.
        itemResponses[0] = new BulkItemResponse(
            0,
            randomFrom(DocWriteRequest.OpType.values()),
            new Failure(indexName, type, idPrefix + 0, new VersionConflictEngineException(shardId, "", "blah"))
        );
        // Item 1: a successful index response — must NOT be retried.
        itemResponses[1] = new BulkItemResponse(
            1,
            randomFrom(DocWriteRequest.OpType.values()),
            new IndexResponse(shardId, "type", idPrefix + 1, 1, 1, randomInt(), true)
        );
        BulkResponse response = new BulkResponse(itemResponses, 0);

        // The original bulk request holding both documents.
        BulkRequest request = new BulkRequest();
        for (int i = 0; i < 2; i++) {
            request.add(new IndexRequest(indexName).id(idPrefix + i).source(XContentType.JSON, "field", "value"));
        }

        // Only the failed document (id0) should be returned for retry.
        List<IndexRequest> retry = BulkUtil.getFailedIndexRequest(request, response);
        assertEquals(1, retry.size());
        assertEquals(idPrefix + 0, retry.get(0).id());
    }
}

/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.ad.util;

import java.io.IOException;

import org.opensearch.test.OpenSearchTestCase;

/**
 * Unit test for {@link ThrowingSupplierWrapper}: a checked exception thrown
 * by the wrapped supplier must be rethrown as an unchecked RuntimeException.
 */
public class ThrowingSupplierWrapperTests extends OpenSearchTestCase {
    // A supplier-shaped method that always throws a checked IOException.
    private static String foo() throws IOException {
        throw new IOException("blah");
    }

    public void testExceptionThrown() {
        expectThrows(
            RuntimeException.class,
            () -> ThrowingSupplierWrapper.throwingSupplierWrapper(ThrowingSupplierWrapperTests::foo).get()
        );
    }
}