From eb03a32f7c74621c2e6a861ab248d4674753ad0f Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Tue, 10 Jan 2023 16:37:35 -0800 Subject: [PATCH 1/4] Add native memory circuit breaker. Refactor all breakers from common to plugin. Add dynamic setting for native memory circuit breaker. Signed-off-by: Jing Zhang --- plugin/build.gradle | 3 +- .../load/TransportLoadModelOnNodeAction.java | 2 +- .../opensearch/ml}/breaker/BreakerName.java | 5 +- .../ml}/breaker/CircuitBreaker.java | 2 +- .../ml}/breaker/DiskCircuitBreaker.java | 12 +-- .../ml}/breaker/MLCircuitBreakerService.java | 32 ++++++-- .../ml}/breaker/MemoryCircuitBreaker.java | 4 +- .../breaker/NativeMemoryCircuitBreaker.java | 50 ++++++++++++ .../ml}/breaker/ThresholdCircuitBreaker.java | 2 +- .../opensearch/ml/model/MLModelManager.java | 2 +- .../ml/plugin/MachineLearningPlugin.java | 13 ++- .../ml/settings/MLCommonsSettings.java | 3 + .../ml/task/MLExecuteTaskRunner.java | 2 +- .../ml/task/MLPredictTaskRunner.java | 2 +- .../org/opensearch/ml/task/MLTaskRunner.java | 2 +- .../ml/task/MLTrainAndPredictTaskRunner.java | 2 +- .../ml/task/MLTrainingTaskRunner.java | 2 +- .../org/opensearch/ml/utils/MLNodeUtils.java | 12 ++- .../ml/action/MLCommonsIntegTestCase.java | 9 ++- .../TransportLoadModelOnNodeActionTests.java | 2 +- .../breaker/MLCircuitBreakerServiceTests.java | 30 +++++-- .../breaker/MemoryCircuitBreakerTests.java | 6 +- .../NativeMemoryCircuitBreakerTests.java | 79 +++++++++++++++++++ .../ml/model/MLModelManagerTests.java | 11 ++- .../ml/rest/MLCommonsRestTestCase.java | 5 ++ .../ml/task/MLExecuteTaskRunnerTests.java | 2 +- .../ml/task/MLPredictTaskRunnerTests.java | 2 +- .../ml/task/MLTaskDispatcherTests.java | 2 +- .../MLTrainAndPredictTaskRunnerTests.java | 2 +- .../ml/task/MLTrainingTaskRunnerTests.java | 2 +- .../opensearch/ml/task/TaskRunnerTests.java | 9 ++- 31 files changed, 257 insertions(+), 56 deletions(-) rename {common/src/main/java/org/opensearch/ml/common => plugin/src/main/java/org/opensearch/ml}/breaker/BreakerName.java (66%) rename {common/src/main/java/org/opensearch/ml/common => plugin/src/main/java/org/opensearch/ml}/breaker/CircuitBreaker.java (87%) rename {common/src/main/java/org/opensearch/ml/common => plugin/src/main/java/org/opensearch/ml}/breaker/DiskCircuitBreaker.java (87%) rename {common/src/main/java/org/opensearch/ml/common => plugin/src/main/java/org/opensearch/ml}/breaker/MLCircuitBreakerService.java (64%) rename {common/src/main/java/org/opensearch/ml/common => plugin/src/main/java/org/opensearch/ml}/breaker/MemoryCircuitBreaker.java (90%) create mode 100644 plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java rename {common/src/main/java/org/opensearch/ml/common => plugin/src/main/java/org/opensearch/ml}/breaker/ThresholdCircuitBreaker.java (92%) rename {common/src/test/java/org/opensearch/ml/common => plugin/src/test/java/org/opensearch/ml}/breaker/MLCircuitBreakerServiceTests.java (77%) rename {common/src/test/java/org/opensearch/ml/common => plugin/src/test/java/org/opensearch/ml}/breaker/MemoryCircuitBreakerTests.java (97%) create mode 100644 plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java diff --git a/plugin/build.gradle b/plugin/build.gradle index 956a5276dd..72657e7ee7 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -254,7 +254,8 @@ List jacocoExclusions = [ 'org.opensearch.ml.task.MLExecuteTaskRunner', 'org.opensearch.ml.action.profile.MLProfileTransportAction', 'org.opensearch.ml.action.models.DeleteModelTransportAction.1', - 'org.opensearch.ml.rest.RestMLPredictionAction' + 'org.opensearch.ml.rest.RestMLPredictionAction', + 'org.opensearch.ml.breaker.DiskCircuitBreaker' ] jacocoTestCoverageVerification { diff --git a/plugin/src/main/java/org/opensearch/ml/action/load/TransportLoadModelOnNodeAction.java b/plugin/src/main/java/org/opensearch/ml/action/load/TransportLoadModelOnNodeAction.java index 0a3b42cee3..1ce53aa567 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/load/TransportLoadModelOnNodeAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/load/TransportLoadModelOnNodeAction.java @@ -28,9 +28,9 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLTask; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.transport.forward.MLForwardAction; import org.opensearch.ml.common.transport.forward.MLForwardInput; import org.opensearch.ml.common.transport.forward.MLForwardRequest; diff --git a/common/src/main/java/org/opensearch/ml/common/breaker/BreakerName.java b/plugin/src/main/java/org/opensearch/ml/breaker/BreakerName.java similarity index 66% rename from common/src/main/java/org/opensearch/ml/common/breaker/BreakerName.java rename to plugin/src/main/java/org/opensearch/ml/breaker/BreakerName.java index fb6b9baad4..9bf1bfcf22 100644 --- a/common/src/main/java/org/opensearch/ml/common/breaker/BreakerName.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/BreakerName.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; +package org.opensearch.ml.breaker; public enum BreakerName { MEMORY, - DISK + DISK, + NATIVE_MEMORY } diff --git a/common/src/main/java/org/opensearch/ml/common/breaker/CircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/CircuitBreaker.java similarity index 87% rename from common/src/main/java/org/opensearch/ml/common/breaker/CircuitBreaker.java rename to plugin/src/main/java/org/opensearch/ml/breaker/CircuitBreaker.java index 5764c4b7d4..90f399fdbf 100644 --- a/common/src/main/java/org/opensearch/ml/common/breaker/CircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/CircuitBreaker.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; +package org.opensearch.ml.breaker; /** * An interface for circuit breaker. diff --git a/common/src/main/java/org/opensearch/ml/common/breaker/DiskCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java similarity index 87% rename from common/src/main/java/org/opensearch/ml/common/breaker/DiskCircuitBreaker.java rename to plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java index d1289c3623..47c269e02f 100644 --- a/common/src/main/java/org/opensearch/ml/common/breaker/DiskCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java @@ -3,15 +3,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; - -import org.opensearch.ml.common.exception.MLException; +package org.opensearch.ml.breaker; import java.io.File; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import org.opensearch.ml.common.exception.MLException; + /** * A circuit breaker for disk usage. */ @@ -32,17 +32,17 @@ public DiskCircuitBreaker(long threshold, String diskDir) { @Override public String getName() { - return ML_DISK_CB; + return ML_DISK_CB; } @Override public boolean isOpen() { try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - return (new File(diskDir).getFreeSpace()/1024/1024/1024) < getThreshold(); // in GB + return (new File(diskDir).getFreeSpace() / 1024 / 1024 / 1024) < getThreshold(); // in GB }); } catch (PrivilegedActionException e) { throw new MLException("Failed to run disk circuit breaker"); } } -} \ No newline at end of file +} diff --git a/common/src/main/java/org/opensearch/ml/common/breaker/MLCircuitBreakerService.java b/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java similarity index 64% rename from common/src/main/java/org/opensearch/ml/common/breaker/MLCircuitBreakerService.java rename to plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java index ba46d54d49..bd42c7d841 100644 --- a/common/src/main/java/org/opensearch/ml/common/breaker/MLCircuitBreakerService.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java @@ -3,15 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; - -import lombok.extern.log4j.Log4j2; -import org.opensearch.monitor.jvm.JvmService; +package org.opensearch.ml.breaker; import java.nio.file.Path; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.monitor.jvm.JvmService; +import org.opensearch.monitor.os.OsService; + /** * This service registers internal system breakers and provide API for users to register their own breakers. */ @@ -20,14 +24,23 @@ public class MLCircuitBreakerService { private final ConcurrentMap breakers = new ConcurrentHashMap<>(); private final JvmService jvmService; + private final OsService osService; + private final Settings settings; + private final ClusterService clusterService; /** * Constructor. * * @param jvmService jvm info + * @param osService os info + * @param settings settings + * @param clusterService clusterService */ - public MLCircuitBreakerService(JvmService jvmService) { + public MLCircuitBreakerService(JvmService jvmService, OsService osService, Settings settings, ClusterService clusterService) { this.jvmService = jvmService; + this.osService = osService; + this.settings = settings; + this.clusterService = clusterService; } public void registerBreaker(BreakerName name, CircuitBreaker breaker) { @@ -65,18 +78,21 @@ public MLCircuitBreakerService init(Path path) { log.info("Registered ML memory breaker."); registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString())); log.info("Registered ML disk breaker."); + // Register native memory circuit breaker + registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService)); + log.info("Registered ML native memory breaker."); return this; } /** * - * @return the name of any open circuit breaker; otherwise return null + * @return any open circuit breaker; otherwise return null */ - public String checkOpenCB() { + public ThresholdCircuitBreaker checkOpenCB() { for (CircuitBreaker breaker : breakers.values()) { if (breaker.isOpen()) { - return breaker.getName(); + return (ThresholdCircuitBreaker) breaker; } } diff --git a/common/src/main/java/org/opensearch/ml/common/breaker/MemoryCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java similarity index 90% rename from common/src/main/java/org/opensearch/ml/common/breaker/MemoryCircuitBreaker.java rename to plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java index 9611775db5..165736baf2 100644 --- a/common/src/main/java/org/opensearch/ml/common/breaker/MemoryCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; +package org.opensearch.ml.breaker; import org.opensearch.monitor.jvm.JvmService; @@ -11,7 +11,7 @@ * A circuit breaker for memory usage. */ public class MemoryCircuitBreaker extends ThresholdCircuitBreaker { - //TODO: make this value configurable as cluster setting + // TODO: make this value configurable as cluster setting private static final String ML_MEMORY_CB = "Memory Circuit Breaker"; public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85; private final JvmService jvmService; diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java new file mode 100644 index 0000000000..195a017648 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.breaker; + +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.monitor.os.OsService; + +/** + * A circuit breaker for native memory usage. + */ +public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker { + private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker"; + public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90; + private final OsService osService; + private volatile Integer nativeMemThreshold = 90; + + public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) { + super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD); + this.osService = osService; + this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it); + } + + public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) { + super(threshold.shortValue()); + this.nativeMemThreshold = threshold; + this.osService = osService; + } + + @Override + public String getName() { + return ML_MEMORY_CB; + } + + @Override + public Short getThreshold() { + return this.nativeMemThreshold.shortValue(); + } + + @Override + public boolean isOpen() { + return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue(); + } +} diff --git a/common/src/main/java/org/opensearch/ml/common/breaker/ThresholdCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java similarity index 92% rename from common/src/main/java/org/opensearch/ml/common/breaker/ThresholdCircuitBreaker.java rename to plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java index c461a794e1..1feec444ab 100644 --- a/common/src/main/java/org/opensearch/ml/common/breaker/ThresholdCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; +package org.opensearch.ml.breaker; /** * An abstract class for all breakers with threshold. diff --git a/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java b/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java index 464325c544..a988d91c74 100644 --- a/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java +++ b/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java @@ -73,10 +73,10 @@ import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.reindex.DeleteByQueryAction; import org.opensearch.index.reindex.DeleteByQueryRequest; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.MLTask; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.exception.MLException; import org.opensearch.ml.common.exception.MLResourceNotFoundException; import org.opensearch.ml.common.model.MLModelState; diff --git a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java index 541a7f6f6e..ed8b1b4c6b 100644 --- a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java +++ b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java @@ -15,6 +15,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import lombok.SneakyThrows; + import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.client.Client; @@ -56,11 +58,11 @@ import org.opensearch.ml.action.upload_chunk.MLModelMetaCreate; import org.opensearch.ml.action.upload_chunk.TransportCreateModelMetaAction; import org.opensearch.ml.action.upload_chunk.TransportUploadModelChunkAction; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.cluster.MLCommonsClusterEventListener; import org.opensearch.ml.cluster.MLCommonsClusterManagerEventListener; import org.opensearch.ml.common.FunctionName; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.input.execute.anomalylocalization.AnomalyLocalizationInput; import org.opensearch.ml.common.input.execute.samplecalculator.LocalSampleCalculatorInput; import org.opensearch.ml.common.input.parameter.ad.AnomalyDetectionLibSVMParams; @@ -131,6 +133,7 @@ import org.opensearch.ml.task.MLTrainingTaskRunner; import org.opensearch.ml.utils.IndexUtils; import org.opensearch.monitor.jvm.JvmService; +import org.opensearch.monitor.os.OsService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; @@ -207,6 +210,7 @@ public class MachineLearningPlugin extends Plugin implements ActionPlugin { ); } + @SneakyThrows @Override public Collection createComponents( Client client, @@ -232,7 +236,9 @@ public Collection createComponents( modelCacheHelper = new MLModelCacheHelper(clusterService, settings); JvmService jvmService = new JvmService(environment.settings()); - MLCircuitBreakerService mlCircuitBreakerService = new MLCircuitBreakerService(jvmService).init(environment.dataFiles()[0]); + OsService osService = new OsService(environment.settings()); + MLCircuitBreakerService mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService) + .init(environment.dataFiles()[0]); Map> stats = new ConcurrentHashMap<>(); // cluster level stats @@ -505,7 +511,8 @@ public List> getSettings() { MLCommonsSettings.ML_COMMONS_MAX_UPLOAD_TASKS_PER_NODE, MLCommonsSettings.ML_COMMONS_MAX_ML_TASK_PER_NODE, MLCommonsSettings.ML_COMMONS_MAX_LOAD_MODEL_TASKS_PER_NODE, - MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX + MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX, + MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD ); return settings; } diff --git a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java index 1f47d5b9c1..0e2bbefc8c 100644 --- a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java +++ b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java @@ -53,4 +53,7 @@ private MLCommonsSettings() {} Setting.Property.NodeScope, Setting.Property.Dynamic ); + + public static final Setting ML_COMMONS_NATIVE_MEM_THRESHOLD = Setting + .intSetting("plugins.ml_commons.native_mem_threshold", 90, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic); } diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLExecuteTaskRunner.java b/plugin/src/main/java/org/opensearch/ml/task/MLExecuteTaskRunner.java index 0b414cf807..e9b53138e9 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLExecuteTaskRunner.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLExecuteTaskRunner.java @@ -13,9 +13,9 @@ import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.input.Input; import org.opensearch.ml.common.output.Output; import org.opensearch.ml.common.transport.execute.MLExecuteTaskAction; diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java b/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java index 726401ef73..45c4b36c9c 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java @@ -32,13 +32,13 @@ import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.authuser.User; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import org.opensearch.ml.common.MLTaskType; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataset.MLInputDataType; import org.opensearch.ml.common.dataset.MLInputDataset; import org.opensearch.ml.common.exception.MLException; diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLTaskRunner.java b/plugin/src/main/java/org/opensearch/ml/task/MLTaskRunner.java index 4f1d19f0ca..d867f6677a 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLTaskRunner.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLTaskRunner.java @@ -14,10 +14,10 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.transport.MLTaskRequest; import org.opensearch.ml.common.transport.MLTaskResponse; import org.opensearch.ml.stats.MLNodeLevelStat; diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunner.java b/plugin/src/main/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunner.java index dfe1126849..7df3724b39 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunner.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunner.java @@ -17,11 +17,11 @@ import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import org.opensearch.ml.common.MLTaskType; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataset.MLInputDataType; import org.opensearch.ml.common.dataset.MLInputDataset; import org.opensearch.ml.common.input.MLInput; diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLTrainingTaskRunner.java b/plugin/src/main/java/org/opensearch/ml/task/MLTrainingTaskRunner.java index 0c75d81714..1a5fa3afc6 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLTrainingTaskRunner.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLTrainingTaskRunner.java @@ -25,12 +25,12 @@ import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import org.opensearch.ml.common.MLTaskType; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataset.MLInputDataType; import org.opensearch.ml.common.dataset.MLInputDataset; import org.opensearch.ml.common.input.MLInput; diff --git a/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java b/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java index c58e5072c4..d86dbd4daa 100644 --- a/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java +++ b/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java @@ -17,7 +17,8 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.*; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; +import org.opensearch.ml.breaker.MLCircuitBreakerService; +import org.opensearch.ml.breaker.ThresholdCircuitBreaker; import org.opensearch.ml.common.exception.MLLimitExceededException; import org.opensearch.ml.stats.MLNodeLevelStat; import org.opensearch.ml.stats.MLStats; @@ -52,10 +53,15 @@ public static void parseField(XContentParser parser, Set set, Function(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService); Assert.assertNotNull(mlCircuitBreakerService.init(Path.of("/"))); } @@ -96,7 +112,7 @@ public void testIsOpen() { Assert.assertEquals(null, mlCircuitBreakerService.checkOpenCB()); when(mem.getHeapUsedPercent()).thenReturn((short) 90); - Assert.assertEquals("Memory Circuit Breaker", mlCircuitBreakerService.checkOpenCB()); + Assert.assertEquals("Memory Circuit Breaker", mlCircuitBreakerService.checkOpenCB().getName()); } } diff --git a/common/src/test/java/org/opensearch/ml/common/breaker/MemoryCircuitBreakerTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/MemoryCircuitBreakerTests.java similarity index 97% rename from common/src/test/java/org/opensearch/ml/common/breaker/MemoryCircuitBreakerTests.java rename to plugin/src/test/java/org/opensearch/ml/breaker/MemoryCircuitBreakerTests.java index aced22b1d1..a749b19264 100644 --- a/common/src/test/java/org/opensearch/ml/common/breaker/MemoryCircuitBreakerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/breaker/MemoryCircuitBreakerTests.java @@ -3,7 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.ml.common.breaker; +package org.opensearch.ml.breaker; + +import static org.mockito.Mockito.when; import org.junit.Assert; import org.junit.Before; @@ -13,8 +15,6 @@ import org.opensearch.monitor.jvm.JvmService; import org.opensearch.monitor.jvm.JvmStats; -import static org.mockito.Mockito.when; - public class MemoryCircuitBreakerTests { @Mock diff --git a/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java new file mode 100644 index 0000000000..4c68d83757 --- /dev/null +++ b/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.breaker; + +import static org.mockito.Mockito.when; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD; + +import java.util.Arrays; +import java.util.HashSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.monitor.os.OsService; +import org.opensearch.monitor.os.OsStats; + +public class NativeMemoryCircuitBreakerTests { + + @Mock + ClusterService clusterService; + + @Mock + OsService osService; + + @Mock + OsStats osStats; + + @Mock + OsStats.Mem mem; + + private Settings settings; + private ClusterSettings clusterSettings; + + @Before + public void setup() { + settings = Settings.builder().put("plugins.ml_commons.native_mem_threshold", 90).build(); + clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD))); + MockitoAnnotations.openMocks(this); + when(osService.stats()).thenReturn(osStats); + when(osStats.getMem()).thenReturn(mem); + when(mem.getUsedPercent()).thenReturn((short) 50); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + + @Test + public void testIsOpen() { + // default threshold 90% + CircuitBreaker breaker = new NativeMemoryCircuitBreaker(osService, settings, clusterService); + Assert.assertFalse(breaker.isOpen()); + + // custom threshold 90% + breaker = new NativeMemoryCircuitBreaker(90, osService); + Assert.assertFalse(breaker.isOpen()); + } + + @Test + public void testIsOpen_ExceedMemoryThreshold() { + CircuitBreaker breaker = new NativeMemoryCircuitBreaker(osService, settings, clusterService); + + when(mem.getUsedPercent()).thenReturn((short) 95); + Assert.assertTrue(breaker.isOpen()); + } + + @Test + public void testIsOpen_CustomThreshold_ExceedMemoryThreshold() { + CircuitBreaker breaker = new NativeMemoryCircuitBreaker(90, osService); + + when(mem.getUsedPercent()).thenReturn((short) 95); + Assert.assertTrue(breaker.isOpen()); + } +} diff --git a/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java b/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java index 1f4a861223..b8a89d4828 100644 --- a/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java @@ -71,12 +71,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.ml.breaker.MLCircuitBreakerService; +import org.opensearch.ml.breaker.ThresholdCircuitBreaker; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import org.opensearch.ml.common.MLTaskType; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataset.MLInputDataType; import org.opensearch.ml.common.exception.MLException; import org.opensearch.ml.common.exception.MLLimitExceededException; @@ -145,6 +146,8 @@ public class MLModelManagerTests extends OpenSearchTestCase { @Mock private MLModelCacheHelper modelCacheHelper; private MLEngine mlEngine; + @Mock + ThresholdCircuitBreaker thresholdCircuitBreaker; @Before public void setup() throws URISyntaxException { @@ -263,9 +266,11 @@ public void testUploadMLModel_ExceedMaxRunningTask() { public void testUploadMLModel_CircuitBreakerOpen() { doNothing().when(mlTaskManager).checkLimitAndAddRunningTask(any(), any()); - when(mlCircuitBreakerService.checkOpenCB()).thenReturn("Disk Circuit Breaker"); + when(mlCircuitBreakerService.checkOpenCB()).thenReturn(thresholdCircuitBreaker); + when(thresholdCircuitBreaker.getName()).thenReturn("Disk Circuit Breaker"); + when(thresholdCircuitBreaker.getThreshold()).thenReturn(87); expectedEx.expect(MLException.class); - expectedEx.expectMessage("Disk Circuit Breaker is open, please check your resources!"); + expectedEx.expectMessage("Disk Circuit Breaker is open, threshold is 87. Please check your resources!"); modelManager.uploadMLModel(uploadInput, mlTask); verify(mlTaskManager).updateMLTask(anyString(), anyMap(), anyLong(), anyBoolean()); } diff --git a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java index 4c1675e420..5f59410d97 100644 --- a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java +++ b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java @@ -116,6 +116,11 @@ public void setupSettings() throws IOException { ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) ); assertEquals(200, response.getStatusLine().getStatusCode()); + + String jsonEntity = "{\n" + " \"persistent\" : {\n" + " \"plugins.ml_commons.native_mem_threshold\" : 100 \n" + " }\n" + "}"; + response = TestHelper + .makeRequest(client(), "PUT", "_cluster/settings", ImmutableMap.of(), TestHelper.toHttpEntity(jsonEntity), null); + assertEquals(200, response.getStatusLine().getStatusCode()); } @Override diff --git a/plugin/src/test/java/org/opensearch/ml/task/MLExecuteTaskRunnerTests.java b/plugin/src/test/java/org/opensearch/ml/task/MLExecuteTaskRunnerTests.java index d6509794ae..60fbd19e80 100644 --- a/plugin/src/test/java/org/opensearch/ml/task/MLExecuteTaskRunnerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/task/MLExecuteTaskRunnerTests.java @@ -27,9 +27,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.input.execute.samplecalculator.LocalSampleCalculatorInput; import org.opensearch.ml.common.transport.execute.MLExecuteTaskRequest; import org.opensearch.ml.common.transport.execute.MLExecuteTaskResponse; diff --git a/plugin/src/test/java/org/opensearch/ml/task/MLPredictTaskRunnerTests.java b/plugin/src/test/java/org/opensearch/ml/task/MLPredictTaskRunnerTests.java index 43d79a2355..c374a67f36 100644 --- a/plugin/src/test/java/org/opensearch/ml/task/MLPredictTaskRunnerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/task/MLPredictTaskRunnerTests.java @@ -38,11 +38,11 @@ import org.opensearch.commons.authuser.User; import org.opensearch.index.get.GetResult; import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLModel; import org.opensearch.ml.common.MLTask; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataframe.DataFrame; import org.opensearch.ml.common.dataset.DataFrameInputDataset; import org.opensearch.ml.common.dataset.MLInputDataset; diff --git a/plugin/src/test/java/org/opensearch/ml/task/MLTaskDispatcherTests.java b/plugin/src/test/java/org/opensearch/ml/task/MLTaskDispatcherTests.java index f9b4e23a2a..02f907e081 100644 --- a/plugin/src/test/java/org/opensearch/ml/task/MLTaskDispatcherTests.java +++ b/plugin/src/test/java/org/opensearch/ml/task/MLTaskDispatcherTests.java @@ -10,7 +10,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.ml.common.breaker.MemoryCircuitBreaker.DEFAULT_JVM_HEAP_USAGE_THRESHOLD; +import static org.opensearch.ml.breaker.MemoryCircuitBreaker.DEFAULT_JVM_HEAP_USAGE_THRESHOLD; import static org.opensearch.ml.plugin.MachineLearningPlugin.ML_ROLE_NAME; import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_ONLY_RUN_ON_ML_NODE; import static org.opensearch.ml.utils.TestHelper.ML_ROLE; diff --git a/plugin/src/test/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunnerTests.java b/plugin/src/test/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunnerTests.java index f7e40c99ad..0892ed2770 100644 --- a/plugin/src/test/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunnerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunnerTests.java @@ -33,10 +33,10 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLTask; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataframe.DataFrame; import org.opensearch.ml.common.dataset.DataFrameInputDataset; import org.opensearch.ml.common.dataset.MLInputDataset; diff --git a/plugin/src/test/java/org/opensearch/ml/task/MLTrainingTaskRunnerTests.java b/plugin/src/test/java/org/opensearch/ml/task/MLTrainingTaskRunnerTests.java index 589393837f..f54a720aea 100644 --- a/plugin/src/test/java/org/opensearch/ml/task/MLTrainingTaskRunnerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/task/MLTrainingTaskRunnerTests.java @@ -36,10 +36,10 @@ import org.opensearch.index.Index; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.shard.ShardId; +import org.opensearch.ml.breaker.MLCircuitBreakerService; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.MLTask; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.dataframe.DataFrame; import org.opensearch.ml.common.dataset.DataFrameInputDataset; import org.opensearch.ml.common.dataset.MLInputDataset; diff --git a/plugin/src/test/java/org/opensearch/ml/task/TaskRunnerTests.java b/plugin/src/test/java/org/opensearch/ml/task/TaskRunnerTests.java index 2ea45ae502..cc2edc88e6 100644 --- a/plugin/src/test/java/org/opensearch/ml/task/TaskRunnerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/task/TaskRunnerTests.java @@ -27,11 +27,12 @@ import org.mockito.MockitoAnnotations; import org.opensearch.action.ActionListener; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.ml.breaker.MLCircuitBreakerService; +import org.opensearch.ml.breaker.ThresholdCircuitBreaker; import org.opensearch.ml.cluster.DiscoveryNodeHelper; import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import org.opensearch.ml.common.MLTaskType; -import org.opensearch.ml.common.breaker.MLCircuitBreakerService; import org.opensearch.ml.common.exception.MLLimitExceededException; import org.opensearch.ml.common.transport.MLTaskRequest; import org.opensearch.ml.stats.MLNodeLevelStat; @@ -55,6 +56,8 @@ public class TaskRunnerTests extends OpenSearchTestCase { MLCircuitBreakerService mlCircuitBreakerService; @Mock ClusterService clusterService; + @Mock + ThresholdCircuitBreaker thresholdCircuitBreaker; MLTaskRunner mlTaskRunner; MLTask mlTask; @@ -129,7 +132,9 @@ public void testHandleAsyncMLTaskComplete_SyncTask() { } public void testRun_CircuitBreakerOpen() { - when(mlCircuitBreakerService.checkOpenCB()).thenReturn("Memory Circuit Breaker"); + when(mlCircuitBreakerService.checkOpenCB()).thenReturn(thresholdCircuitBreaker); + when(thresholdCircuitBreaker.getName()).thenReturn("Memory Circuit Breaker"); + when(thresholdCircuitBreaker.getThreshold()).thenReturn(87); TransportService transportService = mock(TransportService.class); ActionListener listener = mock(ActionListener.class); MLTaskRequest request = new MLTaskRequest(false); From 6493275c9c5f33f6fc63841326b1b87988c38047 Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Tue, 10 Jan 2023 17:43:40 -0800 Subject: [PATCH 2/4] Address the comments 1. Signed-off-by: Jing Zhang --- .../java/org/opensearch/ml/breaker/DiskCircuitBreaker.java | 3 ++- .../java/org/opensearch/ml/settings/MLCommonsSettings.java | 2 +- .../opensearch/ml/breaker/MLCircuitBreakerServiceTests.java | 2 +- .../opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java | 2 +- .../java/org/opensearch/ml/rest/MLCommonsRestTestCase.java | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java index 47c269e02f..4b7fd942f9 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java @@ -18,6 +18,7 @@ public class DiskCircuitBreaker extends ThresholdCircuitBreaker { private static final String ML_DISK_CB = "Disk Circuit Breaker"; public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L; + private static final long GB = 1024 * 1024 * 1024; private String diskDir; public DiskCircuitBreaker(String diskDir) { @@ -39,7 +40,7 @@ public String getName() { public boolean isOpen() { try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - return (new File(diskDir).getFreeSpace() / 1024 / 1024 / 1024) < getThreshold(); // in GB + return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB }); } catch (PrivilegedActionException e) { throw new MLException("Failed to run disk circuit breaker"); diff --git a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java index 0e2bbefc8c..26cf36a2bc 100644 --- a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java +++ b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java @@ -55,5 +55,5 @@ private MLCommonsSettings() {} ); public static final Setting ML_COMMONS_NATIVE_MEM_THRESHOLD = Setting - .intSetting("plugins.ml_commons.native_mem_threshold", 90, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic); + .intSetting("plugins.ml_commons.native_memory_threshold", 90, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic); } diff --git a/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java index 7880c76ce7..8e5e503c82 100644 --- a/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java +++ b/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java @@ -95,7 +95,7 @@ public void testClearBreakers() { @Test public void testInit() { - Settings settings = Settings.builder().put("plugins.ml_commons.native_mem_threshold", 90).build(); + Settings settings = Settings.builder().put(ML_COMMONS_NATIVE_MEM_THRESHOLD.getKey(), 90).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD))); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService); diff --git a/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java index 4c68d83757..14ee4716b7 100644 --- a/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreakerTests.java @@ -41,7 +41,7 @@ public class NativeMemoryCircuitBreakerTests { @Before public void setup() { - settings = Settings.builder().put("plugins.ml_commons.native_mem_threshold", 90).build(); + settings = Settings.builder().put(ML_COMMONS_NATIVE_MEM_THRESHOLD.getKey(), 90).build(); clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD))); MockitoAnnotations.openMocks(this); when(osService.stats()).thenReturn(osStats); diff --git a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java index 5f59410d97..c929898109 100644 --- a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java +++ b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java @@ -117,7 +117,7 @@ public void setupSettings() throws IOException { ); assertEquals(200, response.getStatusLine().getStatusCode()); - String jsonEntity = "{\n" + " \"persistent\" : {\n" + " \"plugins.ml_commons.native_mem_threshold\" : 100 \n" + " }\n" + "}"; + String jsonEntity = "{\n" + " \"persistent\" : {\n" + " \"plugins.ml_commons.native_memory_threshold\" : 100 \n" + " }\n" + "}"; response = TestHelper .makeRequest(client(), "PUT", "_cluster/settings", ImmutableMap.of(), TestHelper.toHttpEntity(jsonEntity), null); assertEquals(200, response.getStatusLine().getStatusCode()); From fc43c48049971b865833f6bbb5d2f2add44cacd3 Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Tue, 10 Jan 2023 17:47:59 -0800 Subject: [PATCH 3/4] Spotless changes. Signed-off-by: Jing Zhang --- .../java/org/opensearch/ml/rest/MLCommonsRestTestCase.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java index c929898109..674ee63fd3 100644 --- a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java +++ b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java @@ -117,7 +117,11 @@ public void setupSettings() throws IOException { ); assertEquals(200, response.getStatusLine().getStatusCode()); - String jsonEntity = "{\n" + " \"persistent\" : {\n" + " \"plugins.ml_commons.native_memory_threshold\" : 100 \n" + " }\n" + "}"; + String jsonEntity = "{\n" + + " \"persistent\" : {\n" + + " \"plugins.ml_commons.native_memory_threshold\" : 100 \n" + + " }\n" + + "}"; response = TestHelper .makeRequest(client(), "PUT", "_cluster/settings", ImmutableMap.of(), TestHelper.toHttpEntity(jsonEntity), null); assertEquals(200, response.getStatusLine().getStatusCode()); From 8baa08f30036f6567d14f322b21b1d1c46ad94d5 Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Tue, 10 Jan 2023 18:28:44 -0800 Subject: [PATCH 4/4] Address the comments 2. Signed-off-by: Jing Zhang --- .../java/org/opensearch/ml/breaker/DiskCircuitBreaker.java | 1 + .../src/main/java/org/opensearch/ml/utils/MLNodeUtils.java | 7 +------ .../java/org/opensearch/ml/model/MLModelManagerTests.java | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java index 4b7fd942f9..ab059b7c65 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java @@ -16,6 +16,7 @@ * A circuit breaker for disk usage. */ public class DiskCircuitBreaker extends ThresholdCircuitBreaker { + // TODO: make this value configurable as cluster setting private static final String ML_DISK_CB = "Disk Circuit Breaker"; public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L; private static final long GB = 1024 * 1024 * 1024; diff --git a/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java b/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java index d86dbd4daa..563b2855de 100644 --- a/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java +++ b/plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java @@ -56,12 +56,7 @@ public static void checkOpenCircuitBreaker(MLCircuitBreakerService mlCircuitBrea ThresholdCircuitBreaker openCircuitBreaker = mlCircuitBreakerService.checkOpenCB(); if (openCircuitBreaker != null) { mlStats.getStat(MLNodeLevelStat.ML_NODE_TOTAL_CIRCUIT_BREAKER_TRIGGER_COUNT).increment(); - throw new MLLimitExceededException( - openCircuitBreaker.getName() - + " is open, threshold is " - + openCircuitBreaker.getThreshold() - + ". Please check your resources!" - ); + throw new MLLimitExceededException(openCircuitBreaker.getName() + " is open, please check your resources!"); } } } diff --git a/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java b/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java index b8a89d4828..0e61d19813 100644 --- a/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java @@ -270,7 +270,7 @@ public void testUploadMLModel_CircuitBreakerOpen() { when(thresholdCircuitBreaker.getName()).thenReturn("Disk Circuit Breaker"); when(thresholdCircuitBreaker.getThreshold()).thenReturn(87); expectedEx.expect(MLException.class); - expectedEx.expectMessage("Disk Circuit Breaker is open, threshold is 87. Please check your resources!"); + expectedEx.expectMessage("Disk Circuit Breaker is open, please check your resources!"); modelManager.uploadMLModel(uploadInput, mlTask); verify(mlTaskManager).updateMLTask(anyString(), anyMap(), anyLong(), anyBoolean()); }