Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
new DataFrameTransformTask.ClientDataFrameIndexerBuilder()
.setAuditor(auditor)
.setClient(client)
.setIndexerState(transformState == null ? IndexerState.STOPPED : transformState.getIndexerState())
.setIndexerState(currentIndexerState(transformState))
.setInitialPosition(transformState == null ? null : transformState.getPosition())
// If the state is `null` that means this is a "first run". We can safely assume the
// task will attempt to gather the initial progress information
Expand Down Expand Up @@ -184,6 +184,26 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener);
}

private static IndexerState currentIndexerState(DataFrameTransformState previousState) {
if (previousState == null) {
return IndexerState.STOPPED;
}
switch(previousState.getIndexerState()){
// If it is STARTED or INDEXING we want to make sure we revert to started
// Otherwise, the internal indexer will never get scheduled and execute
case STARTED:
case INDEXING:
return IndexerState.STARTED;
// If we are STOPPED, STOPPING, or ABORTING and just started executing on this node,
// then it is safe to say we should be STOPPED
case STOPPED:
case STOPPING:
case ABORTING:
default:
return IndexerState.STOPPED;
}
}

private void markAsFailed(DataFrameTransformTask task, String reason) {
CountDownLatch latch = new CountDownLatch(1);

Expand Down