From 70ebd5631e1590ca0565a554c643f84a312fddc0 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 17 Nov 2020 10:54:11 -0500 Subject: [PATCH 1/2] [ML] Autoscaling for machine learning (#59309) This provides an autoscaling service integration for machine learning. The underlying logic is fairly straightforward with a couple of caveats: - When considering to scale up/down, ML will automatically translate between Node size and the memory that the node will potentially provide for ML after the scaling plan is implemented. - If knowledge of job sizes is out of date, we will do a best effort check for scaling up. But, if that cannot be determined with our current view of job memory, we attempt a refresh and return a no_scale event - We assume that if the auto memory percent calculation is being used, we treat all JVM sizes on the nodes the same. - For scale down, we keep our last scale down calculation time in memory. So, if master nodes are changed in between, we reset the scale down delay. --- .../elasticsearch/xpack/core/ml/MlTasks.java | 8 + x-pack/plugin/ml/build.gradle | 3 +- .../xpack/ml/MachineLearning.java | 31 +- ...ransportStartDataFrameAnalyticsAction.java | 1 + .../MlAutoscalingDeciderConfiguration.java | 154 +++++ .../MlAutoscalingDeciderService.java | 586 ++++++++++++++++++ .../autoscaling/MlAutoscalingExtension.java | 27 + .../MlAutoscalingNamedWritableProvider.java | 30 + .../xpack/ml/autoscaling/MlScalingReason.java | 191 ++++++ .../ml/autoscaling/NativeMemoryCapacity.java | 74 +++ .../xpack/ml/job/JobNodeSelector.java | 49 +- .../elasticsearch/xpack/ml/job/NodeLoad.java | 248 ++++++++ .../xpack/ml/job/NodeLoadDetector.java | 144 +---- .../upgrader/SnapshotUpgradeTaskExecutor.java | 1 + .../task/OpenJobPersistentTasksExecutor.java | 1 + .../xpack/ml/process/MlMemoryTracker.java | 15 +- .../AbstractJobPersistentTasksExecutor.java | 3 + .../ml/utils/NativeMemoryCalculator.java | 52 +- ...rch.xpack.autoscaling.AutoscalingExtension | 7 + ...lAutoscalingDeciderConfigurationTests.java | 44 ++ .../MlAutoscalingDeciderServiceTests.java | 289 +++++++++ .../ml/autoscaling/MlScalingReasonTests.java | 43 ++ .../NativeMemoryCapacityTests.java | 70 +++ .../ml/job/JobNodeLoadDetectorTests.java | 6 +- .../xpack/ml/job/JobNodeSelectorTests.java | 161 ++++- 25 files changed, 2060 insertions(+), 178 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfiguration.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingExtension.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReason.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacity.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java create mode 100644 x-pack/plugin/ml/src/main/resources/META-INF/services/org.elasticsearch.xpack.autoscaling.AutoscalingExtension create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfigurationTests.java create mode 100644 
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReasonTests.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacityTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index cf0f7fa18a817..9ef9531c2e2a7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -48,6 +48,10 @@ public static String jobTaskId(String jobId) { return JOB_TASK_ID_PREFIX + jobId; } + public static String jobId(String jobTaskId) { + return jobTaskId.substring(JOB_TASK_ID_PREFIX.length()); + } + /** * Namespaces the task ids for datafeeds. * A job id can be used as a datafeed id, because they are stored separately in cluster state. @@ -67,6 +71,10 @@ public static String dataFrameAnalyticsTaskId(String id) { return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id; } + public static String dataFrameAnalyticsId(String taskId) { + return taskId.substring(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length()); + } + @Nullable public static PersistentTasksCustomMetadata.PersistentTask getJobTask(String jobId, @Nullable PersistentTasksCustomMetadata tasks) { return tasks == null ? null : tasks.getTask(jobTaskId(jobId)); diff --git a/x-pack/plugin/ml/build.gradle b/x-pack/plugin/ml/build.gradle index d9cb8b007caa3..859e220cd22ef 100644 --- a/x-pack/plugin/ml/build.gradle +++ b/x-pack/plugin/ml/build.gradle @@ -5,7 +5,7 @@ esplugin { description 'Elasticsearch Expanded Pack Plugin - Machine Learning' classname 'org.elasticsearch.xpack.ml.MachineLearning' hasNativeController true - extendedPlugins = ['x-pack-core', 'lang-painless'] + extendedPlugins = ['x-pack-autoscaling', 'lang-painless'] } @@ -50,6 +50,7 @@ dependencies { compileOnly project(path: xpackModule('core'), configuration: 'default') testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts') testImplementation project(path: xpackModule('ilm'), configuration: 'default') + compileOnly project(path: xpackModule('autoscaling'), configuration: 'default') testImplementation project(path: xpackModule('data-streams'), configuration: 'default') // This should not be here testImplementation project(path: xpackModule('security'), configuration: 'testArtifacts') diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index c1cf6595f0ce5..56a2e9ae92c2f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -65,6 +65,8 @@ import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; @@ -212,6 +214,7 @@ import 
org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction; import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction; import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; +import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -413,7 +416,7 @@ public Set getRoles() { // controls the types of jobs that can be created, and each job alone is considerably smaller than what each node // can handle. public static final Setting MAX_MACHINE_MEMORY_PERCENT = - Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope); + Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope); /** * This boolean value indicates if `max_machine_memory_percent` should be ignored and a automatic calculation is used instead. * @@ -469,6 +472,17 @@ public Set getRoles() { Property.NodeScope ); + /** + * This is the maximum possible node size for a machine learning node. It is useful when determining if a job could ever be opened + * on the cluster. + * + * If the value is the default special case of `0b`, that means the value is ignored when assigning jobs. + */ + public static final Setting MAX_ML_NODE_SIZE = Setting.byteSizeSetting( + "xpack.ml.max_ml_node_size", + ByteSizeValue.ZERO, + Property.NodeScope); + private static final Logger logger = LogManager.getLogger(MachineLearning.class); private final Settings settings; @@ -483,6 +497,7 @@ public Set getRoles() { private final SetOnce mlUpgradeModeActionFilter = new SetOnce<>(); private final SetOnce inferenceModelBreaker = new SetOnce<>(); private final SetOnce modelLoadingService = new SetOnce<>(); + private final SetOnce mlAutoscalingDeciderService = new SetOnce<>(); public MachineLearning(Settings settings, Path configPath) { this.settings = settings; @@ -520,7 +535,8 @@ public List> getSettings() { ModelLoadingService.INFERENCE_MODEL_CACHE_TTL, ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND, - USE_AUTO_MACHINE_MEMORY_PERCENT)); + USE_AUTO_MACHINE_MEMORY_PERCENT, + MAX_ML_NODE_SIZE)); } public Settings additionalSettings() { @@ -754,6 +770,8 @@ public Collection createComponents(Client client, ClusterService cluster // Perform node startup operations nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown(); + mlAutoscalingDeciderService.set(new MlAutoscalingDeciderService(memoryTracker, settings, clusterService)); + return Arrays.asList( mlLifeCycleService, jobResultsProvider, @@ -1115,4 +1133,13 @@ public void setCircuitBreaker(CircuitBreaker circuitBreaker) { assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME); this.inferenceModelBreaker.set(circuitBreaker); } + + public Collection> deciders() { + if (enabled) { + assert mlAutoscalingDeciderService.get() != null; + return Collections.singletonList(mlAutoscalingDeciderService.get()); + } else { + return Collections.emptyList(); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 956263a39acc9..ab353c0034a50 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -635,6 +635,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, + maxNodeMemory, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfiguration.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfiguration.java new file mode 100644 index 0000000000000..fff6501aaa298 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfiguration.java @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration; + +import java.io.IOException; +import java.util.Objects; + +public class MlAutoscalingDeciderConfiguration implements AutoscalingDeciderConfiguration { + static final String NAME = "ml"; + + private static final int DEFAULT_ANOMALY_JOBS_IN_QUEUE = 0; + private static final int DEFAULT_ANALYTICS_JOBS_IN_QUEUE = 0; + + private static final ParseField NUM_ANOMALY_JOBS_IN_QUEUE = new ParseField("num_anomaly_jobs_in_queue"); + private static final ParseField NUM_ANALYTICS_JOBS_IN_QUEUE = new ParseField("num_analytics_jobs_in_queue"); + private static final ParseField DOWN_SCALE_DELAY = new ParseField("down_scale_delay"); + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, + MlAutoscalingDeciderConfiguration.Builder::new); + + static { + PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnomalyJobsInQueue, NUM_ANOMALY_JOBS_IN_QUEUE); + PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnalyticsJobsInQueue, NUM_ANALYTICS_JOBS_IN_QUEUE); + PARSER.declareString(MlAutoscalingDeciderConfiguration.Builder::setDownScaleDelay, DOWN_SCALE_DELAY); + } + + public static MlAutoscalingDeciderConfiguration parse(final XContentParser parser) { + return PARSER.apply(parser, null).build(); + } + + private final int numAnomalyJobsInQueue; + private final int numAnalyticsJobsInQueue; + private final TimeValue downScaleDelay; + + MlAutoscalingDeciderConfiguration(int numAnomalyJobsInQueue, int numAnalyticsJobsInQueue, TimeValue downScaleDelay) { + if (numAnomalyJobsInQueue < 0) { + throw new IllegalArgumentException("[" + NUM_ANOMALY_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative"); + } + if (numAnalyticsJobsInQueue < 0) { + throw new IllegalArgumentException("[" + NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative"); + } + this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue; + 
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue; + this.downScaleDelay = downScaleDelay; + } + + public MlAutoscalingDeciderConfiguration(StreamInput in) throws IOException { + numAnomalyJobsInQueue = in.readVInt(); + numAnalyticsJobsInQueue = in.readVInt(); + downScaleDelay = in.readTimeValue(); + } + + @Override + public String name() { + return NAME; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(numAnomalyJobsInQueue); + out.writeVInt(numAnalyticsJobsInQueue); + out.writeTimeValue(downScaleDelay); + } + + public int getNumAnomalyJobsInQueue() { + return numAnomalyJobsInQueue; + } + + public int getNumAnalyticsJobsInQueue() { + return numAnalyticsJobsInQueue; + } + + public TimeValue getDownScaleDelay() { + return downScaleDelay; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MlAutoscalingDeciderConfiguration that = (MlAutoscalingDeciderConfiguration) o; + return numAnomalyJobsInQueue == that.numAnomalyJobsInQueue && + numAnalyticsJobsInQueue == that.numAnalyticsJobsInQueue && + Objects.equals(downScaleDelay, that.downScaleDelay); + } + + @Override + public int hashCode() { + return Objects.hash(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NUM_ANOMALY_JOBS_IN_QUEUE .getPreferredName(), numAnomalyJobsInQueue); + builder.field(NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName(), numAnalyticsJobsInQueue); + builder.field(DOWN_SCALE_DELAY.getPreferredName(), downScaleDelay.getStringRep()); + builder.endObject(); + return builder; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private int numAnomalyJobsInQueue = DEFAULT_ANOMALY_JOBS_IN_QUEUE; + private int numAnalyticsJobsInQueue = DEFAULT_ANALYTICS_JOBS_IN_QUEUE; + private TimeValue downScaleDelay = TimeValue.ZERO; + + public Builder setNumAnomalyJobsInQueue(int numAnomalyJobsInQueue) { + this.numAnomalyJobsInQueue = numAnomalyJobsInQueue; + return this; + } + + public Builder setNumAnalyticsJobsInQueue(int numAnalyticsJobsInQueue) { + this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue; + return this; + } + + Builder setDownScaleDelay(String unparsedTimeValue) { + return setDownScaleDelay(TimeValue.parseTimeValue(unparsedTimeValue, DOWN_SCALE_DELAY.getPreferredName())); + } + + public Builder setDownScaleDelay(TimeValue downScaleDelay) { + this.downScaleDelay = Objects.requireNonNull(downScaleDelay); + return this; + } + + public MlAutoscalingDeciderConfiguration build() { + return new MlAutoscalingDeciderConfiguration(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay); + } + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java new file mode 100644 index 0000000000000..d32e9c28f4608 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -0,0 +1,586 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.autoscaling; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.NodeLoad; +import org.elasticsearch.xpack.ml.job.NodeLoadDetector; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; +import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.xpack.core.ml.MlTasks.getDataFrameAnalyticsState; +import static org.elasticsearch.xpack.core.ml.MlTasks.getJobStateModifiedForReassignments; +import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT; + +public class MlAutoscalingDeciderService implements AutoscalingDeciderService, + LocalNodeMasterListener { + + private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class); + private static final Duration DEFAULT_MEMORY_REFRESH_RATE = Duration.ofMinutes(15); + private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale"; + private static final long NO_SCALE_DOWN_POSSIBLE = -1L; + + private final NodeLoadDetector nodeLoadDetector; + private final MlMemoryTracker mlMemoryTracker; + private final Supplier timeSupplier; + private final boolean useAuto; + + private volatile boolean isMaster; + private volatile boolean running; + private volatile int maxMachineMemoryPercent; + private volatile int maxOpenJobs; + private volatile long lastTimeToScale; + private volatile long scaleDownDetected; + + public MlAutoscalingDeciderService(MlMemoryTracker memoryTracker, Settings settings, ClusterService clusterService) { + this(new 
NodeLoadDetector(memoryTracker), settings, clusterService, System::currentTimeMillis); + } + + MlAutoscalingDeciderService(NodeLoadDetector nodeLoadDetector, + Settings settings, + ClusterService clusterService, + Supplier timeSupplier) { + this.nodeLoadDetector = nodeLoadDetector; + this.mlMemoryTracker = nodeLoadDetector.getMlMemoryTracker(); + this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); + this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings); + this.useAuto = MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings); + this.timeSupplier = timeSupplier; + this.scaleDownDetected = NO_SCALE_DOWN_POSSIBLE; + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + this::setMaxMachineMemoryPercent); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs); + clusterService.addLocalNodeMasterListener(this); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + running = true; + if (isMaster) { + mlMemoryTracker.asyncRefresh(); + } + } + + @Override + public void beforeStop() { + running = false; + } + }); + } + + static OptionalLong getNodeJvmSize(DiscoveryNode node) { + Map nodeAttributes = node.getAttributes(); + OptionalLong value = OptionalLong.empty(); + String valueStr = nodeAttributes.get(MachineLearning.MAX_JVM_SIZE_NODE_ATTR); + try { + value = OptionalLong.of(Long.parseLong(valueStr)); + } catch (NumberFormatException e) { + logger.debug(() -> new ParameterizedMessage( + "could not parse stored string value [{}] in node attribute [{}]", + valueStr, + MachineLearning.MAX_JVM_SIZE_NODE_ATTR)); + } + return value; + } + + static List getNodes(final ClusterState clusterState) { + return clusterState.nodes() + .mastersFirstStream() + .filter(MachineLearning::isMlNode) + .collect(Collectors.toList()); + } + + /** + * @param unassignedJobs The list of unassigned jobs + * @param sizeFunction Function providing the memory required for a job + * @param maxNumInQueue The number of unassigned jobs allowed. + * @return The capacity needed to reduce the length of `unassignedJobs` to `maxNumInQueue` + */ + static Optional requiredCapacityForUnassignedJobs(List unassignedJobs, + Function sizeFunction, + int maxNumInQueue) { + List jobSizes = unassignedJobs + .stream() + .map(sizeFunction) + .map(l -> l == null ? 0L : l) + .collect(Collectors.toList()); + // Only possible if unassignedJobs was empty. 
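+        // Illustrative example (assumed values, not part of this change): with unassigned job sizes of
+        // [2GB, 1GB, 512MB] and maxNumInQueue == 1, the code below sorts the sizes descending and moves
+        // 2GB and then 1GB into tierMemory until only one job remains queued, yielding
+        // NativeMemoryCapacity(tier = 3GB, node = 2GB), where node is sized for the largest unassigned job.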
+ if (jobSizes.isEmpty()) { + return Optional.empty(); + } + jobSizes.sort(Comparator.comparingLong(Long::longValue).reversed()); + long tierMemory = 0L; + long nodeMemory = jobSizes.get(0); + Iterator iter = jobSizes.iterator(); + while (jobSizes.size() > maxNumInQueue && iter.hasNext()) { + tierMemory += iter.next(); + iter.remove(); + } + return Optional.of(new NativeMemoryCapacity(tierMemory, nodeMemory)); + } + + private static Collection> anomalyDetectionTasks(PersistentTasksCustomMetadata tasksCustomMetadata) { + if (tasksCustomMetadata == null) { + return Collections.emptyList(); + } + + return tasksCustomMetadata.findTasks(MlTasks.JOB_TASK_NAME, + t -> getJobStateModifiedForReassignments(t).isAnyOf(JobState.OPENED, JobState.OPENING)); + } + + private static Collection> dataframeAnalyticsTasks(PersistentTasksCustomMetadata tasksCustomMetadata) { + if (tasksCustomMetadata == null) { + return Collections.emptyList(); + } + + return tasksCustomMetadata.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, + t -> getDataFrameAnalyticsState(t).isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STARTING)); + } + + @SuppressWarnings("unchecked") + private static Collection> datafeedTasks(PersistentTasksCustomMetadata tasksCustomMetadata) { + if (tasksCustomMetadata == null) { + return Collections.emptyList(); + } + + return tasksCustomMetadata.findTasks(MlTasks.DATAFEED_TASK_NAME, t -> true) + .stream() + .map(p -> (PersistentTask) p) + .collect(Collectors.toList()); + } + + void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) { + this.maxMachineMemoryPercent = maxMachineMemoryPercent; + } + + void setMaxOpenJobs(int maxOpenJobs) { + this.maxOpenJobs = maxOpenJobs; + } + + @Override + public void onMaster() { + isMaster = true; + if (running) { + mlMemoryTracker.asyncRefresh(); + } + } + + private void resetScaleDownCoolDown() { + this.scaleDownDetected = NO_SCALE_DOWN_POSSIBLE; + } + + private boolean canScaleDown(TimeValue coolDown) { + if (this.scaleDownDetected == NO_SCALE_DOWN_POSSIBLE) { + return false; + } + return timeSupplier.get() - scaleDownDetected >= coolDown.millis(); + } + + private boolean newScaleDownCheck() { + return scaleDownDetected == NO_SCALE_DOWN_POSSIBLE; + } + + NativeMemoryCapacity currentScale(final List machineLearningNodes) { + long[] mlMemory = machineLearningNodes.stream() + .mapToLong(node -> NativeMemoryCalculator.allowedBytesForMl(node, maxMachineMemoryPercent, useAuto).orElse(0L)) + .toArray(); + + return new NativeMemoryCapacity( + Arrays.stream(mlMemory).sum(), + Arrays.stream(mlMemory).max().orElse(0L), + // We assume that JVM size is universal, at least, the largest JVM indicates the largest node + machineLearningNodes.stream() + .map(MlAutoscalingDeciderService::getNodeJvmSize) + .mapToLong(l -> l.orElse(0L)) + .boxed() + .max(Long::compare) + .orElse(null) + ); + } + + @Override + public void offMaster() { + isMaster = false; + } + + @Override + public AutoscalingDeciderResult scale(MlAutoscalingDeciderConfiguration decider, AutoscalingDeciderContext context) { + if (isMaster == false) { + throw new IllegalArgumentException("request for scaling information is only allowed on the master node"); + } + final Duration memoryTrackingStale; + long previousTimeStamp = this.lastTimeToScale; + this.lastTimeToScale = this.timeSupplier.get(); + if (previousTimeStamp == 0L) { + memoryTrackingStale = DEFAULT_MEMORY_REFRESH_RATE; + } else { + memoryTrackingStale = Duration.ofMillis(TimeValue.timeValueMinutes(1).millis() + this.lastTimeToScale 
- previousTimeStamp); + } + + final ClusterState clusterState = context.state(); + + PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + Collection> anomalyDetectionTasks = anomalyDetectionTasks(tasks); + Collection> dataframeAnalyticsTasks = dataframeAnalyticsTasks(tasks); + final List nodes = getNodes(clusterState); + Optional futureFreedCapacity = calculateFutureFreedCapacity(tasks, memoryTrackingStale); + + final List waitingAnomalyJobs = anomalyDetectionTasks.stream() + .filter(t -> AWAITING_LAZY_ASSIGNMENT.equals(t.getAssignment())) + .map(t -> MlTasks.jobId(t.getId())) + .collect(Collectors.toList()); + final List waitingAnalyticsJobs = dataframeAnalyticsTasks.stream() + .filter(t -> AWAITING_LAZY_ASSIGNMENT.equals(t.getAssignment())) + .map(t -> MlTasks.dataFrameAnalyticsId(t.getId())) + .collect(Collectors.toList()); + + final NativeMemoryCapacity currentScale = currentScale(nodes); + final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder() + .setWaitingAnomalyJobs(waitingAnomalyJobs) + .setWaitingAnalyticsJobs(waitingAnalyticsJobs) + .setCurrentMlCapacity(currentScale.autoscalingCapacity(maxMachineMemoryPercent, useAuto)) + .setPassedConfiguration(decider); + + final Optional scaleUpDecision = checkForScaleUp(decider, + waitingAnomalyJobs, + waitingAnalyticsJobs, + futureFreedCapacity.orElse(null), + currentScale, + reasonBuilder); + + if (scaleUpDecision.isPresent()) { + resetScaleDownCoolDown(); + return scaleUpDecision.get(); + } + if (waitingAnalyticsJobs.isEmpty() == false || waitingAnomalyJobs.isEmpty() == false) { + // We don't want to continue to consider a scale down if there are now waiting jobs + resetScaleDownCoolDown(); + noScaleResultOrRefresh(reasonBuilder, memoryTrackingStale, new AutoscalingDeciderResult( + context.currentCapacity(), + reasonBuilder + .setSimpleReason("Passing currently perceived capacity as there are analytics and anomaly jobs in the queue, " + + "but the number in the queue is less than the configured maximum allowed.") + .build())); + } + if (mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false) { + logger.debug(() -> new ParameterizedMessage( + "view of job memory is stale given duration [{}]. 
Not attempting to scale down", + memoryTrackingStale)); + return buildDecisionAndRequestRefresh(reasonBuilder); + } + + long largestJob = Math.max( + anomalyDetectionTasks.stream() + .filter(PersistentTask::isAssigned) + // Memory SHOULD be recently refreshed, so in our current state, we should at least have an idea of the memory used + .mapToLong(t -> { + Long mem = this.getAnomalyMemoryRequirement(t); + assert mem != null : "unexpected null for anomaly memory requirement after recent stale check"; + return mem; + }) + .max() + .orElse(0L), + dataframeAnalyticsTasks.stream() + .filter(PersistentTask::isAssigned) + // Memory SHOULD be recently refreshed, so in our current state, we should at least have an idea of the memory used + .mapToLong(t -> { + Long mem = this.getAnalyticsMemoryRequirement(t); + assert mem != null : "unexpected null for analytics memory requirement after recent stale check"; + return mem; + }) + .max() + .orElse(0L)); + + final Optional scaleDownDecision = + checkForScaleDown(nodes, clusterState, largestJob, currentScale, reasonBuilder); + + if (scaleDownDecision.isPresent()) { + if (newScaleDownCheck()) { + scaleDownDetected = timeSupplier.get(); + } + if (canScaleDown(decider.getDownScaleDelay())) { + return scaleDownDecision.get(); + } + logger.debug(() -> new ParameterizedMessage( + "not scaling down as the current scale down delay [{}] is not satisfied. The last time scale down was detected [{}]", + decider.getDownScaleDelay().getStringRep(), + XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(scaleDownDetected))); + return new AutoscalingDeciderResult( + context.currentCapacity(), + reasonBuilder + .setSimpleReason( + "Passing currently perceived capacity as configured down scale delay has not be satisfied; configured delay [" + + decider.getDownScaleDelay().millis() + + "] last detected scale down event [" + + scaleDownDetected + + "]") + .build()); + } + + return noScaleResultOrRefresh(reasonBuilder, memoryTrackingStale, new AutoscalingDeciderResult(context.currentCapacity(), + reasonBuilder + .setSimpleReason("Passing currently perceived capacity as no scaling changes were detected to be possible") + .build())); + } + + AutoscalingDeciderResult noScaleResultOrRefresh(MlScalingReason.Builder reasonBuilder, + Duration memoryTrackingStale, + AutoscalingDeciderResult potentialResult) { + if (mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false) { + logger.debug(() -> new ParameterizedMessage( + "current view of job memory is stale given the duration [{}]. Returning a no scale event", + memoryTrackingStale.toString())); + return buildDecisionAndRequestRefresh(reasonBuilder); + } else { + return potentialResult; + } + } + + Optional checkForScaleUp(MlAutoscalingDeciderConfiguration decider, + List waitingAnomalyJobs, + List waitingAnalyticsJobs, + @Nullable NativeMemoryCapacity futureFreedCapacity, + NativeMemoryCapacity currentScale, + MlScalingReason.Builder reasonBuilder) { + + // Are we in breach of maximum waiting jobs? 
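+        // Illustrative example (assumed values, not part of this change): if num_analytics_jobs_in_queue is 1 and
+        // three analytics jobs are waiting, the limit below is breached, so requiredCapacityForUnassignedJobs
+        // computes the extra native memory needed to shrink the queue back to one job, and that requirement is
+        // merged into the current scale before requesting a scale up.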
+ if (waitingAnalyticsJobs.size() > decider.getNumAnalyticsJobsInQueue() + || waitingAnomalyJobs.size() > decider.getNumAnomalyJobsInQueue()) { + NativeMemoryCapacity updatedCapacity = NativeMemoryCapacity.from(currentScale); + Optional analyticsCapacity = requiredCapacityForUnassignedJobs(waitingAnalyticsJobs, + this::getAnalyticsMemoryRequirement, + decider.getNumAnalyticsJobsInQueue()); + Optional anomalyCapacity = requiredCapacityForUnassignedJobs(waitingAnomalyJobs, + this::getAnomalyMemoryRequirement, + decider.getNumAnomalyJobsInQueue()); + + updatedCapacity.merge(anomalyCapacity.orElse(NativeMemoryCapacity.ZERO)) + .merge(analyticsCapacity.orElse(NativeMemoryCapacity.ZERO)); + return Optional.of(new AutoscalingDeciderResult( + updatedCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto), + reasonBuilder.setSimpleReason("requesting scale up as number of jobs in queues exceeded configured limit").build())); + } + + // Could the currently waiting jobs ever be assigned? + if (waitingAnalyticsJobs.isEmpty() == false || waitingAnomalyJobs.isEmpty() == false) { + // we are unable to determine new tier size, but maybe we can see if our nodes are big enough. + if (futureFreedCapacity == null) { + Optional maxSize = Stream.concat( + waitingAnalyticsJobs.stream().map(mlMemoryTracker::getDataFrameAnalyticsJobMemoryRequirement), + waitingAnomalyJobs.stream().map(mlMemoryTracker::getAnomalyDetectorJobMemoryRequirement)) + .filter(Objects::nonNull) + .max(Long::compareTo); + if (maxSize.isPresent() && maxSize.get() > currentScale.getNode()) { + return Optional.of(new AutoscalingDeciderResult( + new NativeMemoryCapacity(Math.max(currentScale.getTier(), maxSize.get()), maxSize.get()) + .autoscalingCapacity(maxMachineMemoryPercent, useAuto), + reasonBuilder.setSimpleReason("requesting scale up as there is no node large enough to handle queued jobs") + .build())); + } + // we have no info, allow the caller to make the appropriate action, probably returning a no_scale + return Optional.empty(); + } + long newTierNeeded = 0L; + // could any of the nodes actually run the job? + long newNodeMax = currentScale.getNode(); + for (String analyticsJob : waitingAnalyticsJobs) { + Long requiredMemory = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(analyticsJob); + // it is OK to continue here as we have not breached our queuing limit + if (requiredMemory == null) { + continue; + } + // Is there "future capacity" on a node that could run this job? If not, we need that much more in the tier. + if (futureFreedCapacity.getNode() < requiredMemory) { + newTierNeeded = Math.max(requiredMemory, newTierNeeded); + } + newNodeMax = Math.max(newNodeMax, requiredMemory); + } + for (String anomalyJob : waitingAnomalyJobs) { + Long requiredMemory = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(anomalyJob); + // it is OK to continue here as we have not breached our queuing limit + if (requiredMemory == null) { + continue; + } + // Is there "future capacity" on a node that could run this job? If not, we need that much more in the tier. 
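+                // Illustrative example (assumed values, not part of this change): if this waiting anomaly job needs
+                // 1GB but the largest block of memory that will eventually be freed on any single node is only 512MB,
+                // then newTierNeeded grows to at least 1GB, and newNodeMax is raised if 1GB exceeds the current node size.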
+                if (futureFreedCapacity.getNode() < requiredMemory) {
+                    newTierNeeded = Math.max(requiredMemory, newTierNeeded);
+                }
+                newNodeMax = Math.max(newNodeMax, requiredMemory);
+            }
+            if (newNodeMax > currentScale.getNode() || newTierNeeded > 0L) {
+                NativeMemoryCapacity newCapacity = new NativeMemoryCapacity(newTierNeeded, newNodeMax);
+                return Optional.of(new AutoscalingDeciderResult(
+                    // We need more memory in the tier, or our individual node size requirements have increased
+                    NativeMemoryCapacity.from(currentScale).merge(newCapacity).autoscalingCapacity(maxMachineMemoryPercent, useAuto),
+                    reasonBuilder
+                        .setSimpleReason("scaling up as adequate space would not automatically become available when running jobs finish")
+                        .build()
+                ));
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    // This calculates the capacity that could potentially be freed automatically at some point in the future.
+    // Since jobs with lookback-only datafeeds and data frame analytics jobs all have some potential future end date,
+    // we can assume (without user intervention) that these will eventually stop and free their currently occupied resources.
+    //
+    // The capacity is as follows:
+    //  tier: The sum total of the resources that will be removed
+    //  node: The largest block of memory that will be freed on a given node.
+    //        - If > 1 "batch" ml tasks are running on the same node, we sum their resources.
+    Optional calculateFutureFreedCapacity(PersistentTasksCustomMetadata tasks, Duration jobMemoryExpiry) {
+        final List> jobsWithLookbackDatafeeds = datafeedTasks(tasks).stream()
+            .filter(t -> t.getParams().getEndTime() != null && t.getExecutorNode() != null)
+            .collect(Collectors.toList());
+        final List> assignedAnalyticsJobs = dataframeAnalyticsTasks(tasks).stream()
+            .filter(t -> t.getExecutorNode() != null)
+            .collect(Collectors.toList());
+
+        if (jobsWithLookbackDatafeeds.isEmpty() && assignedAnalyticsJobs.isEmpty()) {
+            return Optional.of(NativeMemoryCapacity.ZERO);
+        }
+        if (mlMemoryTracker.isRecentlyRefreshed(jobMemoryExpiry) == false) {
+            return Optional.empty();
+        }
+        // What is the largest chunk of memory that could be freed on a node in the future?
+        Map maxNodeBytes = new HashMap<>();
+        for (PersistentTask lookbackOnlyDf : jobsWithLookbackDatafeeds) {
+            Long jobSize = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(lookbackOnlyDf.getParams().getJobId());
+            if (jobSize == null) {
+                return Optional.empty();
+            }
+            maxNodeBytes.compute(lookbackOnlyDf.getExecutorNode(), (_k, v) -> v == null ? jobSize : jobSize + v);
+        }
+        for (PersistentTask task : assignedAnalyticsJobs) {
+            Long jobSize = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(MlTasks.dataFrameAnalyticsId(task.getId()));
+            if (jobSize == null) {
+                return Optional.empty();
+            }
+            maxNodeBytes.compute(task.getExecutorNode(), (_k, v) -> v == null ?
jobSize : jobSize + v); + } + return Optional.of(new NativeMemoryCapacity( + maxNodeBytes.values().stream().mapToLong(Long::longValue).sum(), + maxNodeBytes.values().stream().mapToLong(Long::longValue).max().orElse(0L))); + } + + private AutoscalingDeciderResult buildDecisionAndRequestRefresh(MlScalingReason.Builder reasonBuilder) { + mlMemoryTracker.asyncRefresh(); + return new AutoscalingDeciderResult(null, reasonBuilder.setSimpleReason(MEMORY_STALE).build()); + } + + private Long getAnalyticsMemoryRequirement(String analyticsId) { + return mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(analyticsId); + } + + private Long getAnalyticsMemoryRequirement(PersistentTask task) { + return getAnalyticsMemoryRequirement(MlTasks.dataFrameAnalyticsId(task.getId())); + } + + private Long getAnomalyMemoryRequirement(String anomalyId) { + return mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(anomalyId); + } + + private Long getAnomalyMemoryRequirement(PersistentTask task) { + return getAnomalyMemoryRequirement(MlTasks.jobId(task.getId())); + } + + Optional checkForScaleDown(List nodes, + ClusterState clusterState, + long largestJob, + NativeMemoryCapacity currentCapacity, + MlScalingReason.Builder reasonBuilder) { + List nodeLoads = new ArrayList<>(); + boolean isMemoryAccurateFlag = true; + for (DiscoveryNode node : nodes) { + NodeLoad nodeLoad = nodeLoadDetector.detectNodeLoad(clusterState, + true, + node, + maxOpenJobs, + maxMachineMemoryPercent, + true, + useAuto); + if (nodeLoad.getError() != null) { + logger.warn("[{}] failed to gather node load limits, failure [{}]. Returning no scale", + node.getId(), + nodeLoad.getError()); + return Optional.empty(); + } + nodeLoads.add(nodeLoad); + isMemoryAccurateFlag = isMemoryAccurateFlag && nodeLoad.isUseMemory(); + } + // Even if we verify that memory usage is up to date before checking node capacity, we could still run into stale information. + // We should not make a decision if the memory usage is stale/inaccurate. + if (isMemoryAccurateFlag == false) { + assert isMemoryAccurateFlag : "view of memory is inaccurate after recent check"; + return Optional.empty(); + } + long currentlyNecessaryTier = nodeLoads.stream().mapToLong(NodeLoad::getAssignedJobMemory).sum(); + // We consider a scale down if we are not fully utilizing the tier + // Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible). + if (currentlyNecessaryTier < currentCapacity.getTier() || largestJob < currentCapacity.getNode()) { + NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity( + currentlyNecessaryTier, + largestJob, + // If our newly suggested native capacity is the same, we can use the previously stored jvm size + largestJob == currentCapacity.getNode() ? 
currentCapacity.getJvmSize() : null); + return Optional.of( + new AutoscalingDeciderResult( + nativeMemoryCapacity.autoscalingCapacity(maxMachineMemoryPercent, useAuto), + reasonBuilder.setSimpleReason("Requesting scale down as tier and/or node size could be smaller").build() + ) + ); + } + + return Optional.empty(); + } + + @Override + public String name() { + return "ml"; + } + +} + diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingExtension.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingExtension.java new file mode 100644 index 0000000000000..66002b1961605 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingExtension.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.xpack.autoscaling.AutoscalingExtension; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.util.Collection; + +public class MlAutoscalingExtension implements AutoscalingExtension { + private final MachineLearning plugin; + + public MlAutoscalingExtension(MachineLearning plugin) { + this.plugin = plugin; + } + + @Override + public Collection> deciders() { + return plugin.deciders(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java new file mode 100644 index 0000000000000..3164b306905b8 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; + +import java.util.Arrays; +import java.util.List; + +public final class MlAutoscalingNamedWritableProvider { + + private MlAutoscalingNamedWritableProvider() { } + + public static List getNamedWriteables() { + return Arrays.asList( + new NamedWriteableRegistry.Entry(AutoscalingDeciderConfiguration.class, + MlAutoscalingDeciderConfiguration.NAME, + MlAutoscalingDeciderConfiguration::new), + new NamedWriteableRegistry.Entry(AutoscalingDeciderResult.Reason.class, + MlScalingReason.NAME, + MlScalingReason::new) + ); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReason.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReason.java new file mode 100644 index 0000000000000..02c71265fcd0b --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReason.java @@ -0,0 +1,191 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class MlScalingReason implements AutoscalingDeciderResult.Reason { + + static final String NAME = "ml"; + static final String WAITING_ANALYTICS_JOBS = "waiting_analytics_jobs"; + static final String WAITING_ANOMALY_JOBS = "waiting_anomaly_jobs"; + static final String CONFIGURATION = "configuration"; + static final String LARGEST_WAITING_ANALYTICS_JOB = "largest_waiting_analytics_job"; + static final String LARGEST_WAITING_ANOMALY_JOB = "largest_waiting_anomaly_job"; + static final String CURRENT_CAPACITY = "current_capacity"; + static final String REASON = "reason"; + + private final List waitingAnalyticsJobs; + private final List waitingAnomalyJobs; + private final AutoscalingDeciderConfiguration passedConfiguration; + private final Long largestWaitingAnalyticsJob; + private final Long largestWaitingAnomalyJob; + private final AutoscalingCapacity currentMlCapacity; + private final String simpleReason; + + public MlScalingReason(StreamInput in) throws IOException { + this.waitingAnalyticsJobs = in.readStringList(); + this.waitingAnomalyJobs = in.readStringList(); + this.passedConfiguration = in.readNamedWriteable(AutoscalingDeciderConfiguration.class); + this.currentMlCapacity = new AutoscalingCapacity(in); + this.largestWaitingAnalyticsJob = in.readOptionalVLong(); + this.largestWaitingAnomalyJob = in.readOptionalVLong(); + this.simpleReason = in.readString(); + } + + MlScalingReason(List waitingAnalyticsJobs, + List 
waitingAnomalyJobs,
+                    AutoscalingDeciderConfiguration passedConfiguration,
+                    Long largestWaitingAnalyticsJob,
+                    Long largestWaitingAnomalyJob,
+                    AutoscalingCapacity currentMlCapacity,
+                    String simpleReason) {
+        this.waitingAnalyticsJobs = waitingAnalyticsJobs == null ? Collections.emptyList() : waitingAnalyticsJobs;
+        this.waitingAnomalyJobs = waitingAnomalyJobs == null ? Collections.emptyList() : waitingAnomalyJobs;
+        this.passedConfiguration = ExceptionsHelper.requireNonNull(passedConfiguration, CONFIGURATION);
+        this.largestWaitingAnalyticsJob = largestWaitingAnalyticsJob;
+        this.largestWaitingAnomalyJob = largestWaitingAnomalyJob;
+        this.currentMlCapacity = ExceptionsHelper.requireNonNull(currentMlCapacity, CURRENT_CAPACITY);
+        this.simpleReason = ExceptionsHelper.requireNonNull(simpleReason, REASON);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MlScalingReason that = (MlScalingReason) o;
+        return Objects.equals(waitingAnalyticsJobs, that.waitingAnalyticsJobs) &&
+            Objects.equals(waitingAnomalyJobs, that.waitingAnomalyJobs) &&
+            Objects.equals(passedConfiguration, that.passedConfiguration) &&
+            Objects.equals(largestWaitingAnalyticsJob, that.largestWaitingAnalyticsJob) &&
+            Objects.equals(largestWaitingAnomalyJob, that.largestWaitingAnomalyJob) &&
+            Objects.equals(currentMlCapacity, that.currentMlCapacity) &&
+            Objects.equals(simpleReason, that.simpleReason);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(waitingAnalyticsJobs,
+            waitingAnomalyJobs,
+            passedConfiguration,
+            largestWaitingAnalyticsJob,
+            largestWaitingAnomalyJob,
+            currentMlCapacity,
+            simpleReason);
+    }
+
+    @Override
+    public String summary() {
+        return simpleReason;
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeStringCollection(this.waitingAnalyticsJobs);
+        out.writeStringCollection(this.waitingAnomalyJobs);
+        out.writeNamedWriteable(this.passedConfiguration);
+        this.currentMlCapacity.writeTo(out);
+        out.writeOptionalVLong(largestWaitingAnalyticsJob);
+        out.writeOptionalVLong(largestWaitingAnomalyJob);
+        out.writeString(this.simpleReason);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(WAITING_ANALYTICS_JOBS, waitingAnalyticsJobs);
+        builder.field(WAITING_ANOMALY_JOBS, waitingAnomalyJobs);
+        builder.field(CONFIGURATION, passedConfiguration);
+        if (largestWaitingAnalyticsJob != null) {
+            builder.field(LARGEST_WAITING_ANALYTICS_JOB, largestWaitingAnalyticsJob);
+        }
+        if (largestWaitingAnomalyJob != null) {
+            builder.field(LARGEST_WAITING_ANOMALY_JOB, largestWaitingAnomalyJob);
+        }
+        builder.field(CURRENT_CAPACITY, currentMlCapacity);
+        builder.field(REASON, simpleReason);
+        builder.endObject();
+        return builder;
+    }
+
+    static class Builder {
+        private List waitingAnalyticsJobs = Collections.emptyList();
+        private List waitingAnomalyJobs = Collections.emptyList();
+        private AutoscalingDeciderConfiguration passedConfiguration;
+        private Long largestWaitingAnalyticsJob;
+        private Long largestWaitingAnomalyJob;
+        private AutoscalingCapacity currentMlCapacity;
+        private String simpleReason;
+
+        public Builder setWaitingAnalyticsJobs(List waitingAnalyticsJobs) {
+            this.waitingAnalyticsJobs = waitingAnalyticsJobs;
+            return
this; + } + + public Builder setWaitingAnomalyJobs(List waitingAnomalyJobs) { + this.waitingAnomalyJobs = waitingAnomalyJobs; + return this; + } + + public Builder setPassedConfiguration(AutoscalingDeciderConfiguration passedConfiguration) { + this.passedConfiguration = passedConfiguration; + return this; + } + + public Builder setLargestWaitingAnalyticsJob(Long largestWaitingAnalyticsJob) { + this.largestWaitingAnalyticsJob = largestWaitingAnalyticsJob; + return this; + } + + public Builder setLargestWaitingAnomalyJob(Long largestWaitingAnomalyJob) { + this.largestWaitingAnomalyJob = largestWaitingAnomalyJob; + return this; + } + + public Builder setCurrentMlCapacity(AutoscalingCapacity currentMlCapacity) { + this.currentMlCapacity = currentMlCapacity; + return this; + } + + public Builder setSimpleReason(String simpleReason) { + this.simpleReason = simpleReason; + return this; + } + + public MlScalingReason build() { + return new MlScalingReason( + waitingAnalyticsJobs, + waitingAnomalyJobs, + passedConfiguration, + largestWaitingAnalyticsJob, + largestWaitingAnomalyJob, + currentMlCapacity, + simpleReason + ); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacity.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacity.java new file mode 100644 index 0000000000000..396d30ba1bc3a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacity.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; +import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator; + +// Used for storing native memory capacity and then transforming it into an autoscaling capacity +// which takes into account the whole node size +public class NativeMemoryCapacity { + + static final NativeMemoryCapacity ZERO = new NativeMemoryCapacity(0L, 0L); + + static NativeMemoryCapacity from(NativeMemoryCapacity capacity) { + return new NativeMemoryCapacity(capacity.tier, capacity.node, capacity.jvmSize); + } + + private long tier; + private long node; + private Long jvmSize; + + NativeMemoryCapacity(long tier, long node, Long jvmSize) { + this.tier = tier; + this.node = node; + this.jvmSize = jvmSize; + } + + NativeMemoryCapacity(long tier, long node) { + this.tier = tier; + this.node = node; + } + + NativeMemoryCapacity merge(NativeMemoryCapacity nativeMemoryCapacity) { + this.tier += nativeMemoryCapacity.tier; + if (nativeMemoryCapacity.node > this.node) { + this.node = nativeMemoryCapacity.node; + // If the new node size is bigger, we have no way of knowing if the JVM size would stay the same + // So null out + this.jvmSize = null; + } + return this; + } + + AutoscalingCapacity autoscalingCapacity(int maxMemoryPercent, boolean useAuto) { + int memoryPercentForMl = jvmSize == null ? + NativeMemoryCalculator.modelMemoryPercent(node, maxMemoryPercent, useAuto) : + // We make the assumption that the JVM size is the same across the entire tier + // This simplifies calculating the tier as it means that each node in the tier + // will have the same dynamic memory calculation. 
And thus the tier is simply the sum of the memory necessary + // times that scaling factor. + NativeMemoryCalculator.modelMemoryPercent(node, jvmSize, maxMemoryPercent, useAuto); + double inverseScale = maxMemoryPercent <= 0 ? 0 : 100.0 / memoryPercentForMl; + return new AutoscalingCapacity( + new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes((long)Math.ceil(tier * inverseScale))), + new AutoscalingCapacity.AutoscalingResources(null, ByteSizeValue.ofBytes((long)Math.ceil(node * inverseScale)))); + } + + public long getTier() { + return tier; + } + + public long getNode() { + return node; + } + + public Long getJvmSize() { + return jvmSize; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index caa3fea4871b9..68cb0db59ffad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -15,7 +15,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; +import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -61,7 +63,11 @@ public class JobNodeSelector { * reasons why a job cannot be assigned to a particular node. May * be null if no such function is needed. */ - public JobNodeSelector(ClusterState clusterState, String jobId, String taskName, MlMemoryTracker memoryTracker, int maxLazyNodes, + public JobNodeSelector(ClusterState clusterState, + String jobId, + String taskName, + MlMemoryTracker memoryTracker, + int maxLazyNodes, Function nodeFilter) { this.jobId = Objects.requireNonNull(jobId); this.taskName = Objects.requireNonNull(taskName); @@ -77,8 +83,11 @@ public JobNodeSelector(ClusterState clusterState, String jobId, String taskName, }; } - public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJobs, int maxConcurrentJobAllocations, - int maxMachineMemoryPercent, boolean isMemoryTrackerRecentlyRefreshed, + public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJobs, + int maxConcurrentJobAllocations, + int maxMachineMemoryPercent, + long maxNodeSize, + boolean isMemoryTrackerRecentlyRefreshed, boolean useAutoMemoryPercentage) { // TODO: remove in 8.0.0 boolean allNodesHaveDynamicMaxWorkers = clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_2_0); @@ -105,8 +114,7 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob reasons.add(reason); continue; } - - NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad( + NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad( clusterState, allNodesHaveDynamicMaxWorkers, // Remove in 8.0.0 node, @@ -122,7 +130,6 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob reasons.add(reason); continue; } - // Assuming the node is eligible at all, check loading allocateByMemory = currentLoad.isUseMemory(); int maxNumberOfOpenJobs = currentLoad.getMaxJobs(); @@ -200,14 +207,38 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob } } } - return createAssignment(allocateByMemory ? 
minLoadedNodeByMemory : minLoadedNodeByCount, reasons); + + return createAssignment( + allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount, + reasons, + allocateByMemory && maxNodeSize > 0L ? + NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) : + Long.MAX_VALUE); } - private PersistentTasksCustomMetadata.Assignment createAssignment(DiscoveryNode minLoadedNode, List reasons) { + PersistentTasksCustomMetadata.Assignment createAssignment(DiscoveryNode minLoadedNode, + List reasons, + long biggestPossibleJob) { if (minLoadedNode == null) { String explanation = String.join("|", reasons); + PersistentTasksCustomMetadata.Assignment currentAssignment = + new PersistentTasksCustomMetadata.Assignment(null, explanation); logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation); - return considerLazyAssignment(new PersistentTasksCustomMetadata.Assignment(null, explanation)); + Long estimatedMemoryUsage = memoryTracker.getJobMemoryRequirement(taskName, jobId); + if (estimatedMemoryUsage != null + && (MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > biggestPossibleJob) { + ParameterizedMessage message = new ParameterizedMessage( + "[{}] not waiting for node assignment as estimated job size [{}] is greater than largest possible job size [{}]", + jobId, + MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage, + biggestPossibleJob); + logger.info(message); + List newReasons = new ArrayList<>(reasons); + newReasons.add(message.getFormattedMessage()); + explanation = String.join("|", newReasons); + return new PersistentTasksCustomMetadata.Assignment(null, explanation); + } + return considerLazyAssignment(currentAssignment); } logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId); return new PersistentTasksCustomMetadata.Assignment(minLoadedNode.getId(), ""); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java new file mode 100644 index 0000000000000..21ff5dc9442c2 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoad.java @@ -0,0 +1,248 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ml.job; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; + +import java.util.Objects; + +public class NodeLoad { + + private static final Logger logger = LogManager.getLogger(NodeLoadDetector.class); + + private final long maxMemory; + private final int maxJobs; + private final String nodeId; + private final boolean useMemory; + private final String error; + private final long numAssignedJobs; + private final long assignedJobMemory; + private final long numAllocatingJobs; + + NodeLoad(long maxMemory, + int maxJobs, + String nodeId, + boolean useMemory, + String error, + long numAssignedJobs, + long assignedJobMemory, + long numAllocatingJobs) { + this.maxMemory = maxMemory; + this.maxJobs = maxJobs; + this.nodeId = nodeId; + this.useMemory = useMemory; + this.error = error; + this.numAssignedJobs = numAssignedJobs; + this.assignedJobMemory = assignedJobMemory; + this.numAllocatingJobs = numAllocatingJobs; + } + + /** + * @return The total number of assigned jobs + */ + public long getNumAssignedJobs() { + return numAssignedJobs; + } + + /** + * @return The total memory in bytes used by the assigned jobs. + */ + public long getAssignedJobMemory() { + return assignedJobMemory; + } + + /** + * @return The maximum memory on this node for jobs + */ + public long getMaxMlMemory() { + return maxMemory; + } + + /** + * @return The maximum number of jobs allowed on the node + */ + public int getMaxJobs() { + return maxJobs; + } + + /** + * @return returns `true` if the assignedJobMemory number is accurate + */ + public boolean isUseMemory() { + return useMemory; + } + + /** + * @return The node ID + */ + public String getNodeId() { + return nodeId; + } + + /** + * @return Returns a comma delimited string of errors if any were encountered. 
+ */ + @Nullable + public String getError() { + return error; + } + + /** + * @return The current number of jobs allocating to the node + */ + public long getNumAllocatingJobs() { + return numAllocatingJobs; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeLoad nodeLoad = (NodeLoad) o; + return maxMemory == nodeLoad.maxMemory && + maxJobs == nodeLoad.maxJobs && + useMemory == nodeLoad.useMemory && + numAssignedJobs == nodeLoad.numAssignedJobs && + assignedJobMemory == nodeLoad.assignedJobMemory && + numAllocatingJobs == nodeLoad.numAllocatingJobs && + Objects.equals(nodeId, nodeLoad.nodeId) && + Objects.equals(error, nodeLoad.error); + } + + @Override + public int hashCode() { + return Objects.hash(maxMemory, maxJobs, nodeId, useMemory, error, numAssignedJobs, assignedJobMemory, numAllocatingJobs); + } + + public static Builder builder(String nodeId) { + return new Builder(nodeId); + } + + public static class Builder { + private long maxMemory; + private int maxJobs; + private final String nodeId; + private boolean useMemory; + private String error; + private long numAssignedJobs; + private long assignedJobMemory; + private long numAllocatingJobs; + + public Builder(String nodeId) { + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + public long getNumAssignedJobs() { + return numAssignedJobs; + } + + public Builder setMaxMemory(long maxMemory) { + this.maxMemory = maxMemory; + return this; + } + + public Builder setMaxJobs(int maxJobs) { + this.maxJobs = maxJobs; + return this; + } + + public Builder setUseMemory(boolean useMemory) { + this.useMemory = useMemory; + return this; + } + + public Builder setError(String error) { + this.error = error; + return this; + } + + public Builder incNumAssignedJobs() { + ++this.numAssignedJobs; + return this; + } + + public Builder incAssignedJobMemory(long assignedJobMemory) { + this.assignedJobMemory += assignedJobMemory; + return this; + } + + public Builder incNumAllocatingJobs() { + ++this.numAllocatingJobs; + return this; + } + + void adjustForAnomalyJob(JobState jobState, + String jobId, + MlMemoryTracker mlMemoryTracker) { + if ((jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) && jobId != null) { + // Don't count CLOSED or FAILED jobs, as they don't consume native memory + ++numAssignedJobs; + if (jobState == JobState.OPENING) { + ++numAllocatingJobs; + } + Long jobMemoryRequirement = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(jobId); + if (jobMemoryRequirement == null) { + useMemory = false; + logger.debug(() -> new ParameterizedMessage( + "[{}] memory requirement was not available. 
Calculating load by number of assigned jobs.", + jobId + )); + } else { + assignedJobMemory += jobMemoryRequirement; + } + } + } + + void adjustForAnalyticsJob(PersistentTasksCustomMetadata.PersistentTask assignedTask, + MlMemoryTracker mlMemoryTracker) { + DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask); + + // Don't count stopped and failed df-analytics tasks as they don't consume native memory + if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) { + // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED + // and REINDEXING states we're committed to using the memory soon, so account for it here + ++numAssignedJobs; + StartDataFrameAnalyticsAction.TaskParams params = + (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams(); + Long jobMemoryRequirement = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId()); + if (jobMemoryRequirement == null) { + useMemory = false; + logger.debug(() -> new ParameterizedMessage( + "[{}] memory requirement was not available. Calculating load by number of assigned jobs.", + params.getId() + )); + } else { + assignedJobMemory += jobMemoryRequirement; + } + } + } + + public NodeLoad build() { + return new NodeLoad(maxMemory, + maxJobs, + nodeId, + useMemory, + error, + numAssignedJobs, + assignedJobMemory, + numAllocatingJobs); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java index 7e22c086f10a2..e630558b7e161 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java @@ -5,18 +5,12 @@ */ package org.elasticsearch.xpack.ml.job; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; -import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskParams; import org.elasticsearch.xpack.ml.MachineLearning; @@ -31,7 +25,6 @@ public class NodeLoadDetector { - private static final Logger logger = LogManager.getLogger(NodeLoadDetector.class); private final MlMemoryTracker mlMemoryTracker; @@ -74,16 +67,18 @@ public NodeLoad detectNodeLoad(ClusterState clusterState, + "] is not a long"); } - NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory.orElse(-1L), maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed); + NodeLoad.Builder nodeLoad = NodeLoad.builder(node.getId()) + .setMaxMemory(maxMlMemory.orElse(-1L)) + .setMaxJobs(maxNumberOfOpenJobs) + .setUseMemory(isMemoryTrackerRecentlyRefreshed); if (errors.isEmpty() == false) { - nodeLoad.error = Strings.collectionToCommaDelimitedString(errors); - return nodeLoad; + return 
nodeLoad.setError(Strings.collectionToCommaDelimitedString(errors)).build(); } updateLoadGivenTasks(nodeLoad, persistentTasks); - return nodeLoad; + return nodeLoad.build(); } - private void updateLoadGivenTasks(NodeLoad nodeLoad, PersistentTasksCustomMetadata persistentTasks) { + private void updateLoadGivenTasks(NodeLoad.Builder nodeLoad, PersistentTasksCustomMetadata persistentTasks) { if (persistentTasks != null) { // find all the anomaly detector job tasks assigned to this node Collection> assignedAnomalyDetectorTasks = persistentTasks.findTasks( @@ -108,131 +103,10 @@ private void updateLoadGivenTasks(NodeLoad nodeLoad, PersistentTasksCustomMetada } // if any jobs are running then the native code will be loaded, but shared between all jobs, // so increase the total memory usage of the assigned jobs to account for this - if (nodeLoad.numAssignedJobs > 0) { - nodeLoad.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); + if (nodeLoad.getNumAssignedJobs() > 0) { + nodeLoad.incAssignedJobMemory(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()); } } } - public static class NodeLoad { - private final long maxMemory; - private final int maxJobs; - private final String nodeId; - private boolean useMemory; - private String error; - private long numAssignedJobs; - private long assignedJobMemory; - private long numAllocatingJobs; - - private NodeLoad(String nodeId, long maxMemory, int maxJobs, boolean useMemory) { - this.maxJobs = maxJobs; - this.maxMemory = maxMemory; - this.nodeId = nodeId; - this.useMemory = useMemory; - } - - private void adjustForAnomalyJob(JobState jobState, - String jobId, - MlMemoryTracker mlMemoryTracker) { - if ((jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) && jobId != null) { - // Don't count CLOSED or FAILED jobs, as they don't consume native memory - ++numAssignedJobs; - if (jobState == JobState.OPENING) { - ++numAllocatingJobs; - } - Long jobMemoryRequirement = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(jobId); - if (jobMemoryRequirement == null) { - useMemory = false; - logger.debug(() -> new ParameterizedMessage( - "[{}] memory requirement was not available. Calculating load by number of assigned jobs.", - jobId - )); - } else { - assignedJobMemory += jobMemoryRequirement; - } - } - } - - private void adjustForAnalyticsJob(PersistentTasksCustomMetadata.PersistentTask assignedTask, - MlMemoryTracker mlMemoryTracker) { - DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask); - - // Don't count stopped and failed df-analytics tasks as they don't consume native memory - if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) { - // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED - // and REINDEXING states we're committed to using the memory soon, so account for it here - ++numAssignedJobs; - StartDataFrameAnalyticsAction.TaskParams params = - (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams(); - Long jobMemoryRequirement = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId()); - if (jobMemoryRequirement == null) { - useMemory = false; - logger.debug(() -> new ParameterizedMessage( - "[{}] memory requirement was not available. 
Calculating load by number of assigned jobs.", - params.getId() - )); - } else { - assignedJobMemory += jobMemoryRequirement; - } - } - } - - /** - * @return The total number of assigned jobs - */ - public long getNumAssignedJobs() { - return numAssignedJobs; - } - - /** - * @return The total memory in bytes used by the assigned jobs. - */ - public long getAssignedJobMemory() { - return assignedJobMemory; - } - - /** - * @return The maximum memory on this node for jobs - */ - public long getMaxMlMemory() { - return maxMemory; - } - - /** - * @return The maximum number of jobs allowed on the node - */ - public int getMaxJobs() { - return maxJobs; - } - - /** - * @return returns `true` if the assignedJobMemory number is accurate - */ - public boolean isUseMemory() { - return useMemory; - } - - /** - * @return The node ID - */ - public String getNodeId() { - return nodeId; - } - - /** - * @return Returns a comma delimited string of errors if any were encountered. - */ - @Nullable - public String getError() { - return error; - } - - /** - * @return The current number of jobs allocating to the node - */ - public long getNumAllocatingJobs() { - return numAllocatingJobs; - } - } - } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 22a4885ab6822..a5fceda217533 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -93,6 +93,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTas Integer.MAX_VALUE, Integer.MAX_VALUE, maxMachineMemoryPercent, + Long.MAX_VALUE, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index ee5d1ecef5cd5..9ad169fa518c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -100,6 +100,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(OpenJobAction.JobP maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, + maxNodeMemory, isMemoryTrackerRecentlyRefreshed, useAutoMemoryPercentage); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 2b1df88e77bbc..1142ce3f67a27 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -101,6 +101,11 @@ private void setReassignmentRecheckInterval(TimeValue recheckInterval) { @Override public void onMaster() { isMaster = true; + try { + asyncRefresh(); + } catch (Exception ex) { + logger.warn("unexpected failure while attempting asynchronous refresh on new master assignment", ex); + } logger.trace("ML memory tracker on master"); asyncRefresh(); } @@ -137,9 +142,17 @@ public void stop() { * for valid task assignment decisions 
to be made using it? */ public boolean isRecentlyRefreshed() { + return isRecentlyRefreshed(reassignmentRecheckInterval); + } + + /** + * Is the information in this object sufficiently up to date + * for valid task assignment decisions to be made using it? + */ + public boolean isRecentlyRefreshed(Duration customDuration) { Instant localLastUpdateTime = lastUpdateTime; return localLastUpdateTime != null && - localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(reassignmentRecheckInterval).isAfter(Instant.now()); + localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(customDuration).isAfter(Instant.now()); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index c58cc9f785f10..b20f0cd3294b4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -27,6 +27,7 @@ import java.util.Optional; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; +import static org.elasticsearch.xpack.ml.MachineLearning.MAX_ML_NODE_SIZE; import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT; @@ -68,6 +69,7 @@ public static List verifyIndicesPrimaryShardsAreActive(ClusterState clus protected volatile int maxMachineMemoryPercent; protected volatile int maxLazyMLNodes; protected volatile int maxOpenJobs; + protected final long maxNodeMemory; protected AbstractJobPersistentTasksExecutor(String taskName, String executor, @@ -83,6 +85,7 @@ protected AbstractJobPersistentTasksExecutor(String taskName, this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings); + this.maxNodeMemory = MAX_ML_NODE_SIZE.get(settings).getBytes(); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations); clusterService.getClusterSettings() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculator.java index a25c8dfb34e6f..f6f3ac0911224 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculator.java @@ -41,12 +41,12 @@ public static OptionalLong allowedBytesForMl(DiscoveryNode node, ClusterSettings settings.get(USE_AUTO_MACHINE_MEMORY_PERCENT)); } - public static OptionalLong allowedBytesForMl(DiscoveryNode node, int maxMemoryPercent, boolean useAuto) { + public static OptionalLong allowedBytesForMl(DiscoveryNode node, int maxMemoryPercent, boolean useAutoPercent) { return allowedBytesForMl( node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR), node.getAttributes().get(MAX_JVM_SIZE_NODE_ATTR), maxMemoryPercent, - useAuto); + useAutoPercent); } private static OptionalLong allowedBytesForMl(String nodeBytes, String jvmBytes, int maxMemoryPercent, boolean useAuto) { @@ -70,6 +70,36 @@ private static OptionalLong allowedBytesForMl(String nodeBytes, String jvmBytes, return 
OptionalLong.of(allowedBytesForMl(machineMemory, jvmMemory, maxMemoryPercent, useAuto)); } + public static int modelMemoryPercent(long machineMemory, long jvmSize, int maxMemoryPercent, boolean useAuto) { + if (useAuto) { + // It is conceivable that there is a machine smaller than 200MB. + // If the administrator wants to use the auto configuration, the node should be larger. + if (machineMemory - jvmSize < OS_OVERHEAD || machineMemory == 0) { + return 0; + } + // This calculation is dynamic and designed to maximally take advantage of the underlying machine for machine learning + // We only allow 200MB for the Operating system itself and take up to 90% of the underlying native memory left + // Example calculations: + // 1GB node -> 41% + // 2GB node -> 66% + // 16GB node -> 87% + // 64GB node -> 90% + return Math.min(90, (int)Math.ceil(((machineMemory - jvmSize - OS_OVERHEAD) / (double)machineMemory) * 100.0D)); + } + return maxMemoryPercent; + } + + public static int modelMemoryPercent(long machineMemory, double jvmFraction, int maxMemoryPercent, boolean useAuto) { + return modelMemoryPercent(machineMemory, (long) (jvmFraction * machineMemory), maxMemoryPercent, useAuto); + } + + public static int modelMemoryPercent(long machineMemory, int maxMemoryPercent, boolean useAuto) { + return modelMemoryPercent(machineMemory, + useAuto ? dynamicallyCalculateJvm(machineMemory) : machineMemory / 2, + maxMemoryPercent, + useAuto); + } + private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxMemoryPercent, boolean useAuto) { if (useAuto && jvmSize != null) { // It is conceivable that there is a machine smaller than 200MB. @@ -91,4 +121,22 @@ private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxM return machineMemory * maxMemoryPercent / 100; } + public static long allowedBytesForMl(long machineMemory, int maxMemoryPercent, boolean useAuto) { + return allowedBytesForMl(machineMemory, + useAuto ? dynamicallyCalculateJvm(machineMemory) : machineMemory / 2, + maxMemoryPercent, + useAuto); + } + + // TODO replace with official ergonomic calculation + private static long dynamicallyCalculateJvm(long nodeSize) { + if (nodeSize < ByteSizeValue.ofGb(2).getBytes()) { + return (long)(nodeSize * 0.40); + } + if (nodeSize < ByteSizeValue.ofGb(8).getBytes()) { + return (long) (nodeSize * 0.25); + } + return ByteSizeValue.ofGb(2).getBytes(); + } + } diff --git a/x-pack/plugin/ml/src/main/resources/META-INF/services/org.elasticsearch.xpack.autoscaling.AutoscalingExtension b/x-pack/plugin/ml/src/main/resources/META-INF/services/org.elasticsearch.xpack.autoscaling.AutoscalingExtension new file mode 100644 index 0000000000000..d832f4887600d --- /dev/null +++ b/x-pack/plugin/ml/src/main/resources/META-INF/services/org.elasticsearch.xpack.autoscaling.AutoscalingExtension @@ -0,0 +1,7 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. 
+# + +org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingExtension diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfigurationTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfigurationTests.java new file mode 100644 index 0000000000000..f4d6ab69d5cc3 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderConfigurationTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + + +public class MlAutoscalingDeciderConfigurationTests extends AbstractSerializingTestCase { + + public static MlAutoscalingDeciderConfiguration randomInstance() { + MlAutoscalingDeciderConfiguration.Builder builder = MlAutoscalingDeciderConfiguration.builder(); + if (randomBoolean()) { + builder.setNumAnalyticsJobsInQueue(randomIntBetween(0, 10)); + } + if (randomBoolean()) { + builder.setNumAnomalyJobsInQueue(randomIntBetween(0, 10)); + } + if (randomBoolean()) { + builder.setDownScaleDelay(randomTimeValue()); + } + return builder.build(); + } + + @Override + protected MlAutoscalingDeciderConfiguration doParseInstance(XContentParser parser) { + return MlAutoscalingDeciderConfiguration.parse(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return MlAutoscalingDeciderConfiguration::new; + } + + @Override + protected MlAutoscalingDeciderConfiguration createTestInstance() { + return randomInstance(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java new file mode 100644 index 0000000000000..1147ee6ac6085 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java @@ -0,0 +1,289 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.NodeLoad; +import org.elasticsearch.xpack.ml.job.NodeLoadDetector; +import org.elasticsearch.xpack.ml.process.MlMemoryTracker; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MlAutoscalingDeciderServiceTests extends ESTestCase { + + private static final long DEFAULT_NODE_SIZE = ByteSizeValue.ofGb(2).getBytes(); + private static final long DEFAULT_JVM_SIZE = ByteSizeValue.ofMb((long)(DEFAULT_NODE_SIZE * 0.25)).getBytes(); + private static final long DEFAULT_JOB_SIZE = ByteSizeValue.ofMb(200).getBytes(); + private NodeLoadDetector nodeLoadDetector; + private ClusterService clusterService; + private Settings settings; + private Supplier timeSupplier; + + @Before + public void setup() { + MlMemoryTracker mlMemoryTracker = mock(MlMemoryTracker.class); + when(mlMemoryTracker.asyncRefresh()).thenReturn(true); + when(mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(any())).thenReturn(DEFAULT_JOB_SIZE); + when(mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(any())).thenReturn(DEFAULT_JOB_SIZE); + nodeLoadDetector = mock(NodeLoadDetector.class); + when(nodeLoadDetector.getMlMemoryTracker()).thenReturn(mlMemoryTracker); + clusterService = mock(ClusterService.class); + settings = Settings.EMPTY; + timeSupplier = System::currentTimeMillis; + ClusterSettings cSettings = new ClusterSettings( + Settings.EMPTY, + Set.of(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearning.MAX_OPEN_JOBS_PER_NODE)); + when(clusterService.getClusterSettings()).thenReturn(cSettings); + } + + public void testScale_whenNotOnMaster() { + MlAutoscalingDeciderService service = buildService(); + service.offMaster(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, + () -> service.scale(new MlAutoscalingDeciderConfiguration(0, 0, TimeValue.ZERO), + mock(AutoscalingDeciderContext.class))); + assertThat(iae.getMessage(), equalTo("request for scaling information is only allowed on the master node")); + } + + public void testScaleUp_withNoJobsWaiting() { + MlAutoscalingDeciderService service = 
buildService(); + service.onMaster(); + + assertThat(service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(0, 0, TimeValue.ZERO), + Collections.emptyList(), + Collections.emptyList(), + null, + NativeMemoryCapacity.ZERO, + MlScalingReason.builder()), + equalTo(Optional.empty())); + } + + public void testScaleUp_withWaitingJobs() { + List jobTasks = Arrays.asList("waiting_job", "waiting_job_2"); + List analytics = Arrays.asList("analytics_waiting"); + MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder() + .setPassedConfiguration(new MlAutoscalingDeciderConfiguration.Builder().build()) + .setCurrentMlCapacity(AutoscalingCapacity.ZERO); + MlAutoscalingDeciderService service = buildService(); + service.setMaxMachineMemoryPercent(25); + { // No time in queue + Optional decision = service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(0, 0, TimeValue.ZERO), + jobTasks, + analytics, + null, + NativeMemoryCapacity.ZERO, + reasonBuilder); + assertFalse(decision.isEmpty()); + assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(DEFAULT_JOB_SIZE * 4)); + assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(12 * DEFAULT_JOB_SIZE)); + } + { // we allow one job in the analytics queue + Optional decision = service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(0, 1, TimeValue.ZERO), + jobTasks, + analytics, + null, + NativeMemoryCapacity.ZERO, + reasonBuilder); + assertFalse(decision.isEmpty()); + assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(4 * DEFAULT_JOB_SIZE)); + assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(8 * DEFAULT_JOB_SIZE)); + } + { // we allow one job in the anomaly queue and analytics queue + Optional decision = service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(1, 1, TimeValue.ZERO), + jobTasks, + analytics, + null, + NativeMemoryCapacity.ZERO, + reasonBuilder); + assertFalse(decision.isEmpty()); + assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(4 * DEFAULT_JOB_SIZE)); + assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(4 * DEFAULT_JOB_SIZE)); + } + } + + public void testScaleUp_withWaitingJobs_WithFutureCapacity() { + List jobTasks = Arrays.asList("waiting_job", "waiting_job_2"); + List analytics = Arrays.asList("analytics_waiting"); + MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder() + .setPassedConfiguration(new MlAutoscalingDeciderConfiguration.Builder().build()) + .setCurrentMlCapacity(AutoscalingCapacity.ZERO); + MlAutoscalingDeciderService service = buildService(); + service.setMaxMachineMemoryPercent(25); + { // with null future capacity and current capacity has a small node + Optional decision = service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(2, 1, TimeValue.ZERO), + jobTasks, + analytics, + null, + NativeMemoryCapacity.ZERO, + reasonBuilder); + assertFalse(decision.isEmpty()); + assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(DEFAULT_JOB_SIZE * 4)); + assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(DEFAULT_JOB_SIZE * 4)); + } + { + Optional decision = service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(2, 1, TimeValue.ZERO), + jobTasks, + analytics, + new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()), + new NativeMemoryCapacity(ByteSizeValue.ofGb(2).getBytes(), 
ByteSizeValue.ofGb(2).getBytes()), + reasonBuilder); + assertTrue(decision.isEmpty()); + } + { + Optional decision = service.checkForScaleUp( + new MlAutoscalingDeciderConfiguration(2, 1, TimeValue.ZERO), + jobTasks, + analytics, + new NativeMemoryCapacity(ByteSizeValue.ofMb(1).getBytes(), ByteSizeValue.ofMb(1).getBytes()), + new NativeMemoryCapacity(ByteSizeValue.ofGb(2).getBytes(), ByteSizeValue.ofGb(2).getBytes()), + reasonBuilder); + assertFalse(decision.isEmpty()); + assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(ByteSizeValue.ofGb(8).getBytes())); + assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(ByteSizeValue.ofMb(8992).getBytes())); + } + } + + public void testScaleDown_WithDetectionError() { + List nodes = withMlNodes("foo", "bar", "baz"); + DiscoveryNode badNode = randomFrom(nodes); + NodeLoad badLoad = NodeLoad.builder(badNode.getId()).setError("bad node").build(); + when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean(), anyBoolean())) + .thenReturn(NodeLoad.builder(badNode.getId()).setUseMemory(true).build()); + when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), eq(badNode), anyInt(), anyInt(), anyBoolean(), anyBoolean())) + .thenReturn(badLoad); + + MlAutoscalingDeciderService service = buildService(); + MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder(); + assertThat(service.checkForScaleDown(nodes, + ClusterState.EMPTY_STATE, + Long.MAX_VALUE, + new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()), + reasonBuilder).isEmpty(), is(true)); + } + + public void testScaleDown_WhenMemoryIsInaccurate() { + List nodes = withMlNodes("foo", "bar", "baz"); + DiscoveryNode badNode = randomFrom(nodes); + NodeLoad badLoad = NodeLoad.builder(badNode.getId()).setUseMemory(false).build(); + when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean(), anyBoolean())) + .thenReturn(NodeLoad.builder(badNode.getId()).setUseMemory(true).build()); + when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), eq(badNode), anyInt(), anyInt(), anyBoolean(), anyBoolean())) + .thenReturn(badLoad); + + MlAutoscalingDeciderService service = buildService(); + MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder(); + expectThrows(AssertionError.class, () -> service.checkForScaleDown(nodes, + ClusterState.EMPTY_STATE, + Long.MAX_VALUE, + new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()), + reasonBuilder)); + } + + public void testScaleDown() { + List nodes = withMlNodes("foo", "bar", "baz"); + when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean(), anyBoolean())) + .thenReturn(NodeLoad.builder("any") + .setUseMemory(true) + .incAssignedJobMemory(ByteSizeValue.ofGb(1).getBytes()) + .build()); + MlAutoscalingDeciderService service = buildService(); + service.setMaxMachineMemoryPercent(25); + MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder() + .setPassedConfiguration(new MlAutoscalingDeciderConfiguration.Builder().build()) + .setCurrentMlCapacity(AutoscalingCapacity.ZERO); + {//Current capacity allows for smaller node + Optional result = service.checkForScaleDown(nodes, + ClusterState.EMPTY_STATE, + ByteSizeValue.ofMb(100).getBytes(), + new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()), + reasonBuilder); + assertThat(result.isEmpty(), 
is(false)); + AutoscalingDeciderResult autoscalingDeciderResult = result.get(); + assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), + equalTo(ByteSizeValue.ofMb(400).getBytes())); + assertThat(autoscalingDeciderResult.requiredCapacity().tier().memory().getBytes(), + equalTo(ByteSizeValue.ofGb(12).getBytes())); + } + {// Current capacity allows for smaller tier + Optional result = service.checkForScaleDown(nodes, + ClusterState.EMPTY_STATE, + ByteSizeValue.ofMb(100).getBytes(), + new NativeMemoryCapacity(ByteSizeValue.ofGb(4).getBytes(), ByteSizeValue.ofMb(100).getBytes()), + reasonBuilder); + assertThat(result.isEmpty(), is(false)); + AutoscalingDeciderResult autoscalingDeciderResult = result.get(); + assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(), + equalTo(ByteSizeValue.ofMb(400).getBytes())); + assertThat(autoscalingDeciderResult.requiredCapacity().tier().memory().getBytes(), + equalTo(ByteSizeValue.ofGb(12).getBytes())); + } + {// Scale down is not really possible + Optional result = service.checkForScaleDown(nodes, + ClusterState.EMPTY_STATE, + ByteSizeValue.ofMb(100).getBytes(), + new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofMb(100).getBytes()), + reasonBuilder); + assertThat(result.isEmpty(), is(true)); + } + } + + private MlAutoscalingDeciderService buildService() { + return new MlAutoscalingDeciderService(nodeLoadDetector, settings, clusterService, timeSupplier); + } + + private static List withMlNodes(String... nodeName) { + return Arrays.stream(nodeName) + .map(n -> new DiscoveryNode( + n, + buildNewFakeTransportAddress(), + MapBuilder.newMapBuilder() + .put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, String.valueOf(DEFAULT_NODE_SIZE)) + .put(MachineLearning.MAX_JVM_SIZE_NODE_ATTR, String.valueOf(DEFAULT_JVM_SIZE)) + .map(), + new HashSet<>(Arrays.asList(DiscoveryNodeRole.MASTER_ROLE)), + Version.CURRENT)) + .collect(Collectors.toList()); + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReasonTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReasonTests.java new file mode 100644 index 0000000000000..cdbfcd4255771 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlScalingReasonTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MlScalingReasonTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return MlScalingReason::new; + } + + @Override + protected MlScalingReason createTestInstance() { + return new MlScalingReason( + randomBoolean() ? null : Stream.generate(() -> randomAlphaOfLength(10)).limit(5).collect(Collectors.toList()), + randomBoolean() ? 
null : Stream.generate(() -> randomAlphaOfLength(10)).limit(5).collect(Collectors.toList()), + MlAutoscalingDeciderConfigurationTests.randomInstance(), + randomBoolean() ? null : randomLongBetween(10, ByteSizeValue.ofGb(1).getBytes()), + randomBoolean() ? null : randomLongBetween(10, ByteSizeValue.ofGb(1).getBytes()), + new AutoscalingCapacity(AutoscalingCapacity.AutoscalingResources.ZERO, AutoscalingCapacity.AutoscalingResources.ZERO), + randomAlphaOfLength(10) + ); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(MlAutoscalingNamedWritableProvider.getNamedWriteables()); + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacityTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacityTests.java new file mode 100644 index 0000000000000..b438cf2c8afaf --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/NativeMemoryCapacityTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class NativeMemoryCapacityTests extends ESTestCase { + + public void testMerge() { + NativeMemoryCapacity capacity = new NativeMemoryCapacity(ByteSizeValue.ofGb(1).getBytes(), + ByteSizeValue.ofMb(200).getBytes(), + ByteSizeValue.ofMb(50).getBytes() + ); + capacity.merge(new NativeMemoryCapacity(ByteSizeValue.ofGb(1).getBytes(), + ByteSizeValue.ofMb(100).getBytes())); + assertThat(capacity.getTier(), equalTo(ByteSizeValue.ofGb(1).getBytes() * 2L)); + assertThat(capacity.getNode(), equalTo(ByteSizeValue.ofMb(200).getBytes())); + assertThat(capacity.getJvmSize(), equalTo(ByteSizeValue.ofMb(50).getBytes())); + + capacity.merge(new NativeMemoryCapacity(ByteSizeValue.ofGb(1).getBytes(), + ByteSizeValue.ofMb(300).getBytes())); + + assertThat(capacity.getTier(), equalTo(ByteSizeValue.ofGb(1).getBytes() * 3L)); + assertThat(capacity.getNode(), equalTo(ByteSizeValue.ofMb(300).getBytes())); + assertThat(capacity.getJvmSize(), is(nullValue())); + } + + + public void testAutoscalingCapacity() { + // TODO adjust once future JVM capacity is known + NativeMemoryCapacity capacity = new NativeMemoryCapacity( + ByteSizeValue.ofGb(4).getBytes(), + ByteSizeValue.ofGb(1).getBytes(), + ByteSizeValue.ofMb(50).getBytes() + ); + + { // auto is false + AutoscalingCapacity autoscalingCapacity = capacity.autoscalingCapacity(25, false); + assertThat(autoscalingCapacity.node().memory().getBytes(), equalTo(ByteSizeValue.ofGb(1).getBytes() * 4L)); + assertThat(autoscalingCapacity.tier().memory().getBytes(), equalTo(ByteSizeValue.ofGb(4).getBytes() * 4L)); + } + { // auto is true + AutoscalingCapacity autoscalingCapacity = capacity.autoscalingCapacity(25, true); + assertThat(autoscalingCapacity.node().memory().getBytes(), equalTo(1412818190L)); + assertThat(autoscalingCapacity.tier().memory().getBytes(), equalTo(5651272758L)); + } + { // auto is true with unknown jvm 
size + capacity = new NativeMemoryCapacity( + ByteSizeValue.ofGb(4).getBytes(), + ByteSizeValue.ofGb(1).getBytes() + ); + AutoscalingCapacity autoscalingCapacity = capacity.autoscalingCapacity(25, true); + assertThat(autoscalingCapacity.node().memory().getBytes(), equalTo(2618882498L)); + assertThat(autoscalingCapacity.tier().memory().getBytes(), equalTo(10475529991L)); + } + + } + + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java index 65c5fdd277bdd..e1fbf0a5e9a1c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java @@ -79,7 +79,7 @@ public void testNodeLoadDetection() { metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks); cs.metadata(metadata); - NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, 30, true, false); + NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, 30, true, false); assertThat(load.getAssignedJobMemory(), equalTo(52428800L)); assertThat(load.getNumAllocatingJobs(), equalTo(2L)); assertThat(load.getNumAssignedJobs(), equalTo(2L)); @@ -122,7 +122,7 @@ public void testNodeLoadDetection_withBadMaxOpenJobsAttribute() { Metadata.Builder metadata = Metadata.builder(); cs.metadata(metadata); - NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, 30, true, false); + NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, 30, true, false); assertThat(load.getError(), containsString("ml.max_open_jobs attribute [foo] is not an integer")); } @@ -140,7 +140,7 @@ public void testNodeLoadDetection_withBadMachineMemoryAttribute() { Metadata.Builder metadata = Metadata.builder(); cs.metadata(metadata); - NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true, false); + NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true, false); assertThat(load.getError(), containsString("ml.machine_memory attribute [bar] is not a long")); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index 19374e4d71fb2..9467edaa7b610 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -42,6 +42,7 @@ import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutorTests.jobWithRules; import static org.hamcrest.Matchers.containsString; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,6 +50,7 @@ public class JobNodeSelectorTests extends ESTestCase { // To simplify the logic in this class all jobs have the same memory requirement + private static final long MAX_JOB_BYTES = ByteSizeValue.ofGb(1).getBytes(); private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = ByteSizeValue.ofMb(10); private MlMemoryTracker memoryTracker; @@ -119,7 +121,12 @@ public void 
testSelectLeastLoadedMlNode_byCount() { Job job = jobBuilder.build(); JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, + 2, + 30, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } @@ -140,8 +147,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityCountLim JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -164,8 +175,12 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityCount JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -193,8 +208,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. 
" + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs [" @@ -217,8 +236,12 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNull JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNotNull(result.getExecutorNode()); } @@ -239,8 +262,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemor JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " + "Available memory for ML [" + (firstJobTotalMemory - 1) @@ -270,8 +297,13 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( + maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. 
" + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs [" @@ -296,8 +328,13 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMe JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId))); - PersistentTasksCustomMetadata.Assignment result = - jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( + maxRunningJobsPerNode, + 2, + maxMachineMemoryPercent, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " + "Available memory for ML [" + (firstJobTotalMemory - 1) @@ -326,7 +363,13 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(20, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( + 20, + 2, + 30, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -362,8 +405,15 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { ClusterState cs = csBuilder.build(); JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, job6.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + node -> nodeFilter(node, job6)); - PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode( + 10, + 2, + 30, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetadata.builder(tasks); @@ -376,8 +426,14 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date()); jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, + 2, + 30, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -391,7 +447,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { cs = csBuilder.build(); jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); 
assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -403,8 +459,9 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks)); cs = csBuilder.build(); jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + node -> nodeFilter(node, job7)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -445,8 +502,14 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() // Allocation won't be possible if the stale failed job is treated as opening JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + node -> nodeFilter(node, job7)); - PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, + 2, + 30, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertEquals("_node_id1", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetadata.builder(tasks); @@ -458,8 +521,9 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() cs = csBuilder.build(); Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date()); jobNodeSelector = new JobNodeSelector(cs, job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + node -> nodeFilter(node, job8)); - result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -492,8 +556,14 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks); cs.metadata(metadata); JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, + node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false); + PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, + 2, + 30, + MAX_JOB_BYTES, + isMemoryTrackerRecentlyRefreshed, + false); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -525,7 +595,12 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() cs.metadata(metadata); JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job)); - PersistentTasksCustomMetadata.Assignment result 
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
+            2,
+            30,
+            MAX_JOB_BYTES,
+            isMemoryTrackerRecentlyRefreshed,
+            false);
         assertThat(result.getExplanation(), containsString(
             "because the job's model snapshot requires a node of version [6.3.0] or higher"));
         assertNull(result.getExecutorNode());
@@ -555,7 +630,12 @@ public void testSelectLeastLoadedMlNode_jobWithRules() {
         Job job = jobWithRules("job_with_rules");
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
+            2,
+            30,
+            MAX_JOB_BYTES,
+            isMemoryTrackerRecentlyRefreshed,
+            false);
         assertNotNull(result.getExecutorNode());
     }
@@ -599,6 +679,37 @@ public void testConsiderLazyAssignmentWithLazyNodes() {
         assertNull(result.getExecutorNode());
     }
 
+    public void testMaximumPossibleNodeMemoryTooSmall() {
+        int numNodes = randomIntBetween(1, 10);
+        int maxRunningJobsPerNode = randomIntBetween(1, 100);
+        int maxMachineMemoryPercent = 30;
+        long machineMemory = (maxRunningJobsPerNode + 1) * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
+
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory));
+
+        ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, maxRunningJobsPerNode);
+
+        Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", ByteSizeValue.ofMb(10)).build(new Date());
+        when(memoryTracker.getJobMemoryRequirement(anyString(), eq("job_id1000"))).thenReturn(1000L);
+
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker,
+            randomIntBetween(1, 3),
+            node -> nodeFilter(node, job));
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
+            2,
+            maxMachineMemoryPercent,
+            10L,
+            isMemoryTrackerRecentlyRefreshed,
+            false);
+        assertNull(result.getExecutorNode());
+        assertThat(result.getExplanation(),
+            containsString("[job_id1000] not waiting for node assignment as estimated job size "
+                + "[31458280] is greater than largest possible job size [3]"));
+    }
+
     private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAttr, int numNodes, int numRunningJobsPerNode) {
         return fillNodesWithRunningJobs(nodeAttr, numNodes, numRunningJobsPerNode, JobState.OPENED, DataFrameAnalyticsState.STARTED);
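The expected explanation in testMaximumPossibleNodeMemoryTooSmall above is easier to follow with the arithmetic written out. The figures appear to come from the mocked memory requirement plus a fixed native-process overhead, and from applying the memory percent to the tiny node size passed to selectNode; the 30 MB overhead constant is an assumption made here for illustration, not something stated in the diff.

    public class MaxJobBytesArithmetic {
        public static void main(String[] args) {
            long mockedRequirement = 1_000L;              // memoryTracker.getJobMemoryRequirement(...) stub
            long nativeCodeOverhead = 30L * 1024 * 1024;  // assumed 30 MB overhead = 31,457,280 bytes
            long estimatedJobSize = mockedRequirement + nativeCodeOverhead;
            System.out.println(estimatedJobSize);         // 31458280, as asserted in the test

            long maxNodeSize = 10L;                       // the 10L passed to selectNode(...)
            int maxMachineMemoryPercent = 30;
            long largestPossibleJobSize = maxNodeSize * maxMachineMemoryPercent / 100;
            System.out.println(largestPossibleJobSize);   // 3, as asserted in the test
        }
    }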
From 2ebedea54bd8659a38a76917ae6d48713fd4d973 Mon Sep 17 00:00:00 2001
From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com>
Date: Tue, 17 Nov 2020 12:04:39 -0500
Subject: [PATCH 2/2] adjusting for backport

---
 .../xpack/ml/MachineLearning.java             |  2 ++
 .../MlAutoscalingNamedWritableProvider.java   | 20 ++++++++++++++--
 .../MlAutoscalingDeciderServiceTests.java     | 24 +++++++++----------
 3 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index 56a2e9ae92c2f..e075392c32218
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -215,6 +215,7 @@ import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
 import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
 import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
+import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@@ -1103,6 +1104,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
         namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers());
         namedXContent.addAll(new MlInferenceNamedXContentProvider().getNamedXContentParsers());
         namedXContent.addAll(new MlModelSizeNamedXContentProvider().getNamedXContentParsers());
+        namedXContent.addAll(MlAutoscalingNamedWritableProvider.getXContentParsers());
         return namedXContent;
     }

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java
index 3164b306905b8..fffb75faac8d3
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingNamedWritableProvider.java
@@ -6,16 +6,19 @@
 package org.elasticsearch.xpack.ml.autoscaling;

+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.plugins.spi.NamedXContentProvider;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;

 import java.util.Arrays;
 import java.util.List;

-public final class MlAutoscalingNamedWritableProvider {
+public final class MlAutoscalingNamedWritableProvider implements NamedXContentProvider {

-    private MlAutoscalingNamedWritableProvider() { }
+    public MlAutoscalingNamedWritableProvider() { }

     public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
         return Arrays.asList(
@@ -27,4 +30,17 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
                 MlScalingReason::new)
         );
     }
+
+    public static List<NamedXContentRegistry.Entry> getXContentParsers() {
+        return Arrays.asList(
+            new NamedXContentRegistry.Entry(AutoscalingDeciderConfiguration.class,
+                new ParseField(MlAutoscalingDeciderConfiguration.NAME),
+                MlAutoscalingDeciderConfiguration::parse)
+        );
+    }
+
+    @Override
+    public List<NamedXContentRegistry.Entry> getNamedXContentParsers() {
+        return getXContentParsers();
+    }
 }
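For the backport, MlAutoscalingNamedWritableProvider now also acts as a NamedXContentProvider, and MachineLearning#getNamedXContent() registers the same parser entry explicitly (see the hunk above). A hedged usage sketch of how such entries typically end up in a registry follows; it assumes only the standard NamedXContentRegistry constructor and is not code taken from this patch.

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.common.xcontent.NamedXContentRegistry;
    import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;

    public class RegistrySketch {
        public static NamedXContentRegistry buildRegistry() {
            // Collect the ML autoscaling entries so that MlAutoscalingDeciderConfiguration
            // can be parsed back out of cluster state or API responses.
            List<NamedXContentRegistry.Entry> entries = new ArrayList<>(
                new MlAutoscalingNamedWritableProvider().getNamedXContentParsers());
            return new NamedXContentRegistry(entries);
        }
    }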
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java
index 1147ee6ac6085..de01747d4575e
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java
@@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
@@ -31,7 +32,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;

@@ -67,7 +67,7 @@ public void setup() {
         timeSupplier = System::currentTimeMillis;
         ClusterSettings cSettings = new ClusterSettings(
             Settings.EMPTY,
-            Set.of(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearning.MAX_OPEN_JOBS_PER_NODE));
+            Sets.newHashSet(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, MachineLearning.MAX_OPEN_JOBS_PER_NODE));
         when(clusterService.getClusterSettings()).thenReturn(cSettings);
     }

@@ -110,7 +110,7 @@ public void testScaleUp_withWaitingJobs() {
                 null,
                 NativeMemoryCapacity.ZERO,
                 reasonBuilder);
-            assertFalse(decision.isEmpty());
+            assertTrue(decision.isPresent());
             assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(DEFAULT_JOB_SIZE * 4));
             assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(12 * DEFAULT_JOB_SIZE));
         }
@@ -122,7 +122,7 @@ public void testScaleUp_withWaitingJobs() {
                 null,
                 NativeMemoryCapacity.ZERO,
                 reasonBuilder);
-            assertFalse(decision.isEmpty());
+            assertTrue(decision.isPresent());
             assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(4 * DEFAULT_JOB_SIZE));
             assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(8 * DEFAULT_JOB_SIZE));
         }
@@ -134,7 +134,7 @@ public void testScaleUp_withWaitingJobs() {
                 null,
                 NativeMemoryCapacity.ZERO,
                 reasonBuilder);
-            assertFalse(decision.isEmpty());
+            assertTrue(decision.isPresent());
             assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(4 * DEFAULT_JOB_SIZE));
             assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(4 * DEFAULT_JOB_SIZE));
         }
@@ -156,7 +156,7 @@ public void testScaleUp_withWaitingJobs_WithFutureCapacity() {
                 null,
                 NativeMemoryCapacity.ZERO,
                 reasonBuilder);
-            assertFalse(decision.isEmpty());
+            assertTrue(decision.isPresent());
             assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(DEFAULT_JOB_SIZE * 4));
             assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(DEFAULT_JOB_SIZE * 4));
         }
@@ -168,7 +168,7 @@ public void testScaleUp_withWaitingJobs_WithFutureCapacity() {
                 null,
                 new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()),
                 new NativeMemoryCapacity(ByteSizeValue.ofGb(2).getBytes(), ByteSizeValue.ofGb(2).getBytes()),
                 reasonBuilder);
-            assertTrue(decision.isEmpty());
+            assertFalse(decision.isPresent());
         }
         {
             Optional<AutoscalingDeciderResult> decision = service.checkForScaleUp(
@@ -178,7 +178,7 @@ public void testScaleUp_withWaitingJobs_WithFutureCapacity() {
                 new NativeMemoryCapacity(ByteSizeValue.ofMb(1).getBytes(), ByteSizeValue.ofMb(1).getBytes()),
                 new NativeMemoryCapacity(ByteSizeValue.ofGb(2).getBytes(), ByteSizeValue.ofGb(2).getBytes()),
                 reasonBuilder);
-            assertFalse(decision.isEmpty());
+            assertTrue(decision.isPresent());
             assertThat(decision.get().requiredCapacity().node().memory().getBytes(), equalTo(ByteSizeValue.ofGb(8).getBytes()));
             assertThat(decision.get().requiredCapacity().tier().memory().getBytes(), equalTo(ByteSizeValue.ofMb(8992).getBytes()));
         }
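The assertion rewrites in the hunks above (assertFalse(decision.isEmpty()) becoming assertTrue(decision.isPresent()), and so on) are presumably the Java 8 adjustment the commit title refers to: Optional.isEmpty() only exists since Java 11, while the backport branch still has to compile at a Java 8 language level. A tiny standalone illustration of the equivalent check:

    import java.util.Optional;

    public class OptionalBackport {
        public static void main(String[] args) {
            Optional<String> decision = Optional.of("scale_up");
            // Java 11+: decision.isEmpty()
            // Java 8-compatible equivalent used by the backported assertions:
            boolean empty = decision.isPresent() == false;
            System.out.println("empty = " + empty);   // prints "empty = false"
        }
    }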
@@ -199,7 +199,7 @@ public void testScaleDown_WithDetectionError() {
             ClusterState.EMPTY_STATE,
             Long.MAX_VALUE,
             new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()),
-            reasonBuilder).isEmpty(), is(true));
+            reasonBuilder).isPresent(), is(false));
     }

     public void testScaleDown_WhenMemoryIsInaccurate() {
@@ -238,7 +238,7 @@ public void testScaleDown() {
             ByteSizeValue.ofMb(100).getBytes(),
             new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofGb(1).getBytes()),
             reasonBuilder);
-        assertThat(result.isEmpty(), is(false));
+        assertThat(result.isPresent(), is(true));
         AutoscalingDeciderResult autoscalingDeciderResult = result.get();
         assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(),
             equalTo(ByteSizeValue.ofMb(400).getBytes()));
@@ -251,7 +251,7 @@ public void testScaleDown() {
             ByteSizeValue.ofMb(100).getBytes(),
             new NativeMemoryCapacity(ByteSizeValue.ofGb(4).getBytes(), ByteSizeValue.ofMb(100).getBytes()),
             reasonBuilder);
-        assertThat(result.isEmpty(), is(false));
+        assertThat(result.isPresent(), is(true));
         AutoscalingDeciderResult autoscalingDeciderResult = result.get();
         assertThat(autoscalingDeciderResult.requiredCapacity().node().memory().getBytes(),
             equalTo(ByteSizeValue.ofMb(400).getBytes()));
@@ -264,7 +264,7 @@ public void testScaleDown() {
             ByteSizeValue.ofMb(100).getBytes(),
             new NativeMemoryCapacity(ByteSizeValue.ofGb(3).getBytes(), ByteSizeValue.ofMb(100).getBytes()),
             reasonBuilder);
-        assertThat(result.isEmpty(), is(true));
+        assertThat(result.isPresent(), is(false));
     }
 }
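Closing note: the other Java 8 adjustment in this test file is replacing Set.of(...) (a Java 9+ API) with Elasticsearch's Sets.newHashSet(...) when building the ClusterSettings fixture, as shown in the first hunk of this file. A self-contained sketch of the same pattern using only the JDK; the setting keys below are illustrative placeholders, not values taken from this patch.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class SetsBackport {
        public static void main(String[] args) {
            // Java 9+: Set.of("xpack.ml.max_machine_memory_percent", "xpack.ml.max_open_jobs")
            // Java 8-friendly construction, equivalent in spirit to Sets.newHashSet(...):
            Set<String> trackedSettings = new HashSet<>(Arrays.asList(
                "xpack.ml.max_machine_memory_percent",
                "xpack.ml.max_open_jobs"));
            System.out.println(trackedSettings);
        }
    }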