Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {

public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.GB);
public static final ByteSizeValue MIN_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.MB);
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(20, ByteSizeUnit.MB);
/**
* This includes the overhead of thread stacks and data structures that the program might use that
* are not instrumented. But it does NOT include the memory used by loading the executable code.
*/
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(5, ByteSizeUnit.MB);

public static final ParseField ID = new ParseField("id");
public static final ParseField DESCRIPTION = new ParseField("description");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);

public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1);
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB);

/**
* This includes the overhead of thread stacks and data structures that the program might use that
* are not instrumented. (For the <code>autodetect</code> process categorization is not instrumented,
* and the <code>normalize</code> process is not instrumented at all.) But this overhead does NOT
* include the memory used by loading the executable code.
*/
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB);

public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ public Set<DiscoveryNodeRole> getRoles() {
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
/**
* The amount of memory needed to load the ML native code shared libraries. The assumption is that the first
* ML job to run on a given node will do this, and then subsequent ML jobs on the same node will reuse the
* same already-loaded code.
*/
public static final ByteSizeValue NATIVE_EXECUTABLE_CODE_OVERHEAD = new ByteSizeValue(30, ByteSizeUnit.MB);
// Values higher than 100% are allowed to accommodate use cases where swapping has been determined to be acceptable.
// Anomaly detector jobs only use their full model memory during background persistence, and this is deliberately
// staggered, so with large numbers of jobs few will generally be persisting state at the same time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob
continue;
}

// Assuming the node is elligible at all, check loading
// Assuming the node is eligible at all, check loading
CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
allocateByMemory = currentLoad.allocateByMemory;

Expand Down Expand Up @@ -170,6 +170,11 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
if (estimatedMemoryFootprint != null) {
// If this will be the first job assigned to the node then it will need to
// load the native code shared libraries, so add the overhead for this
if (currentLoad.numberOfAssignedJobs == 0) {
estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
if (estimatedMemoryFootprint > availableMemory) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
Expand Down Expand Up @@ -283,6 +288,11 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa
}
}
}
// if any jobs are running then the native code will be loaded, but shared between all jobs,
// so increase the total memory usage of the assigned jobs to account for this
if (result.numberOfAssignedJobs > 0) {
result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
// Be careful if changing this - in order for the error message to be exactly as expected
// the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
int maxMachineMemoryPercent = 40;
long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
// the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
// MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
int maxMachineMemoryPercent = 20;
long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;

Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
Expand All @@ -193,19 +196,46 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
+ "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
+ (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
+ JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+ "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
}

public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() {
    // Verify that the very first anomaly detector job on a node is rejected when the job's
    // memory requirement plus the one-off native code overhead exceeds the ML memory budget.
    int nodeCount = randomIntBetween(1, 10);
    int jobsPerNodeLimit = randomIntBetween(1, 100);
    int mlMemoryPercent = 20;
    // The first job must also pay for loading the native shared libraries.
    long requiredBytes = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
    // Size the machine so the ML allowance falls exactly one byte short of what is required.
    long nodeMemoryBytes = (requiredBytes - 1) * 100 / mlMemoryPercent;

    Map<String, String> attributes = new HashMap<>();
    attributes.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(jobsPerNodeLimit));
    attributes.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(nodeMemoryBytes));

    // No jobs are running yet, so every node is empty.
    ClusterState.Builder clusterStateBuilder = fillNodesWithRunningJobs(attributes, nodeCount, 0);

    Job anomalyJob = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());

    JobNodeSelector selector = new JobNodeSelector(clusterStateBuilder.build(), anomalyJob.getId(), MlTasks.JOB_TASK_NAME,
        memoryTracker, 0, node -> TransportOpenJobAction.nodeFilter(node, anomalyJob));
    PersistentTasksCustomMetaData.Assignment assignment =
        selector.selectNode(jobsPerNodeLimit, 2, mlMemoryPercent, isMemoryTrackerRecentlyRefreshed);

    // The job must not be assigned, and the explanation must spell out the shortfall.
    assertNull(assignment.getExecutorNode());
    assertThat(assignment.getExplanation(), containsString("because this node has insufficient available memory. "
        + "Available memory for ML [" + (requiredBytes - 1)
        + "], memory required by existing jobs [0], estimated memory required for this job [" + requiredBytes + "]"));
}

public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemoryLimiting() {
int numNodes = randomIntBetween(1, 10);
int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
// Be careful if changing this - in order for the error message to be exactly as expected
// the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
int maxMachineMemoryPercent = 40;
long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
// the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
// MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
int maxMachineMemoryPercent = 20;
long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;

Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
Expand All @@ -222,9 +252,34 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
+ "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
+ (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
+ JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+ "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
}

public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMemoryLimiting() {
    // Verify that the very first data frame analytics job on a node is rejected when its
    // memory requirement plus the one-off native code overhead exceeds the ML memory budget.
    int nodeCount = randomIntBetween(1, 10);
    int jobsPerNodeLimit = randomIntBetween(1, 100);
    int mlMemoryPercent = 20;
    // The first job must also pay for loading the native shared libraries.
    long requiredBytes = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
    // Size the machine so the ML allowance falls exactly one byte short of what is required.
    long nodeMemoryBytes = (requiredBytes - 1) * 100 / mlMemoryPercent;

    Map<String, String> attributes = new HashMap<>();
    attributes.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(jobsPerNodeLimit));
    attributes.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(nodeMemoryBytes));

    // No jobs are running yet, so every node is empty.
    ClusterState.Builder clusterStateBuilder = fillNodesWithRunningJobs(attributes, nodeCount, 0);

    String dataFrameAnalyticsId = "data_frame_analytics_id1000";

    JobNodeSelector selector = new JobNodeSelector(clusterStateBuilder.build(), dataFrameAnalyticsId,
        MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
        node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId));
    PersistentTasksCustomMetaData.Assignment assignment =
        selector.selectNode(jobsPerNodeLimit, 2, mlMemoryPercent, isMemoryTrackerRecentlyRefreshed);

    // The job must not be assigned, and the explanation must spell out the shortfall.
    assertNull(assignment.getExecutorNode());
    assertThat(assignment.getExplanation(), containsString("because this node has insufficient available memory. "
        + "Available memory for ML [" + (requiredBytes - 1)
        + "], memory required by existing jobs [0], estimated memory required for this job [" + requiredBytes + "]"));
}

public void testSelectLeastLoadedMlNode_noMlNodes() {
Expand Down