From ff11e5065a9395cc92dfdcdc71fe440e364a97c2 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 31 Jul 2024 10:34:36 +0200 Subject: [PATCH 1/8] simple integration test added --- .../ml/integration/DetectionRulesIT.java | 61 +++++++++++++++++++ .../MlNativeAutodetectIntegTestCase.java | 21 +++++++ 2 files changed, 82 insertions(+) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index fec85730aaf2b..4ed970e52035e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; @@ -20,7 +21,10 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.Operator; +import org.elasticsearch.xpack.core.ml.job.config.RuleAction; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; +import org.elasticsearch.xpack.core.ml.job.config.RuleParams; +import org.elasticsearch.xpack.core.ml.job.config.RuleParamsForForceTimeShift; import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; @@ -39,6 +43,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.oneOf; @@ -299,6 +304,62 @@ public void testScopeAndCondition() throws IOException { assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222")); } + public void testForceTimeShiftAction() throws Exception { + // The test ensures that the force time shift action works as expected. + + long timeShiftAmount = 3600L; + long timestampStart = 1491004800000L; + long bucketSpanMillis = 3600000L; + long timeShiftTimestamp = timestampStart + bucketSpanMillis; + + int totalBuckets = 2 * 24; + + DetectionRule rule = new DetectionRule.Builder( + Arrays.asList(new RuleCondition(RuleCondition.AppliesTo.TIME, Operator.GTE, timeShiftTimestamp)) + ).setActions(RuleAction.FORCE_TIME_SHIFT).setParams(new RuleParams(new RuleParamsForForceTimeShift(timeShiftAmount))).build(); + + Detector.Builder detector = new Detector.Builder("mean", "value"); + detector.setRules(Arrays.asList(rule)); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMillis(bucketSpanMillis)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + Job.Builder job = new Job.Builder("detection-rules-it-test-force-time-shift"); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + putJob(job); + openJob(job.getId()); + + // post some data + int normalValue = 400; + List data = new ArrayList<>(); + long timestamp = timestampStart; + for (int bucket = 0; bucket < totalBuckets; bucket++) { + Map record = new HashMap<>(); + record.put("time", timestamp); + record.put("value", normalValue); + data.add(createJsonRecord(record)); + timestamp += bucketSpanMillis; + } + + postData(job.getId(), joinBetween(0, data.size(), data)); + closeJob(job.getId()); + + List annotations = getAnnotations(); + assertThat(annotations.size(), greaterThanOrEqualTo(1)); + + // Check that annotation contain the expected time shift + boolean annotationFound = false; + for (Annotation annotation : annotations) { + if (annotation.getAnnotation().contains("Shifted time by")) { + annotationFound = true; + assertThat(annotation.getAnnotation(), containsString(timeShiftAmount + " seconds")); + break; + } + } + assertThat(annotationFound, equalTo(true)); + } + private String createIpRecord(long timestamp, String ip) throws IOException { Map record = new HashMap<>(); record.put("time", timestamp); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index ce265d0e895aa..2e096f3262cb6 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -287,6 +287,27 @@ protected void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnota }); } + protected List getAnnotations() throws Exception { + List annotations = new ArrayList<>(); + // Refresh the annotations index so that recently indexed annotation docs are visible. + indicesAdmin().prepareRefresh(AnnotationIndex.LATEST_INDEX_NAME) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .get(); + + SearchRequest searchRequest = new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions( + IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN + ); + assertCheckedResponse(client().search(searchRequest), searchResponse -> { + + for (SearchHit hit : searchResponse.getHits().getHits()) { + try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) { + annotations.add(Annotation.fromXContent(parser, null)); + } + } + }); + return annotations; + } + protected ForecastRequestStats getForecastStats(String jobId, String forecastId) throws Exception { SetOnce forecastRequestStats = new SetOnce<>(); assertCheckedResponse( From 133c7f5affbb6fc1cbecabd28e0eca354d59877a Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Mon, 8 Jul 2024 13:31:17 +0200 Subject: [PATCH 2/8] [ML] Store anomaly detection config file and input on demand (#110582) DO NOT MERGE THIS INTO `main`! This PR enables the storage of data and configuration of an anomaly detection job in files so it can be reproduced using the `autodetect` process without Elasticsearch. To enable the storage, specify `keep_job_data` parameter in the `custom_settings` parameter of the job config: ```json "custom_settings": { "keep_job_data": "true" } ``` Now, start the job and watch for a log message with the autodetect command similar to the following: ```bash [2024-06-19T16:03:38,248][INFO ][o.e.x.m.j.p.a.NativeAutodetectProcessFactory] [Elastic-MBP.fritz.box] Autodetect process command: [./autodetect, --lengthEncodedInput, --maxAnomalyRecords=500, --validElasticLicenseKeyConfirmed=true, --config=/var/folders/_j/gcj6z4b950bdzpw7_fzrmpf40000gn/T/elasticsearch-12668972032551307591/config10764979302390040373.json, --logPipe=/var/folders/_j/gcj6z4b950bdzpw7_fzrmpf40000gn/T/elasticsearch-12668972032551307591/autodetect_test-2_log_45530, --input=/var/folders/_j/gcj6z4b950bdzpw7_fzrmpf40000gn/T/elasticsearch-12668972032551307591/autodetect_test-2_input_45530, --inputIsPipe, --output=/var/folders/_j/gcj6z4b950bdzpw7_fzrmpf40000gn/T/elasticsearch-12668972032551307591/autodetect_test-2_output_45530, --outputIsPipe, --persist=/var/folders/_j/gcj6z4b950bdzpw7_fzrmpf40000gn/T/elasticsearch-12668972032551307591/autodetect_test-2_persist_45530, --persistIsPipe, --namedPipeConnectTimeout=10] ``` and ```bash [2024-06-19T15:29:08,640][INFO ][o.e.x.m.p.w.LengthEncodedWriter] Opening file: /var/folders/_j/gcj6z4b950bdzpw7_fzrmpf40000gn/T/elasticsearch-12668972032551307591/autodetect_test-2_input_45530 for writing. ``` Copy the config file, the persist file from the first message, and the input file from the second message. --- .../xpack/core/ml/job/config/Job.java | 13 ++++++ x-pack/plugin/ml/build.gradle | 3 +- .../process/autodetect/AutodetectBuilder.java | 6 +++ .../NativeAutodetectProcessFactory.java | 12 ++++- .../ml/process/AbstractNativeProcess.java | 13 +++++- .../process/writer/LengthEncodedWriter.java | 45 +++++++++++++++++++ 6 files changed, 89 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 8da0209e10293..00f03c5247162 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -77,6 +77,7 @@ public class Job implements SimpleDiffable, Writeable, ToXContentObject { public static final ParseField ANALYSIS_LIMITS = new ParseField("analysis_limits"); public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField CUSTOM_SETTINGS = new ParseField("custom_settings"); + public static final ParseField KEEP_JOB_DATA = new ParseField("keep_job_data"); public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); @@ -478,6 +479,18 @@ public Map getCustomSettings() { return customSettings; } + public Boolean keepJobData() { + if (customSettings != null && customSettings.containsKey(KEEP_JOB_DATA.getPreferredName())) { + Object value = customSettings.get(KEEP_JOB_DATA.getPreferredName()); + if (value instanceof Boolean) { + return (Boolean) value; + } else if (value instanceof String) { + return "true".equalsIgnoreCase((String) value); + } + } + return false; + } + public String getModelSnapshotId() { return modelSnapshotId; } diff --git a/x-pack/plugin/ml/build.gradle b/x-pack/plugin/ml/build.gradle index 706d7ea73aea9..03036891845bc 100644 --- a/x-pack/plugin/ml/build.gradle +++ b/x-pack/plugin/ml/build.gradle @@ -57,7 +57,8 @@ esplugin.bundleSpec.exclude 'platform/licenses/**' } dependencies { - testImplementation project(path: ':x-pack:plugin:inference') + implementation project(path: ':libs:elasticsearch-logging') + testImplementation project(path: ':x-pack:plugin:inference') compileOnly project(':modules:lang-painless:spi') compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('autoscaling')) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 2d4ea308a6693..70fad6a05e277 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -159,6 +159,12 @@ public void build() throws IOException, InterruptedException { buildQuantiles(command); processPipes.addArgs(command); + + // log the command + if (job.keepJobData()) { + // log the command to reproduce the autodetect run + logger.info("Autodetect process command: " + command); + } controller.startProcess(command); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index c25cac48b27d5..223ae532ff9db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -25,10 +25,12 @@ import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.process.ProcessResultsParser; +import org.elasticsearch.xpack.ml.utils.FileUtils; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -110,11 +112,19 @@ public AutodetectProcess createAutodetectProcess( nativeController, processPipes, numberOfFields, - filesToDelete, + (job.keepJobData() == false) ? filesToDelete : new ArrayList<>(), resultsParser, onProcessCrash ); + try { + // check if jobs'custom settings contain the setting 'keep_job_data' + // and if it is set to true, then we create the autodetect controll message file + if (job.keepJobData()) { + FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); + Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); + autodetect.setControlMessageFilePath(controlMsgFile); + } autodetect.start(executorService, stateProcessor); return autodetect; } catch (IOException | EsRejectedExecutionException e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index dd71800bd4f90..da57317b0f0db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -61,6 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processCloseInitiated; private volatile boolean processKilled; private volatile boolean isReady; + private volatile Path controlMessageFilePath; protected AbstractNativeProcess( String jobId, @@ -107,7 +108,7 @@ public void start(ExecutorService executorService) throws IOException { processPipes.connectOtherStreams(); if (processPipes.getProcessInStream().isPresent()) { processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get())); - this.recordWriter.set(new LengthEncodedWriter(processInStream.get())); + this.recordWriter.set(new LengthEncodedWriter(processInStream.get(), getControlMessageFilePath())); } processOutStream.set(processPipes.getProcessOutStream().orElse(null)); processRestoreStream.set(processPipes.getRestoreStream().orElse(null)); @@ -348,4 +349,14 @@ public void consumeAndCloseOutputStream() { // Given we are closing down the process there is no point propagating IO exceptions here } } + + @Nullable + public Path getControlMessageFilePath() { + return controlMessageFilePath; + } + + public void setControlMessageFilePath(Path controlMessageFilePath) { + this.controlMessageFilePath = controlMessageFilePath; + } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 82a44c4f3b075..8b115288ac24b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -6,12 +6,16 @@ */ package org.elasticsearch.xpack.ml.process.writer; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; /** @@ -27,18 +31,49 @@ */ public class LengthEncodedWriter implements RecordWriter { private OutputStream outputStream; + // In case customer setting "keep_job_data" is set to true, we will write the data to a file + // additionally to the output stream. + private OutputStream fileOutputStream; private ByteBuffer lengthBuffer; + private Logger logger = LogManager.getLogger(LengthEncodedWriter.class); + /** * Create the writer on the OutputStream os. * This object will never close os. */ public LengthEncodedWriter(OutputStream os) { + this(os, null); + } + + public LengthEncodedWriter(OutputStream os, Path filePath) { outputStream = os; + try { + if (filePath != null) { + logger.info("Opening file: " + filePath + " for writing."); + fileOutputStream = Files.newOutputStream(filePath); + } else { + fileOutputStream = null; + } + } catch (IOException e) { + logger.error("Failed to open file: " + filePath + " for writing.", e.getMessage(), e); + fileOutputStream = null; + } // This will be used to convert 32 bit integers to network byte order lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int) } + // Add public destructor + public void close() { + if (fileOutputStream != null) { + try { + fileOutputStream.close(); + } catch (IOException e) { + logger.error("Failed to close file output stream.", e.getMessage(), e); + } + } + } + /** * Convert each String in the record array to a length/value encoded pair * and write to the outputstream. @@ -75,6 +110,9 @@ public void writeNumFields(int numFields) throws IOException { lengthBuffer.clear(); lengthBuffer.putInt(numFields); outputStream.write(lengthBuffer.array()); + if (fileOutputStream != null) { + fileOutputStream.write(lengthBuffer.array()); + } } /** @@ -87,10 +125,17 @@ public void writeField(String field) throws IOException { lengthBuffer.putInt(utf8Bytes.length); outputStream.write(lengthBuffer.array()); outputStream.write(utf8Bytes); + if (fileOutputStream != null) { + fileOutputStream.write(lengthBuffer.array()); + fileOutputStream.write(utf8Bytes); + } } @Override public void flush() throws IOException { outputStream.flush(); + if (fileOutputStream != null) { + fileOutputStream.flush(); + } } } From 95726e04b5b877283c2e1c8d28a07a3ae82c2acc Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:24:25 +0200 Subject: [PATCH 3/8] Add integration test --- .../elasticsearch/xpack/ml/integration/DetectionRulesIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index 4ed970e52035e..8c01e744aa556 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -310,7 +310,7 @@ public void testForceTimeShiftAction() throws Exception { long timeShiftAmount = 3600L; long timestampStart = 1491004800000L; long bucketSpanMillis = 3600000L; - long timeShiftTimestamp = timestampStart + bucketSpanMillis; + long timeShiftTimestamp = (timestampStart + bucketSpanMillis)/1000; int totalBuckets = 2 * 24; @@ -326,6 +326,7 @@ public void testForceTimeShiftAction() throws Exception { Job.Builder job = new Job.Builder("detection-rules-it-test-force-time-shift"); job.setAnalysisConfig(analysisConfig); job.setDataDescription(dataDescription); +// job.setCustomSettings(Collections.singletonMap("keep_job_data", "true")); putJob(job); openJob(job.getId()); @@ -357,7 +358,7 @@ public void testForceTimeShiftAction() throws Exception { break; } } - assertThat(annotationFound, equalTo(true)); + assertThat("Annotation with time shift not found", annotationFound, equalTo(true)); } private String createIpRecord(long timestamp, String ip) throws IOException { From 17ebaa5e5c5a2698fe64f84842159dd60b0425fe Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:35:10 +0200 Subject: [PATCH 4/8] Revert "[ML] Store anomaly detection config file and input on demand (#110582)" This reverts commit 133c7f5affbb6fc1cbecabd28e0eca354d59877a. --- .../xpack/core/ml/job/config/Job.java | 13 ------ x-pack/plugin/ml/build.gradle | 3 +- .../process/autodetect/AutodetectBuilder.java | 6 --- .../NativeAutodetectProcessFactory.java | 12 +---- .../ml/process/AbstractNativeProcess.java | 13 +----- .../process/writer/LengthEncodedWriter.java | 45 ------------------- 6 files changed, 3 insertions(+), 89 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 00f03c5247162..8da0209e10293 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -77,7 +77,6 @@ public class Job implements SimpleDiffable, Writeable, ToXContentObject { public static final ParseField ANALYSIS_LIMITS = new ParseField("analysis_limits"); public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField CUSTOM_SETTINGS = new ParseField("custom_settings"); - public static final ParseField KEEP_JOB_DATA = new ParseField("keep_job_data"); public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); @@ -479,18 +478,6 @@ public Map getCustomSettings() { return customSettings; } - public Boolean keepJobData() { - if (customSettings != null && customSettings.containsKey(KEEP_JOB_DATA.getPreferredName())) { - Object value = customSettings.get(KEEP_JOB_DATA.getPreferredName()); - if (value instanceof Boolean) { - return (Boolean) value; - } else if (value instanceof String) { - return "true".equalsIgnoreCase((String) value); - } - } - return false; - } - public String getModelSnapshotId() { return modelSnapshotId; } diff --git a/x-pack/plugin/ml/build.gradle b/x-pack/plugin/ml/build.gradle index 03036891845bc..706d7ea73aea9 100644 --- a/x-pack/plugin/ml/build.gradle +++ b/x-pack/plugin/ml/build.gradle @@ -57,8 +57,7 @@ esplugin.bundleSpec.exclude 'platform/licenses/**' } dependencies { - implementation project(path: ':libs:elasticsearch-logging') - testImplementation project(path: ':x-pack:plugin:inference') + testImplementation project(path: ':x-pack:plugin:inference') compileOnly project(':modules:lang-painless:spi') compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('autoscaling')) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 70fad6a05e277..2d4ea308a6693 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -159,12 +159,6 @@ public void build() throws IOException, InterruptedException { buildQuantiles(command); processPipes.addArgs(command); - - // log the command - if (job.keepJobData()) { - // log the command to reproduce the autodetect run - logger.info("Autodetect process command: " + command); - } controller.startProcess(command); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 223ae532ff9db..c25cac48b27d5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -25,12 +25,10 @@ import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.process.ProcessResultsParser; -import org.elasticsearch.xpack.ml.utils.FileUtils; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -112,19 +110,11 @@ public AutodetectProcess createAutodetectProcess( nativeController, processPipes, numberOfFields, - (job.keepJobData() == false) ? filesToDelete : new ArrayList<>(), + filesToDelete, resultsParser, onProcessCrash ); - try { - // check if jobs'custom settings contain the setting 'keep_job_data' - // and if it is set to true, then we create the autodetect controll message file - if (job.keepJobData()) { - FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); - Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); - autodetect.setControlMessageFilePath(controlMsgFile); - } autodetect.start(executorService, stateProcessor); return autodetect; } catch (IOException | EsRejectedExecutionException e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index da57317b0f0db..dd71800bd4f90 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -61,7 +61,6 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processCloseInitiated; private volatile boolean processKilled; private volatile boolean isReady; - private volatile Path controlMessageFilePath; protected AbstractNativeProcess( String jobId, @@ -108,7 +107,7 @@ public void start(ExecutorService executorService) throws IOException { processPipes.connectOtherStreams(); if (processPipes.getProcessInStream().isPresent()) { processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get())); - this.recordWriter.set(new LengthEncodedWriter(processInStream.get(), getControlMessageFilePath())); + this.recordWriter.set(new LengthEncodedWriter(processInStream.get())); } processOutStream.set(processPipes.getProcessOutStream().orElse(null)); processRestoreStream.set(processPipes.getRestoreStream().orElse(null)); @@ -349,14 +348,4 @@ public void consumeAndCloseOutputStream() { // Given we are closing down the process there is no point propagating IO exceptions here } } - - @Nullable - public Path getControlMessageFilePath() { - return controlMessageFilePath; - } - - public void setControlMessageFilePath(Path controlMessageFilePath) { - this.controlMessageFilePath = controlMessageFilePath; - } - } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 8b115288ac24b..82a44c4f3b075 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -6,16 +6,12 @@ */ package org.elasticsearch.xpack.ml.process.writer; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.List; /** @@ -31,49 +27,18 @@ */ public class LengthEncodedWriter implements RecordWriter { private OutputStream outputStream; - // In case customer setting "keep_job_data" is set to true, we will write the data to a file - // additionally to the output stream. - private OutputStream fileOutputStream; private ByteBuffer lengthBuffer; - private Logger logger = LogManager.getLogger(LengthEncodedWriter.class); - /** * Create the writer on the OutputStream os. * This object will never close os. */ public LengthEncodedWriter(OutputStream os) { - this(os, null); - } - - public LengthEncodedWriter(OutputStream os, Path filePath) { outputStream = os; - try { - if (filePath != null) { - logger.info("Opening file: " + filePath + " for writing."); - fileOutputStream = Files.newOutputStream(filePath); - } else { - fileOutputStream = null; - } - } catch (IOException e) { - logger.error("Failed to open file: " + filePath + " for writing.", e.getMessage(), e); - fileOutputStream = null; - } // This will be used to convert 32 bit integers to network byte order lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int) } - // Add public destructor - public void close() { - if (fileOutputStream != null) { - try { - fileOutputStream.close(); - } catch (IOException e) { - logger.error("Failed to close file output stream.", e.getMessage(), e); - } - } - } - /** * Convert each String in the record array to a length/value encoded pair * and write to the outputstream. @@ -110,9 +75,6 @@ public void writeNumFields(int numFields) throws IOException { lengthBuffer.clear(); lengthBuffer.putInt(numFields); outputStream.write(lengthBuffer.array()); - if (fileOutputStream != null) { - fileOutputStream.write(lengthBuffer.array()); - } } /** @@ -125,17 +87,10 @@ public void writeField(String field) throws IOException { lengthBuffer.putInt(utf8Bytes.length); outputStream.write(lengthBuffer.array()); outputStream.write(utf8Bytes); - if (fileOutputStream != null) { - fileOutputStream.write(lengthBuffer.array()); - fileOutputStream.write(utf8Bytes); - } } @Override public void flush() throws IOException { outputStream.flush(); - if (fileOutputStream != null) { - fileOutputStream.flush(); - } } } From 619250eb1fb329663360591410afc715b39a756a Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:38:22 +0200 Subject: [PATCH 5/8] formatting --- .../elasticsearch/xpack/ml/integration/DetectionRulesIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index 8c01e744aa556..d8b2f44b67ab4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -310,7 +310,7 @@ public void testForceTimeShiftAction() throws Exception { long timeShiftAmount = 3600L; long timestampStart = 1491004800000L; long bucketSpanMillis = 3600000L; - long timeShiftTimestamp = (timestampStart + bucketSpanMillis)/1000; + long timeShiftTimestamp = (timestampStart + bucketSpanMillis) / 1000; int totalBuckets = 2 * 24; @@ -326,7 +326,7 @@ public void testForceTimeShiftAction() throws Exception { Job.Builder job = new Job.Builder("detection-rules-it-test-force-time-shift"); job.setAnalysisConfig(analysisConfig); job.setDataDescription(dataDescription); -// job.setCustomSettings(Collections.singletonMap("keep_job_data", "true")); + // job.setCustomSettings(Collections.singletonMap("keep_job_data", "true")); putJob(job); openJob(job.getId()); From efb682fea8760ed6fd79fe8b3a2905ba558b8744 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:49:09 +0200 Subject: [PATCH 6/8] remove dead code --- .../org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index d8b2f44b67ab4..b16eaafdb7024 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -326,7 +326,6 @@ public void testForceTimeShiftAction() throws Exception { Job.Builder job = new Job.Builder("detection-rules-it-test-force-time-shift"); job.setAnalysisConfig(analysisConfig); job.setDataDescription(dataDescription); - // job.setCustomSettings(Collections.singletonMap("keep_job_data", "true")); putJob(job); openJob(job.getId()); From 722c74c32b8b5b1038842d1f5c9aa8fbfd0d60fd Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 6 Aug 2024 12:54:07 +0200 Subject: [PATCH 7/8] Update x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java Co-authored-by: David Kyle --- .../elasticsearch/xpack/ml/integration/DetectionRulesIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index b16eaafdb7024..771a93686b88a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -308,7 +308,7 @@ public void testForceTimeShiftAction() throws Exception { // The test ensures that the force time shift action works as expected. long timeShiftAmount = 3600L; - long timestampStart = 1491004800000L; + long timestampStartMillis= 1491004800000L; long bucketSpanMillis = 3600000L; long timeShiftTimestamp = (timestampStart + bucketSpanMillis) / 1000; From df25777e3a58c072e1dca8326c2c1f65b7904a3d Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 6 Aug 2024 16:05:12 +0200 Subject: [PATCH 8/8] harden the test --- .../ml/integration/DetectionRulesIT.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index 771a93686b88a..0effe5349d43a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -45,6 +45,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.oneOf; /** @@ -308,9 +309,9 @@ public void testForceTimeShiftAction() throws Exception { // The test ensures that the force time shift action works as expected. long timeShiftAmount = 3600L; - long timestampStartMillis= 1491004800000L; + long timestampStartMillis = 1491004800000L; long bucketSpanMillis = 3600000L; - long timeShiftTimestamp = (timestampStart + bucketSpanMillis) / 1000; + long timeShiftTimestamp = (timestampStartMillis + bucketSpanMillis) / 1000; int totalBuckets = 2 * 24; @@ -333,7 +334,7 @@ public void testForceTimeShiftAction() throws Exception { // post some data int normalValue = 400; List data = new ArrayList<>(); - long timestamp = timestampStart; + long timestamp = timestampStartMillis; for (int bucket = 0; bucket < totalBuckets; bucket++) { Map record = new HashMap<>(); record.put("time", timestamp); @@ -347,17 +348,22 @@ public void testForceTimeShiftAction() throws Exception { List annotations = getAnnotations(); assertThat(annotations.size(), greaterThanOrEqualTo(1)); + assertThat(annotations.size(), lessThanOrEqualTo(3)); // Check that annotation contain the expected time shift - boolean annotationFound = false; + boolean countingModelAnnotationFound = false; + boolean individualModelAnnotationFound = false; for (Annotation annotation : annotations) { - if (annotation.getAnnotation().contains("Shifted time by")) { - annotationFound = true; + if (annotation.getAnnotation().contains("Counting model shifted time by")) { + countingModelAnnotationFound = true; + assertThat(annotation.getAnnotation(), containsString(timeShiftAmount + " seconds")); + } else if (annotation.getAnnotation().contains("Model shifted time by")) { + individualModelAnnotationFound = true; assertThat(annotation.getAnnotation(), containsString(timeShiftAmount + " seconds")); - break; } } - assertThat("Annotation with time shift not found", annotationFound, equalTo(true)); + assertThat("Counting model annotation with time shift not found", countingModelAnnotationFound, equalTo(true)); + assertThat("Individual model annotation with time shift not found", individualModelAnnotationFound, equalTo(true)); } private String createIpRecord(long timestamp, String ip) throws IOException {