diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 01999eff64c61..d0f15197c3cca 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -114,7 +114,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr new DataFrameTransformTask.ClientDataFrameIndexerBuilder() .setAuditor(auditor) .setClient(client) - .setIndexerState(transformState == null ? IndexerState.STOPPED : transformState.getIndexerState()) + .setIndexerState(currentIndexerState(transformState)) .setInitialPosition(transformState == null ? null : transformState.getPosition()) // If the state is `null` that means this is a "first run". We can safely assume the // task will attempt to gather the initial progress information @@ -184,6 +184,26 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener); } + private static IndexerState currentIndexerState(DataFrameTransformState previousState) { + if (previousState == null) { + return IndexerState.STOPPED; + } + switch(previousState.getIndexerState()){ + // If it is STARTED or INDEXING we want to make sure we revert to started + // Otherwise, the internal indexer will never get scheduled and execute + case STARTED: + case INDEXING: + return IndexerState.STARTED; + // If we are STOPPED, STOPPING, or ABORTING and just started executing on this node, + // then it is safe to say we should be STOPPED + case STOPPED: + case STOPPING: + case ABORTING: + default: + return IndexerState.STOPPED; + } + } + private void markAsFailed(DataFrameTransformTask task, String reason) { CountDownLatch latch = new CountDownLatch(1);