Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ public class AutodetectCommunicator implements Closeable {
&& job.getAnalysisConfig().getCategorizationFieldName() != null;
}

public void init(ModelSnapshot modelSnapshot) throws IOException {
public void restoreState(ModelSnapshot modelSnapshot) {
autodetectProcess.restoreState(stateStreamer, modelSnapshot);
createProcessWriter(Optional.empty()).writeHeader();
}

private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
Expand All @@ -100,6 +99,17 @@ private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDe
dataCountsReporter, xContentRegistry);
}

/**
* This must be called once before {@link #writeToJob(InputStream, AnalysisRegistry, XContentType, DataLoadParams, BiConsumer)}
* can be used
*/
public void writeHeader() throws IOException {
createProcessWriter(Optional.empty()).writeHeader();
}

/**
* Call {@link #writeHeader()} exactly once before using this method
*/
public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistry, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
submitOperation(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ protected void doRun() {

try {
createProcessAndSetRunning(processContext, job, params, closeHandler);
processContext.getAutodetectCommunicator().init(params.modelSnapshot());
processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
setJobState(jobTask, JobState.OPENED);
} catch (Exception e1) {
// No need to log here as the persistent task framework will log it
Expand Down Expand Up @@ -508,14 +508,15 @@ protected void doRun() {
private void createProcessAndSetRunning(ProcessContext processContext,
Job job,
AutodetectParams params,
BiConsumer<Exception, Boolean> handler) {
BiConsumer<Exception, Boolean> handler) throws IOException {
// At this point we lock the process context until the process has been started.
// The reason behind this is to ensure closing the job does not happen before
// the process is started as that can result to the job getting seemingly closed
// but the actual process is hanging alive.
processContext.tryLock();
try {
AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler);
communicator.writeHeader();
processContext.setRunning(communicator);
} finally {
// Now that the process is running and we have updated its state we can unlock.
Expand Down Expand Up @@ -646,7 +647,7 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
processContext.tryLock();
try {
if (processContext.setDying() == false) {
logger.debug("Cannot close job [{}] as it has already been closed", jobId);
logger.debug("Cannot close job [{}] as it has been marked as dying", jobId);
return;
}

Expand Down