Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public static String jobTaskId(String jobId) {
return JOB_TASK_ID_PREFIX + jobId;
}

public static String jobId(String jobTaskId) {
return jobTaskId.substring(JOB_TASK_ID_PREFIX.length());
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
Expand All @@ -67,6 +71,10 @@ public static String dataFrameAnalyticsTaskId(String id) {
return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id;
}

public static String dataFrameAnalyticsId(String taskId) {
return taskId.substring(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX.length());
}

@Nullable
public static PersistentTasksCustomMetadata.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetadata tasks) {
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugin/ml/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ esplugin {
description 'Elasticsearch Expanded Pack Plugin - Machine Learning'
classname 'org.elasticsearch.xpack.ml.MachineLearning'
hasNativeController true
extendedPlugins = ['x-pack-core', 'lang-painless']
extendedPlugins = ['x-pack-autoscaling', 'lang-painless']
}


Expand Down Expand Up @@ -50,6 +50,7 @@ dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts')
testImplementation project(path: xpackModule('ilm'), configuration: 'default')
compileOnly project(path: xpackModule('autoscaling'), configuration: 'default')
testImplementation project(path: xpackModule('data-streams'), configuration: 'default')
// This should not be here
testImplementation project(path: xpackModule('security'), configuration: 'testArtifacts')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
Expand Down Expand Up @@ -212,6 +214,8 @@
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
Expand Down Expand Up @@ -413,7 +417,7 @@ public Set<DiscoveryNodeRole> getRoles() {
// controls the types of jobs that can be created, and each job alone is considerably smaller than what each node
// can handle.
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
/**
* This boolean value indicates if `max_machine_memory_percent` should be ignored and a automatic calculation is used instead.
*
Expand Down Expand Up @@ -469,6 +473,17 @@ public Set<DiscoveryNodeRole> getRoles() {
Property.NodeScope
);

/**
* This is the maximum possible node size for a machine learning node. It is useful when determining if a job could ever be opened
* on the cluster.
*
* If the value is the default special case of `0b`, that means the value is ignored when assigning jobs.
*/
public static final Setting<ByteSizeValue> MAX_ML_NODE_SIZE = Setting.byteSizeSetting(
"xpack.ml.max_ml_node_size",
ByteSizeValue.ZERO,
Property.NodeScope);

private static final Logger logger = LogManager.getLogger(MachineLearning.class);

private final Settings settings;
Expand All @@ -483,6 +498,7 @@ public Set<DiscoveryNodeRole> getRoles() {
private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();
private final SetOnce<CircuitBreaker> inferenceModelBreaker = new SetOnce<>();
private final SetOnce<ModelLoadingService> modelLoadingService = new SetOnce<>();
private final SetOnce<MlAutoscalingDeciderService> mlAutoscalingDeciderService = new SetOnce<>();

public MachineLearning(Settings settings, Path configPath) {
this.settings = settings;
Expand Down Expand Up @@ -520,7 +536,8 @@ public List<Setting<?>> getSettings() {
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
USE_AUTO_MACHINE_MEMORY_PERCENT));
USE_AUTO_MACHINE_MEMORY_PERCENT,
MAX_ML_NODE_SIZE));
}

public Settings additionalSettings() {
Expand Down Expand Up @@ -754,6 +771,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// Perform node startup operations
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();

mlAutoscalingDeciderService.set(new MlAutoscalingDeciderService(memoryTracker, settings, clusterService));

return Arrays.asList(
mlLifeCycleService,
jobResultsProvider,
Expand Down Expand Up @@ -1085,6 +1104,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers());
namedXContent.addAll(new MlInferenceNamedXContentProvider().getNamedXContentParsers());
namedXContent.addAll(new MlModelSizeNamedXContentProvider().getNamedXContentParsers());
namedXContent.addAll(MlAutoscalingNamedWritableProvider.getXContentParsers());
return namedXContent;
}

Expand Down Expand Up @@ -1115,4 +1135,13 @@ public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME);
this.inferenceModelBreaker.set(circuitBreaker);
}

public Collection<AutoscalingDeciderService<? extends AutoscalingDeciderConfiguration>> deciders() {
if (enabled) {
assert mlAutoscalingDeciderService.get() != null;
return Collections.singletonList(mlAutoscalingDeciderService.get());
} else {
return Collections.emptyList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params,
maxOpenJobs,
Integer.MAX_VALUE,
maxMachineMemoryPercent,
maxNodeMemory,
isMemoryTrackerRecentlyRefreshed,
useAutoMemoryPercentage
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.autoscaling;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderConfiguration;

import java.io.IOException;
import java.util.Objects;

public class MlAutoscalingDeciderConfiguration implements AutoscalingDeciderConfiguration {
static final String NAME = "ml";

private static final int DEFAULT_ANOMALY_JOBS_IN_QUEUE = 0;
private static final int DEFAULT_ANALYTICS_JOBS_IN_QUEUE = 0;

private static final ParseField NUM_ANOMALY_JOBS_IN_QUEUE = new ParseField("num_anomaly_jobs_in_queue");
private static final ParseField NUM_ANALYTICS_JOBS_IN_QUEUE = new ParseField("num_analytics_jobs_in_queue");
private static final ParseField DOWN_SCALE_DELAY = new ParseField("down_scale_delay");

private static final ObjectParser<MlAutoscalingDeciderConfiguration.Builder, Void> PARSER = new ObjectParser<>(NAME,
MlAutoscalingDeciderConfiguration.Builder::new);

static {
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnomalyJobsInQueue, NUM_ANOMALY_JOBS_IN_QUEUE);
PARSER.declareInt(MlAutoscalingDeciderConfiguration.Builder::setNumAnalyticsJobsInQueue, NUM_ANALYTICS_JOBS_IN_QUEUE);
PARSER.declareString(MlAutoscalingDeciderConfiguration.Builder::setDownScaleDelay, DOWN_SCALE_DELAY);
}

public static MlAutoscalingDeciderConfiguration parse(final XContentParser parser) {
return PARSER.apply(parser, null).build();
}

private final int numAnomalyJobsInQueue;
private final int numAnalyticsJobsInQueue;
private final TimeValue downScaleDelay;

MlAutoscalingDeciderConfiguration(int numAnomalyJobsInQueue, int numAnalyticsJobsInQueue, TimeValue downScaleDelay) {
if (numAnomalyJobsInQueue < 0) {
throw new IllegalArgumentException("[" + NUM_ANOMALY_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
}
if (numAnalyticsJobsInQueue < 0) {
throw new IllegalArgumentException("[" + NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName() + "] must be non-negative");
}
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
this.downScaleDelay = downScaleDelay;
}

public MlAutoscalingDeciderConfiguration(StreamInput in) throws IOException {
numAnomalyJobsInQueue = in.readVInt();
numAnalyticsJobsInQueue = in.readVInt();
downScaleDelay = in.readTimeValue();
}

@Override
public String name() {
return NAME;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numAnomalyJobsInQueue);
out.writeVInt(numAnalyticsJobsInQueue);
out.writeTimeValue(downScaleDelay);
}

public int getNumAnomalyJobsInQueue() {
return numAnomalyJobsInQueue;
}

public int getNumAnalyticsJobsInQueue() {
return numAnalyticsJobsInQueue;
}

public TimeValue getDownScaleDelay() {
return downScaleDelay;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MlAutoscalingDeciderConfiguration that = (MlAutoscalingDeciderConfiguration) o;
return numAnomalyJobsInQueue == that.numAnomalyJobsInQueue &&
numAnalyticsJobsInQueue == that.numAnalyticsJobsInQueue &&
Objects.equals(downScaleDelay, that.downScaleDelay);
}

@Override
public int hashCode() {
return Objects.hash(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NUM_ANOMALY_JOBS_IN_QUEUE .getPreferredName(), numAnomalyJobsInQueue);
builder.field(NUM_ANALYTICS_JOBS_IN_QUEUE.getPreferredName(), numAnalyticsJobsInQueue);
builder.field(DOWN_SCALE_DELAY.getPreferredName(), downScaleDelay.getStringRep());
builder.endObject();
return builder;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

private int numAnomalyJobsInQueue = DEFAULT_ANOMALY_JOBS_IN_QUEUE;
private int numAnalyticsJobsInQueue = DEFAULT_ANALYTICS_JOBS_IN_QUEUE;
private TimeValue downScaleDelay = TimeValue.ZERO;

public Builder setNumAnomalyJobsInQueue(int numAnomalyJobsInQueue) {
this.numAnomalyJobsInQueue = numAnomalyJobsInQueue;
return this;
}

public Builder setNumAnalyticsJobsInQueue(int numAnalyticsJobsInQueue) {
this.numAnalyticsJobsInQueue = numAnalyticsJobsInQueue;
return this;
}

Builder setDownScaleDelay(String unparsedTimeValue) {
return setDownScaleDelay(TimeValue.parseTimeValue(unparsedTimeValue, DOWN_SCALE_DELAY.getPreferredName()));
}

public Builder setDownScaleDelay(TimeValue downScaleDelay) {
this.downScaleDelay = Objects.requireNonNull(downScaleDelay);
return this;
}

public MlAutoscalingDeciderConfiguration build() {
return new MlAutoscalingDeciderConfiguration(numAnomalyJobsInQueue, numAnalyticsJobsInQueue, downScaleDelay);
}
}

}
Loading