Skip to content

Commit 0d6cbf9

Browse files
davidkylebenwtrent
authored andcommitted
[ML Data Frame] Refactor stop logic (#42644)
* Revert "invalid test" This reverts commit 9dd8b52. * Testing * mend * Revert "[ML Data Frame] Mute Data Frame tests" This reverts commit 5d837fa. * Call onStop and onAbort outside atomic update * Don’t update CS * Tidying up * Remove invalid test that asserted logic that has been removed * Add stopped event * Revert "Add stopped event" This reverts commit 02ba992. * Adding check for STOPPED in saveState
1 parent 04cf4a1 commit 0d6cbf9

File tree

12 files changed

+87
-87
lines changed

12 files changed

+87
-87
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,28 +90,21 @@ public synchronized IndexerState start() {
9090
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
9191
* running in the background, {@link #onStop()} will be called when the background job
9292
* detects that the indexer is stopped.
93-
* If there is no job running when this function is called
94-
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
93+
* If there is no job running when this function is called the returned
94+
* state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called.
9595
*
9696
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
9797
*/
9898
public synchronized IndexerState stop() {
99-
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
100-
IndexerState currentState = state.updateAndGet(previousState -> {
99+
return state.updateAndGet(previousState -> {
101100
if (previousState == IndexerState.INDEXING) {
102101
return IndexerState.STOPPING;
103102
} else if (previousState == IndexerState.STARTED) {
104-
wasStartedAndSetStopped.set(true);
105103
return IndexerState.STOPPED;
106104
} else {
107105
return previousState;
108106
}
109107
});
110-
111-
if (wasStartedAndSetStopped.get()) {
112-
onStop();
113-
}
114-
return currentState;
115108
}
116109

117110
/**
@@ -288,20 +281,22 @@ private void finishWithIndexingFailure(Exception exc) {
288281
}
289282

290283
private IndexerState finishAndSetState() {
291-
return state.updateAndGet(prev -> {
284+
AtomicBoolean callOnStop = new AtomicBoolean(false);
285+
AtomicBoolean callOnAbort = new AtomicBoolean(false);
286+
IndexerState updatedState = state.updateAndGet(prev -> {
292287
switch (prev) {
293288
case INDEXING:
294289
// ready for another job
295290
return IndexerState.STARTED;
296291

297292
case STOPPING:
293+
callOnStop.set(true);
298294
// must be started again
299-
onStop();
300295
return IndexerState.STOPPED;
301296

302297
case ABORTING:
298+
callOnAbort.set(true);
303299
// abort and exit
304-
onAbort();
305300
return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first
306301

307302
case STOPPED:
@@ -316,6 +311,14 @@ private IndexerState finishAndSetState() {
316311
throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
317312
}
318313
});
314+
315+
if (callOnStop.get()) {
316+
onStop();
317+
} else if (callOnAbort.get()) {
318+
onAbort();
319+
}
320+
321+
return updatedState;
319322
}
320323

321324
private void onSearchResponse(SearchResponse searchResponse) {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -268,25 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
268268
}
269269
}
270270

271-
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
272-
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
273-
final ExecutorService executor = Executors.newFixedThreadPool(1);
274-
try {
275-
CountDownLatch countDownLatch = new CountDownLatch(1);
276-
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
277-
indexer.start();
278-
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
279-
countDownLatch.countDown();
280-
assertTrue(awaitBusy(() -> isFinished.get()));
281-
282-
indexer.stop();
283-
assertTrue(isStopped.get());
284-
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
285-
} finally {
286-
executor.shutdownNow();
287-
}
288-
}
289-
290271
public void testStop_WhileIndexing() throws InterruptedException {
291272
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
292273
final ExecutorService executor = Executors.newFixedThreadPool(1);

x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ public void cleanTransforms() throws IOException {
3030
cleanUp();
3131
}
3232

33-
@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
3433
public void testDataFrameTransformCrud() throws Exception {
3534
createReviewsIndex();
3635

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameAuditorIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9-
import org.apache.lucene.util.LuceneTestCase;
109
import org.elasticsearch.client.Request;
1110
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
1211
import org.junit.Before;
@@ -23,7 +22,6 @@
2322
import static org.hamcrest.Matchers.empty;
2423
import static org.hamcrest.Matchers.is;
2524

26-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2725
public class DataFrameAuditorIT extends DataFrameRestTestCase {
2826

2927
private static final String TEST_USER_NAME = "df_admin_plus_data";

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.apache.http.entity.ContentType;
1010
import org.apache.http.entity.StringEntity;
11-
import org.apache.lucene.util.LuceneTestCase;
1211
import org.elasticsearch.client.Request;
1312
import org.elasticsearch.client.Response;
1413
import org.elasticsearch.client.ResponseException;
@@ -23,7 +22,6 @@
2322

2423
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
2524

26-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2725
public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {
2826

2927
/**

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9-
import org.apache.lucene.util.LuceneTestCase;
109
import org.elasticsearch.client.Request;
1110
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1211
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
@@ -22,7 +21,6 @@
2221
import static org.hamcrest.Matchers.equalTo;
2322
import static org.hamcrest.Matchers.greaterThan;
2423

25-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2624
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
2725

2826
private static final String TEST_USER_NAME = "df_user";

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9-
import org.apache.lucene.util.LuceneTestCase;
109
import org.elasticsearch.Version;
1110
import org.elasticsearch.client.Request;
1211
import org.elasticsearch.client.Response;
@@ -16,7 +15,6 @@
1615
import java.io.IOException;
1716
import java.util.Map;
1817

19-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2018
public class DataFrameMetaDataIT extends DataFrameRestTestCase {
2119

2220
private boolean indicesCreated = false;

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9-
import org.apache.lucene.util.LuceneTestCase;
109
import org.elasticsearch.client.Request;
1110
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1211
import org.junit.Before;
@@ -22,7 +21,6 @@
2221
import static org.hamcrest.Matchers.containsString;
2322
import static org.hamcrest.Matchers.equalTo;
2423

25-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2624
public class DataFramePivotRestIT extends DataFrameRestTestCase {
2725

2826
private static final String TEST_USER_NAME = "df_admin_plus_data";

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9-
import org.apache.lucene.util.LuceneTestCase;
109
import org.elasticsearch.client.ResponseException;
1110
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1211
import org.elasticsearch.rest.RestStatus;
@@ -20,7 +19,6 @@
2019
import static org.hamcrest.CoreMatchers.nullValue;
2120
import static org.hamcrest.Matchers.equalTo;
2221

23-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2422
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
2523

2624
public void testDummy() {

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9-
import org.apache.lucene.util.LuceneTestCase;
109
import org.elasticsearch.client.Request;
1110
import org.elasticsearch.client.Response;
1211
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -23,7 +22,6 @@
2322
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
2423
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;
2524

26-
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
2725
public class DataFrameUsageIT extends DataFrameRestTestCase {
2826
private boolean indicesCreated = false;
2927

0 commit comments

Comments
 (0)