Skip to content

Commit d811fdf

Browse files
author
David Roberts
authored
[ML] Propagate xpack.ml.min_disk_space_off_heap to native process (#64733)
This change is the Java-side companion to elastic/ml-cpp#1556. It ensures that customisations to xpack.ml.min_disk_space_off_heap are reflected in the disk space check done in the C++ code as well as in the disk space check done in the Java code. The check done in the C++ code runs later than the check in the Java code, and only requires that 80% of the disk space required in the Java code is available. (The defaults are 5GB and 4GB.) Backport of #64720
1 parent 653a547 commit d811fdf

File tree

6 files changed

+72
-12
lines changed

6 files changed

+72
-12
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,7 @@ public void testMemoryStatus() {
214214

215215
public void testOverflowToDisk() throws Exception {
216216
assumeFalse("https://github.com/elastic/elasticsearch/issues/44609", Constants.WINDOWS);
217-
// This test repeatedly fails in encryption-at-rest (EAR) builds, and
218-
// the only way to detect such a build appears to be the CI job name
219-
assumeFalse("https://github.com/elastic/elasticsearch/issues/58806",
220-
System.getenv().getOrDefault("JOB_NAME", "not a CI build").contains("EAR"));
217+
221218
Detector.Builder detector = new Detector.Builder("mean", "value");
222219
detector.setByFieldName("clientIP");
223220

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.support.ActionFilters;
1414
import org.elasticsearch.cluster.service.ClusterService;
1515
import org.elasticsearch.common.inject.Inject;
16+
import org.elasticsearch.common.settings.Settings;
1617
import org.elasticsearch.common.unit.ByteSizeValue;
1718
import org.elasticsearch.common.unit.TimeValue;
1819
import org.elasticsearch.threadpool.ThreadPool;
@@ -24,6 +25,7 @@
2425
import org.elasticsearch.xpack.core.ml.job.config.Job;
2526
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
2627
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
28+
import org.elasticsearch.xpack.ml.MachineLearning;
2729
import org.elasticsearch.xpack.ml.job.JobManager;
2830
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2931
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@@ -48,20 +50,23 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
4850
private final JobManager jobManager;
4951
private final NativeStorageProvider nativeStorageProvider;
5052
private final AnomalyDetectionAuditor auditor;
53+
private final long cppMinAvailableDiskSpaceBytes;
5154

5255
@Inject
53-
public TransportForecastJobAction(TransportService transportService,
56+
public TransportForecastJobAction(TransportService transportService, Settings settings,
5457
ClusterService clusterService, ActionFilters actionFilters,
5558
JobResultsProvider jobResultsProvider, AutodetectProcessManager processManager,
5659
JobManager jobManager, NativeStorageProvider nativeStorageProvider, AnomalyDetectionAuditor auditor) {
5760
super(ForecastJobAction.NAME, clusterService, transportService, actionFilters,
5861
ForecastJobAction.Request::new, ForecastJobAction.Response::new,
59-
ThreadPool.Names.SAME, processManager);
62+
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
63+
ThreadPool.Names.SAME, processManager);
6064
this.jobResultsProvider = jobResultsProvider;
6165
this.jobManager = jobManager;
6266
this.nativeStorageProvider = nativeStorageProvider;
6367
this.auditor = auditor;
64-
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
68+
// The C++ enforces 80% of the free disk space that the Java enforces
69+
this.cppMinAvailableDiskSpaceBytes = MachineLearning.MIN_DISK_SPACE_OFF_HEAP.get(settings).getBytes() / 5 * 4;
6570
}
6671

6772
@Override
@@ -92,6 +97,10 @@ protected void taskOperation(ForecastJobAction.Request request, JobTask task, Ac
9297
paramsBuilder.tmpStorage(tmpStorage.toString());
9398
}
9499

100+
if (cppMinAvailableDiskSpaceBytes >= 0) {
101+
paramsBuilder.minAvailableDiskSpace(cppMinAvailableDiskSpaceBytes);
102+
}
103+
95104
ForecastParams params = paramsBuilder.build();
96105
processManager.forecastJob(task, params, e -> {
97106
if (e == null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ public class ForecastParams {
1818
private final long expiresIn;
1919
private final String tmpStorage;
2020
private final Long maxModelMemory;
21+
private final Long minAvailableDiskSpace;
2122

22-
private ForecastParams(String forecastId, long createTime, long duration, long expiresIn, String tmpStorage, Long maxModelMemory) {
23+
private ForecastParams(String forecastId, long createTime, long duration, long expiresIn, String tmpStorage, Long maxModelMemory,
24+
Long minAvailableDiskSpace) {
2325
this.forecastId = forecastId;
2426
this.createTime = createTime;
2527
this.duration = duration;
2628
this.expiresIn = expiresIn;
2729
this.tmpStorage = tmpStorage;
2830
this.maxModelMemory = maxModelMemory;
31+
this.minAvailableDiskSpace = minAvailableDiskSpace;
2932
}
3033

3134
public String getForecastId() {
@@ -69,9 +72,13 @@ public Long getMaxModelMemory() {
6972
return maxModelMemory;
7073
}
7174

75+
public Long getMinAvailableDiskSpace() {
76+
return minAvailableDiskSpace;
77+
}
78+
7279
@Override
7380
public int hashCode() {
74-
return Objects.hash(forecastId, createTime, duration, expiresIn, tmpStorage, maxModelMemory);
81+
return Objects.hash(forecastId, createTime, duration, expiresIn, tmpStorage, maxModelMemory, minAvailableDiskSpace);
7582
}
7683

7784
@Override
@@ -88,7 +95,8 @@ public boolean equals(Object obj) {
8895
&& Objects.equals(duration, other.duration)
8996
&& Objects.equals(expiresIn, other.expiresIn)
9097
&& Objects.equals(tmpStorage, other.tmpStorage)
91-
&& Objects.equals(maxModelMemory, other.maxModelMemory);
98+
&& Objects.equals(maxModelMemory, other.maxModelMemory)
99+
&& Objects.equals(minAvailableDiskSpace, other.minAvailableDiskSpace);
92100
}
93101

94102
public static Builder builder() {
@@ -101,6 +109,7 @@ public static class Builder {
101109
private long durationSecs;
102110
private long expiresInSecs;
103111
private Long maxModelMemory;
112+
private Long minAvailableDiskSpace;
104113
private String tmpStorage;
105114

106115
private Builder() {
@@ -132,8 +141,14 @@ public Builder maxModelMemory(long maxModelMemory) {
132141
return this;
133142
}
134143

144+
public Builder minAvailableDiskSpace(long minAvailableDiskSpace) {
145+
this.minAvailableDiskSpace = minAvailableDiskSpace;
146+
return this;
147+
}
148+
135149
public ForecastParams build() {
136-
return new ForecastParams(forecastId, createTimeEpochSecs, durationSecs, expiresInSecs, tmpStorage, maxModelMemory);
150+
return new ForecastParams(forecastId, createTimeEpochSecs, durationSecs, expiresInSecs, tmpStorage, maxModelMemory,
151+
minAvailableDiskSpace);
137152
}
138153
}
139154
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ public void writeForecastMessage(ForecastParams params) throws IOException {
166166
if (params.getMaxModelMemory() != null) {
167167
builder.field("max_model_memory", params.getMaxModelMemory());
168168
}
169+
if (params.getMinAvailableDiskSpace() != null) {
170+
builder.field("min_available_disk_space", params.getMinAvailableDiskSpace());
171+
}
169172
builder.endObject();
170173

171174
writeMessage(FORECAST_MESSAGE_CODE + Strings.toString(builder));

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
public class ForecastParamsTests extends ESTestCase {
1818

19-
private static ParseField DURATION = new ParseField("duration");
19+
private static final ParseField DURATION = new ParseField("duration");
2020

2121
public void testForecastIdsAreUnique() {
2222
Set<String> ids = new HashSet<>();

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
1616
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
1717
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
18+
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
1819
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
1920
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;
2021
import org.junit.Before;
@@ -30,7 +31,9 @@
3031
import java.util.Optional;
3132
import java.util.stream.IntStream;
3233

34+
import static org.hamcrest.Matchers.endsWith;
3335
import static org.hamcrest.Matchers.equalTo;
36+
import static org.hamcrest.Matchers.startsWith;
3437
import static org.mockito.Mockito.inOrder;
3538
import static org.mockito.Mockito.times;
3639
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -286,6 +289,39 @@ public void testWriteStartBackgroundPersistMessage() throws IOException {
286289
verifyNoMoreInteractions(lengthEncodedWriter);
287290
}
288291

292+
public void testWriteForecastParamsMessage() throws IOException {
293+
AutodetectControlMsgWriter writer = new AutodetectControlMsgWriter(lengthEncodedWriter, 2);
294+
295+
ForecastParams params = ForecastParams.builder()
296+
.duration(TimeValue.timeValueHours(3))
297+
.expiresIn(TimeValue.timeValueDays(4))
298+
.tmpStorage("/my_temp_dir")
299+
.maxModelMemory(12345)
300+
.minAvailableDiskSpace(98765)
301+
.build();
302+
303+
writer.writeForecastMessage(params);
304+
305+
InOrder inOrder = inOrder(lengthEncodedWriter);
306+
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
307+
inOrder.verify(lengthEncodedWriter).writeField("");
308+
ArgumentCaptor<String> capturedMessage = ArgumentCaptor.forClass(String.class);
309+
inOrder.verify(lengthEncodedWriter).writeField(capturedMessage.capture());
310+
311+
assertThat(capturedMessage.getValue(), startsWith("p{\"forecast_id\":\""));
312+
assertThat(capturedMessage.getValue(), endsWith("\"duration\":10800,\"expires_in\":345600,\"tmp_storage\":\"/my_temp_dir\","
313+
+"\"max_model_memory\":12345,\"min_available_disk_space\":98765}"));
314+
315+
inOrder.verify(lengthEncodedWriter).writeNumFields(2);
316+
inOrder.verify(lengthEncodedWriter).writeField("");
317+
StringBuilder spaces = new StringBuilder();
318+
IntStream.rangeClosed(1, AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH).forEach(i -> spaces.append(' '));
319+
inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());
320+
inOrder.verify(lengthEncodedWriter).flush();
321+
322+
verifyNoMoreInteractions(lengthEncodedWriter);
323+
}
324+
289325
private static List<RuleCondition> createRule(double value) {
290326
return Collections.singletonList(new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.GT, value));
291327
}

0 commit comments

Comments
 (0)