Skip to content

Commit 6628b5c

Browse files
committed
[ML][Transforms] allow executor to call start on started task
1 parent a4ed7b1 commit 6628b5c

File tree

3 files changed

+17
-13
lines changed

3 files changed

+17
-13
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ private void startTask(DataFrameTransformTask buildTask,
305305
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
306306
buildTask.initializeIndexer(indexerBuilder);
307307
// DataFrameTransformTask#start will fail if the task state is FAILED
308-
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
308+
// Passing `failOnConflict = false` lets this call go on to start the indexer even if the task state is already STARTED
309+
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
309310
}
310311

311312
private void setNumFailureRetries(int numFailureRetries) {

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,13 +219,8 @@ public void getCheckpointingInfo(DataFrameTransformsCheckpointService transforms
219219
));
220220
}
221221

222-
/**
223-
* Start the background indexer and set the task's state to started
224-
* @param startingCheckpoint Set the current checkpoint to this value. If null the
225-
* current checkpoint is not set
226-
* @param listener Started listener
227-
*/
228-
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
222+
// `failOnConflict` is usually true; it is false only for the initial start that is issued when the task is assigned to a node
223+
synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
229224
logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
230225
if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
231226
listener.onFailure(new ElasticsearchStatusException(
@@ -249,7 +244,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
249244
return;
250245
}
251246
// If we are already in a `STARTED` state and `failOnConflict` is set, do not call `.start` on the indexer again; fail with a CONFLICT instead.
252-
if (taskState.get() == DataFrameTransformTaskState.STARTED) {
247+
if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) {
253248
listener.onFailure(new ElasticsearchStatusException(
254249
"Cannot start transform [{}] as it is already STARTED.",
255250
RestStatus.CONFLICT,
@@ -260,7 +255,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
260255
final IndexerState newState = getIndexer().start();
261256
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
262257
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
263-
transform.getId(), newState));
258+
transform.getId(), newState));
264259
return;
265260
}
266261
stateReason.set(null);
@@ -298,10 +293,20 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
298293
logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
299294
getIndexer().stop();
300295
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
301-
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
296+
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
302297
}
303298
));
304299
}
300+
/**
301+
* Start the background indexer and set the task's state to started
302+
* @param startingCheckpoint Set the current checkpoint to this value. If null the
303+
* current checkpoint is not set
304+
* @param force Whether to force start a failed task or not
305+
* @param listener Started listener
306+
*/
307+
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
308+
start(startingCheckpoint, force, true, listener);
309+
}
305310

306311
public synchronized void stop(boolean force) {
307312
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.apache.http.entity.ContentType;
99
import org.apache.http.entity.StringEntity;
10-
import org.apache.lucene.util.LuceneTestCase;
1110
import org.elasticsearch.Version;
1211
import org.elasticsearch.client.Request;
1312
import org.elasticsearch.client.Response;
@@ -52,7 +51,6 @@
5251
import static org.hamcrest.Matchers.hasSize;
5352
import static org.hamcrest.Matchers.oneOf;
5453

55-
@LuceneTestCase.AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/46341")
5654
public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
5755

5856
private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));

0 commit comments

Comments
 (0)