From fea46b75b685beb778c55c612d09799e5504d66b Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Mon, 10 Jun 2024 19:49:49 +0200 Subject: [PATCH 1/7] [ML] Record AD message and input to file --- .gitignore | 1 + x-pack/plugin/ml/build.gradle | 3 +- .../process/autodetect/AutodetectBuilder.java | 3 ++ .../NativeAutodetectProcessFactory.java | 9 ++++ .../ml/process/AbstractNativeProcess.java | 22 +++++++- .../process/writer/LengthEncodedWriter.java | 52 +++++++++++++++++++ 6 files changed, 87 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 03e2446ec4f13..2329e75501c9c 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,4 @@ testfixtures_shared/ # Generated checkstyle_ide.xml x-pack/plugin/esql/gen/ +benchmarks/ diff --git a/x-pack/plugin/ml/build.gradle b/x-pack/plugin/ml/build.gradle index 74600a072ea0d..cb837a40cccee 100644 --- a/x-pack/plugin/ml/build.gradle +++ b/x-pack/plugin/ml/build.gradle @@ -74,7 +74,8 @@ esplugin.bundleSpec.exclude 'platform/licenses/**' } dependencies { - testImplementation project(path: ':x-pack:plugin:inference') + implementation project(path: ':libs:elasticsearch-logging') + testImplementation project(path: ':x-pack:plugin:inference') compileOnly project(':modules:lang-painless:spi') compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('autoscaling')) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 2d4ea308a6693..c8d0219e22623 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -159,6 +159,9 @@ public void build() throws IOException, InterruptedException { buildQuantiles(command); processPipes.addArgs(command); + + // log the command + logger.info("Starting autodetect process with command: " + command); controller.startProcess(command); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index c25cac48b27d5..94619392db823 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -25,10 +25,12 @@ import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.process.ProcessResultsParser; +import org.elasticsearch.xpack.ml.utils.FileUtils; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -114,7 +116,14 @@ public AutodetectProcess createAutodetectProcess( resultsParser, onProcessCrash ); + + try { + + FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); + Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); + + autodetect.setControlMessageFilePath(controlMsgFile.toString()); autodetect.start(executorService, stateProcessor); return autodetect; } catch (IOException | EsRejectedExecutionException e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index dd71800bd4f90..75053957b456c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.core.Nullable; +import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; @@ -62,6 +63,18 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processKilled; private volatile boolean isReady; + private boolean DELETE_CONFIG_FILES = false; + + public String getControlMessageFilePath() { + return controlMessageFilePath; + } + + public void setControlMessageFilePath(String controlMessageFilePath) { + this.controlMessageFilePath = controlMessageFilePath; + } + + private String controlMessageFilePath = null; + protected AbstractNativeProcess( String jobId, NativeController nativeController, @@ -75,7 +88,12 @@ protected AbstractNativeProcess( this.processPipes = processPipes; this.startTime = ZonedDateTime.now(); this.numberOfFields = numberOfFields; - this.filesToDelete = filesToDelete; + if (DELETE_CONFIG_FILES) { + this.filesToDelete = filesToDelete; + } else { + LOGGER.info("Following config files will not be deleted: " + filesToDelete); + this.filesToDelete = List.of(); + } this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } @@ -107,7 +125,7 @@ public void start(ExecutorService executorService) throws IOException { processPipes.connectOtherStreams(); if (processPipes.getProcessInStream().isPresent()) { processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get())); - this.recordWriter.set(new LengthEncodedWriter(processInStream.get())); + this.recordWriter.set(new LengthEncodedWriter(processInStream.get(), getControlMessageFilePath())); } processOutStream.set(processPipes.getProcessOutStream().orElse(null)); processRestoreStream.set(processPipes.getRestoreStream().orElse(null)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 82a44c4f3b075..5bfa91c987301 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -6,12 +6,17 @@ */ package org.elasticsearch.xpack.ml.process.writer; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.logging.LogManager; import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; +import org.elasticsearch.xpack.ml.utils.FileUtils; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; /** @@ -27,8 +32,11 @@ */ public class LengthEncodedWriter implements RecordWriter { private OutputStream outputStream; + private OutputStream fileOutputStream; private ByteBuffer lengthBuffer; + private Logger logger = LogManager.getLogger(LengthEncodedWriter.class); + /** * Create the writer on the OutputStream os. * This object will never close os. @@ -37,6 +45,40 @@ public LengthEncodedWriter(OutputStream os) { outputStream = os; // This will be used to convert 32 bit integers to network byte order lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int) + fileOutputStream = null; + } + + public LengthEncodedWriter(OutputStream os, String filePath) { + outputStream = os; + try { + +// FileUtils.recreateTempDirectoryIfNeeded(Path.of(filePath)); +// tmpFilePath = env.c + Path file = Path.of(filePath); + logger.info("Opening file: " + filePath + " for writing."); + fileOutputStream = Files.newOutputStream(file); + } catch (IOException e) { + logger.error("Failed to open file: " + filePath + " for writing."); + // log error message and stack trace + logger.error(e.getMessage(), e); + fileOutputStream = null; + } + // This will be used to convert 32 bit integers to network byte order + lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int) + } + + // Add public destructor + public void close() { + // TODO: this should be moved to AbstractNativeProcess + if (fileOutputStream != null) { + try { + fileOutputStream.close(); + } catch (IOException e) { + logger.error("Failed to close file output stream."); + // log error message and stack trace + logger.error(e.getMessage(), e); + } + } } /** @@ -75,6 +117,9 @@ public void writeNumFields(int numFields) throws IOException { lengthBuffer.clear(); lengthBuffer.putInt(numFields); outputStream.write(lengthBuffer.array()); + if (fileOutputStream != null) { + fileOutputStream.write(lengthBuffer.array()); + } } /** @@ -87,10 +132,17 @@ public void writeField(String field) throws IOException { lengthBuffer.putInt(utf8Bytes.length); outputStream.write(lengthBuffer.array()); outputStream.write(utf8Bytes); + if (fileOutputStream != null) { + fileOutputStream.write(lengthBuffer.array()); + fileOutputStream.write(utf8Bytes); + } } @Override public void flush() throws IOException { outputStream.flush(); + if (fileOutputStream != null) { + fileOutputStream.flush(); + } } } From 9b1d9754fd58ce2b9d11a60fc78882fdd31f8303 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 18 Jun 2024 15:05:41 +0200 Subject: [PATCH 2/7] Use custom_settings to trigger file storage --- .../NativeAutodetectProcessFactory.java | 17 ++++++---- .../ml/process/AbstractNativeProcess.java | 31 +++++++------------ .../process/writer/LengthEncodedWriter.java | 10 ++---- 3 files changed, 25 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 94619392db823..3375e202514c0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -83,6 +83,8 @@ public AutodetectProcess createAutodetectProcess( ExecutorService executorService, Consumer onProcessCrash ) { + boolean keepJobData = job.getCustomSettings().containsKey("keep_job_data") + && job.getCustomSettings().get("keep_job_data").equals("true") List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes( env, @@ -112,18 +114,21 @@ public AutodetectProcess createAutodetectProcess( nativeController, processPipes, numberOfFields, - filesToDelete, + (keepJobData == false) ? filesToDelete : new ArrayList<>(), resultsParser, onProcessCrash ); try { - - FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); - Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); - - autodetect.setControlMessageFilePath(controlMsgFile.toString()); + // check if jobs'custom settings contain the setting 'keep_job_data' + // and if it is set to true, then we create the autodetect controll message file + // TODO Valeriy: store "keep_job_data" into a constant + if(keepJobData == true) { + FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); + Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); + autodetect.setControlMessageFilePath(controlMsgFile.toString()); + } autodetect.start(executorService, stateProcessor); return autodetect; } catch (IOException | EsRejectedExecutionException e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 75053957b456c..98547496ab860 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.core.Nullable; -import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; @@ -62,18 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processCloseInitiated; private volatile boolean processKilled; private volatile boolean isReady; - - private boolean DELETE_CONFIG_FILES = false; - - public String getControlMessageFilePath() { - return controlMessageFilePath; - } - - public void setControlMessageFilePath(String controlMessageFilePath) { - this.controlMessageFilePath = controlMessageFilePath; - } - - private String controlMessageFilePath = null; + private final String controlMessageFilePath; protected AbstractNativeProcess( String jobId, @@ -88,12 +76,7 @@ protected AbstractNativeProcess( this.processPipes = processPipes; this.startTime = ZonedDateTime.now(); this.numberOfFields = numberOfFields; - if (DELETE_CONFIG_FILES) { - this.filesToDelete = filesToDelete; - } else { - LOGGER.info("Following config files will not be deleted: " + filesToDelete); - this.filesToDelete = List.of(); - } + this.filesToDelete = filesToDelete; this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } @@ -366,4 +349,14 @@ public void consumeAndCloseOutputStream() { // Given we are closing down the process there is no point propagating IO exceptions here } } + + @Nullable + public String getControlMessageFilePath() { + return controlMessageFilePath; + } + + public void setControlMessageFilePath(String controlMessageFilePath) { + this.controlMessageFilePath = controlMessageFilePath; + } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 5bfa91c987301..df3ce3b54c596 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -51,16 +51,11 @@ public LengthEncodedWriter(OutputStream os) { public LengthEncodedWriter(OutputStream os, String filePath) { outputStream = os; try { - -// FileUtils.recreateTempDirectoryIfNeeded(Path.of(filePath)); -// tmpFilePath = env.c Path file = Path.of(filePath); - logger.info("Opening file: " + filePath + " for writing."); + logger.debug("Opening file: " + filePath + " for writing."); fileOutputStream = Files.newOutputStream(file); } catch (IOException e) { - logger.error("Failed to open file: " + filePath + " for writing."); - // log error message and stack trace - logger.error(e.getMessage(), e); + logger.error("Failed to open file: " + filePath + " for writing.", e.getMessage(), e); fileOutputStream = null; } // This will be used to convert 32 bit integers to network byte order @@ -69,7 +64,6 @@ public LengthEncodedWriter(OutputStream os, String filePath) { // Add public destructor public void close() { - // TODO: this should be moved to AbstractNativeProcess if (fileOutputStream != null) { try { fileOutputStream.close(); From dbd59f4645ec4f78a9ad82b5f6540e356a209bc9 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Tue, 18 Jun 2024 15:14:24 +0200 Subject: [PATCH 3/7] Merge branch 'main' of https://github.com/elastic/elasticsearch into record_ad_values From 7a675ae1ae8b6d60c363d29f7f9259ef4aef7428 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 19 Jun 2024 11:35:51 +0200 Subject: [PATCH 4/7] Formatting and fix syntax error --- .../xpack/ml/job/process/autodetect/AutodetectBuilder.java | 4 ++++ .../process/autodetect/NativeAutodetectProcessFactory.java | 7 +++---- .../xpack/ml/process/writer/LengthEncodedWriter.java | 3 +-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index c8d0219e22623..d733be02a49d1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -161,6 +161,10 @@ public void build() throws IOException, InterruptedException { processPipes.addArgs(command); // log the command + if (job.getCustomSettings().containsKey("keep_job_data") && job.getCustomSettings().get("keep_job_data").equals("true")) { + // log the command to reproduce the autodetect run + logger.info("Autodetect process command: " + command); + } logger.info("Starting autodetect process with command: " + command); controller.startProcess(command); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 3375e202514c0..775d17f754a39 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -83,8 +83,8 @@ public AutodetectProcess createAutodetectProcess( ExecutorService executorService, Consumer onProcessCrash ) { - boolean keepJobData = job.getCustomSettings().containsKey("keep_job_data") - && job.getCustomSettings().get("keep_job_data").equals("true") + boolean keepJobData = job.getCustomSettings().containsKey("keep_job_data") + && job.getCustomSettings().get("keep_job_data").equals("true"); List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes( env, @@ -119,12 +119,11 @@ public AutodetectProcess createAutodetectProcess( onProcessCrash ); - try { // check if jobs'custom settings contain the setting 'keep_job_data' // and if it is set to true, then we create the autodetect controll message file // TODO Valeriy: store "keep_job_data" into a constant - if(keepJobData == true) { + if (keepJobData == true) { FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); autodetect.setControlMessageFilePath(controlMsgFile.toString()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index df3ce3b54c596..bb7622640cfd7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -6,10 +6,9 @@ */ package org.elasticsearch.xpack.ml.process.writer; -import org.elasticsearch.logging.Logger; import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; -import org.elasticsearch.xpack.ml.utils.FileUtils; import java.io.IOException; import java.io.OutputStream; From fa1d811a8dff7aedb3a4f016e3575e0ecbcb8d14 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 19 Jun 2024 12:46:44 +0200 Subject: [PATCH 5/7] fix errors --- .../elasticsearch/xpack/ml/process/AbstractNativeProcess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 98547496ab860..5384acda44e82 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -61,7 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processCloseInitiated; private volatile boolean processKilled; private volatile boolean isReady; - private final String controlMessageFilePath; + private volatile String controlMessageFilePath; protected AbstractNativeProcess( String jobId, From 12184a05a9e5a7ac597cb90c204d97ae680f187b Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 19 Jun 2024 16:17:12 +0200 Subject: [PATCH 6/7] clean up and fix null pointer error --- .../xpack/core/ml/job/config/Job.java | 13 +++++++++++++ .../job/process/autodetect/AutodetectBuilder.java | 3 +-- .../NativeAutodetectProcessFactory.java | 6 ++---- .../ml/process/writer/LengthEncodedWriter.java | 15 ++++++++------- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 8da0209e10293..00f03c5247162 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -77,6 +77,7 @@ public class Job implements SimpleDiffable, Writeable, ToXContentObject { public static final ParseField ANALYSIS_LIMITS = new ParseField("analysis_limits"); public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField CUSTOM_SETTINGS = new ParseField("custom_settings"); + public static final ParseField KEEP_JOB_DATA = new ParseField("keep_job_data"); public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); @@ -478,6 +479,18 @@ public Map getCustomSettings() { return customSettings; } + public Boolean keepJobData() { + if (customSettings != null && customSettings.containsKey(KEEP_JOB_DATA.getPreferredName())) { + Object value = customSettings.get(KEEP_JOB_DATA.getPreferredName()); + if (value instanceof Boolean) { + return (Boolean) value; + } else if (value instanceof String) { + return "true".equalsIgnoreCase((String) value); + } + } + return false; + } + public String getModelSnapshotId() { return modelSnapshotId; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index d733be02a49d1..70fad6a05e277 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -161,11 +161,10 @@ public void build() throws IOException, InterruptedException { processPipes.addArgs(command); // log the command - if (job.getCustomSettings().containsKey("keep_job_data") && job.getCustomSettings().get("keep_job_data").equals("true")) { + if (job.keepJobData()) { // log the command to reproduce the autodetect run logger.info("Autodetect process command: " + command); } - logger.info("Starting autodetect process with command: " + command); controller.startProcess(command); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 775d17f754a39..85afba0ea766a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -83,8 +83,6 @@ public AutodetectProcess createAutodetectProcess( ExecutorService executorService, Consumer onProcessCrash ) { - boolean keepJobData = job.getCustomSettings().containsKey("keep_job_data") - && job.getCustomSettings().get("keep_job_data").equals("true"); List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes( env, @@ -114,7 +112,7 @@ public AutodetectProcess createAutodetectProcess( nativeController, processPipes, numberOfFields, - (keepJobData == false) ? filesToDelete : new ArrayList<>(), + (job.keepJobData() == false) ? filesToDelete : new ArrayList<>(), resultsParser, onProcessCrash ); @@ -123,7 +121,7 @@ public AutodetectProcess createAutodetectProcess( // check if jobs'custom settings contain the setting 'keep_job_data' // and if it is set to true, then we create the autodetect controll message file // TODO Valeriy: store "keep_job_data" into a constant - if (keepJobData == true) { + if (job.keepJobData()) { FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); autodetect.setControlMessageFilePath(controlMsgFile.toString()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index bb7622640cfd7..949335c1b24b7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -41,18 +41,19 @@ public class LengthEncodedWriter implements RecordWriter { * This object will never close os. */ public LengthEncodedWriter(OutputStream os) { - outputStream = os; - // This will be used to convert 32 bit integers to network byte order - lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int) - fileOutputStream = null; + this(os, null); } public LengthEncodedWriter(OutputStream os, String filePath) { outputStream = os; try { - Path file = Path.of(filePath); - logger.debug("Opening file: " + filePath + " for writing."); - fileOutputStream = Files.newOutputStream(file); + if (filePath != null) { + Path file = Path.of(filePath); + logger.info("Opening file: " + filePath + " for writing."); + fileOutputStream = Files.newOutputStream(file); + } else { + fileOutputStream = null; + } } catch (IOException e) { logger.error("Failed to open file: " + filePath + " for writing.", e.getMessage(), e); fileOutputStream = null; From 547655d34ecb918e1b3b970ca10e81dd58472ca7 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:06:44 +0200 Subject: [PATCH 7/7] address reviewer comments and fix forbidden APIs --- .../autodetect/NativeAutodetectProcessFactory.java | 3 +-- .../xpack/ml/process/AbstractNativeProcess.java | 6 +++--- .../xpack/ml/process/writer/LengthEncodedWriter.java | 11 +++++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 85afba0ea766a..223ae532ff9db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -120,11 +120,10 @@ public AutodetectProcess createAutodetectProcess( try { // check if jobs'custom settings contain the setting 'keep_job_data' // and if it is set to true, then we create the autodetect controll message file - // TODO Valeriy: store "keep_job_data" into a constant if (job.keepJobData()) { FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); - autodetect.setControlMessageFilePath(controlMsgFile.toString()); + autodetect.setControlMessageFilePath(controlMsgFile); } autodetect.start(executorService, stateProcessor); return autodetect; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 5384acda44e82..da57317b0f0db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -61,7 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processCloseInitiated; private volatile boolean processKilled; private volatile boolean isReady; - private volatile String controlMessageFilePath; + private volatile Path controlMessageFilePath; protected AbstractNativeProcess( String jobId, @@ -351,11 +351,11 @@ public void consumeAndCloseOutputStream() { } @Nullable - public String getControlMessageFilePath() { + public Path getControlMessageFilePath() { return controlMessageFilePath; } - public void setControlMessageFilePath(String controlMessageFilePath) { + public void setControlMessageFilePath(Path controlMessageFilePath) { this.controlMessageFilePath = controlMessageFilePath; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 949335c1b24b7..8b115288ac24b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -31,6 +31,8 @@ */ public class LengthEncodedWriter implements RecordWriter { private OutputStream outputStream; + // In case customer setting "keep_job_data" is set to true, we will write the data to a file + // additionally to the output stream. private OutputStream fileOutputStream; private ByteBuffer lengthBuffer; @@ -44,13 +46,12 @@ public LengthEncodedWriter(OutputStream os) { this(os, null); } - public LengthEncodedWriter(OutputStream os, String filePath) { + public LengthEncodedWriter(OutputStream os, Path filePath) { outputStream = os; try { if (filePath != null) { - Path file = Path.of(filePath); logger.info("Opening file: " + filePath + " for writing."); - fileOutputStream = Files.newOutputStream(file); + fileOutputStream = Files.newOutputStream(filePath); } else { fileOutputStream = null; } @@ -68,9 +69,7 @@ public void close() { try { fileOutputStream.close(); } catch (IOException e) { - logger.error("Failed to close file output stream."); - // log error message and stack trace - logger.error(e.getMessage(), e); + logger.error("Failed to close file output stream.", e.getMessage(), e); } } }