Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ List<String> jacocoExclusions = [
'org.opensearch.ml.task.MLExecuteTaskRunner',
'org.opensearch.ml.action.profile.MLProfileTransportAction',
'org.opensearch.ml.action.models.DeleteModelTransportAction.1',
'org.opensearch.ml.rest.RestMLPredictionAction'
'org.opensearch.ml.rest.RestMLPredictionAction',
'org.opensearch.ml.breaker.DiskCircuitBreaker'
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.transport.forward.MLForwardAction;
import org.opensearch.ml.common.transport.forward.MLForwardInput;
import org.opensearch.ml.common.transport.forward.MLForwardRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.breaker;
package org.opensearch.ml.breaker;

public enum BreakerName {
MEMORY,
DISK
DISK,
NATIVE_MEMORY
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.breaker;
package org.opensearch.ml.breaker;

/**
* An interface for circuit breaker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.breaker;

import org.opensearch.ml.common.exception.MLException;
package org.opensearch.ml.breaker;

import java.io.File;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

import org.opensearch.ml.common.exception.MLException;

/**
* A circuit breaker for disk usage.
*/
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Long> {
// TODO: make this value configurable as cluster setting
private static final String ML_DISK_CB = "Disk Circuit Breaker";
public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L;
private static final long GB = 1024 * 1024 * 1024;
private String diskDir;

public DiskCircuitBreaker(String diskDir) {
Expand All @@ -32,17 +34,17 @@ public DiskCircuitBreaker(long threshold, String diskDir) {

@Override
public String getName() {
return ML_DISK_CB;
return ML_DISK_CB;
}

@Override
public boolean isOpen() {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> {
return (new File(diskDir).getFreeSpace()/1024/1024/1024) < getThreshold(); // in GB
return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just set threshold and compare directly without dividing GB for every request? Just like what you did for native memory breaker?

});
} catch (PrivilegedActionException e) {
throw new MLException("Failed to run disk circuit breaker");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.breaker;

import lombok.extern.log4j.Log4j2;
import org.opensearch.monitor.jvm.JvmService;
package org.opensearch.ml.breaker;

import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import lombok.extern.log4j.Log4j2;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.monitor.os.OsService;

/**
* This service registers internal system breakers and provide API for users to register their own breakers.
*/
Expand All @@ -20,14 +24,23 @@ public class MLCircuitBreakerService {

private final ConcurrentMap<BreakerName, CircuitBreaker> breakers = new ConcurrentHashMap<>();
private final JvmService jvmService;
private final OsService osService;
private final Settings settings;
private final ClusterService clusterService;

/**
* Constructor.
*
* @param jvmService jvm info
* @param osService os info
* @param settings settings
* @param clusterService clusterService
*/
public MLCircuitBreakerService(JvmService jvmService) {
public MLCircuitBreakerService(JvmService jvmService, OsService osService, Settings settings, ClusterService clusterService) {
this.jvmService = jvmService;
this.osService = osService;
this.settings = settings;
this.clusterService = clusterService;
}

public void registerBreaker(BreakerName name, CircuitBreaker breaker) {
Expand Down Expand Up @@ -65,18 +78,21 @@ public MLCircuitBreakerService init(Path path) {
log.info("Registered ML memory breaker.");
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString()));
log.info("Registered ML disk breaker.");
// Register native memory circuit breaker
registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService));
log.info("Registered ML native memory breaker.");

return this;
}

/**
*
* @return the name of any open circuit breaker; otherwise return null
* @return any open circuit breaker; otherwise return null
*/
public String checkOpenCB() {
public ThresholdCircuitBreaker checkOpenCB() {
for (CircuitBreaker breaker : breakers.values()) {
if (breaker.isOpen()) {
return breaker.getName();
return (ThresholdCircuitBreaker) breaker;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

breaker class is CircuitBreaker, this line enforce it to ThresholdCircuitBreaker, seems not so extensible. If we have new child class of CircuitBreaker, this line will throw error.

Why need to change the return type from String to ThresholdCircuitBreaker?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to get more information like the current threshold when breaker is triggered to help debug.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add a warning log to help

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.breaker;
package org.opensearch.ml.breaker;

import org.opensearch.monitor.jvm.JvmService;

/**
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
//TODO: make this value configurable as cluster setting
// TODO: make this value configurable as cluster setting
private static final String ML_MEMORY_CB = "Memory Circuit Breaker";
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.breaker;

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.os.OsService;

/**
* A circuit breaker for native memory usage.
*/
public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker";
public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90;
private final OsService osService;
private volatile Integer nativeMemThreshold = 90;

public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) {
super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD);
this.osService = osService;
this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it);
}

public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) {
super(threshold.shortValue());
this.nativeMemThreshold = threshold;
this.osService = osService;
}

@Override
public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.nativeMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.breaker;
package org.opensearch.ml.breaker;

/**
* An abstract class for all breakers with threshold.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.exception.MLException;
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
import org.opensearch.ml.common.model.MLModelState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import lombok.SneakyThrows;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -56,11 +58,11 @@
import org.opensearch.ml.action.upload_chunk.MLModelMetaCreate;
import org.opensearch.ml.action.upload_chunk.TransportCreateModelMetaAction;
import org.opensearch.ml.action.upload_chunk.TransportUploadModelChunkAction;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.cluster.MLCommonsClusterEventListener;
import org.opensearch.ml.cluster.MLCommonsClusterManagerEventListener;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.input.execute.anomalylocalization.AnomalyLocalizationInput;
import org.opensearch.ml.common.input.execute.samplecalculator.LocalSampleCalculatorInput;
import org.opensearch.ml.common.input.parameter.ad.AnomalyDetectionLibSVMParams;
Expand Down Expand Up @@ -131,6 +133,7 @@
import org.opensearch.ml.task.MLTrainingTaskRunner;
import org.opensearch.ml.utils.IndexUtils;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.monitor.os.OsService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -207,6 +210,7 @@ public class MachineLearningPlugin extends Plugin implements ActionPlugin {
);
}

@SneakyThrows
@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -232,7 +236,9 @@ public Collection<Object> createComponents(
modelCacheHelper = new MLModelCacheHelper(clusterService, settings);

JvmService jvmService = new JvmService(environment.settings());
MLCircuitBreakerService mlCircuitBreakerService = new MLCircuitBreakerService(jvmService).init(environment.dataFiles()[0]);
OsService osService = new OsService(environment.settings());
MLCircuitBreakerService mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService)
.init(environment.dataFiles()[0]);

Map<Enum, MLStat<?>> stats = new ConcurrentHashMap<>();
// cluster level stats
Expand Down Expand Up @@ -505,7 +511,8 @@ public List<Setting<?>> getSettings() {
MLCommonsSettings.ML_COMMONS_MAX_UPLOAD_TASKS_PER_NODE,
MLCommonsSettings.ML_COMMONS_MAX_ML_TASK_PER_NODE,
MLCommonsSettings.ML_COMMONS_MAX_LOAD_MODEL_TASKS_PER_NODE,
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX,
MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD
);
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ private MLCommonsSettings() {}
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Integer> ML_COMMONS_NATIVE_MEM_THRESHOLD = Setting
.intSetting("plugins.ml_commons.native_memory_threshold", 90, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.input.Input;
import org.opensearch.ml.common.output.Output;
import org.opensearch.ml.common.transport.execute.MLExecuteTaskAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.dataset.MLInputDataType;
import org.opensearch.ml.common.dataset.MLInputDataset;
import org.opensearch.ml.common.exception.MLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

import org.opensearch.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.transport.MLTaskRequest;
import org.opensearch.ml.common.transport.MLTaskResponse;
import org.opensearch.ml.stats.MLNodeLevelStat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.dataset.MLInputDataType;
import org.opensearch.ml.common.dataset.MLInputDataset;
import org.opensearch.ml.common.input.MLInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
import org.opensearch.ml.common.dataset.MLInputDataType;
import org.opensearch.ml.common.dataset.MLInputDataset;
import org.opensearch.ml.common.input.MLInput;
Expand Down
Loading