From aaa7338d31305495e9804257a3ee3858aa79ba0d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 4 Apr 2019 15:04:02 +0200 Subject: [PATCH 1/8] refactor to call beforeFinish before the indexer changes state --- .../core/indexing/AsyncTwoPhaseIndexer.java | 23 +++++++++++-------- .../indexing/AsyncTwoPhaseIndexerTests.java | 8 +++---- .../transforms/DataFrameTransformTask.java | 2 +- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index df8eeb71e61b1..c8344288e618f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -22,7 +22,7 @@ /** * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). - * Only one background job can run simultaneously and {@link #onFinish()} is called when the job + * Only one background job can run simultaneously and {@link #beforeFinish()} is called when the job * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. @@ -85,13 +85,10 @@ public synchronized IndexerState start() { /** * Sets the internal state to {@link IndexerState#STOPPING} if an async job is - * running in the background and in such case {@link #onFinish()} will be called - * as soon as the background job detects that the indexer is stopped. If there - * is no job running when this function is called, the state is directly set to - * {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called. + * running in the background. If there is no job running when this function is + * called, the state is directly set to {@link IndexerState#STOPPED}. * - * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the - * job was already aborted). + * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ public synchronized IndexerState stop() { IndexerState currentState = state.updateAndGet(previousState -> { @@ -248,9 +245,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { protected abstract void onFailure(Exception exc); /** - * Called when a background job finishes. + * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to + * {@link IndexerState#STARTED} */ - protected abstract void onFinish(); + protected abstract void beforeFinish(); /** * Called when a background job detects that the indexer is aborted causing the @@ -315,10 +313,13 @@ private void onSearchResponse(SearchResponse searchResponse) { if (iterationResult.isDone()) { logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); + // execute finishing tasks + beforeFinish(); + // Change state first, then try to persist. This prevents in-progress // STOPPING/ABORTING from // being persisted as STARTED but then stop the job - doSaveState(finishAndSetState(), position.get(), this::onFinish); + doSaveState(finishAndSetState(), position.get(), () -> {}); return; } @@ -337,6 +338,8 @@ private void onSearchResponse(SearchResponse searchResponse) { logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage()); } stats.incrementNumOutputDocuments(bulkResponse.getItems().length); + + // check if indexer has been asked to stop, state {@link IndexerState#STOPPING} if (checkState(getState()) == false) { return; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index cfbac18dc9787..0543388f2100c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -98,7 +98,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - assertThat(step, equalTo(4)); + assertThat(step, equalTo(5)); ++step; next.run(); } @@ -109,8 +109,8 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish() { - assertThat(step, equalTo(5)); + protected void beforeFinish() { + assertThat(step, equalTo(4)); ++step; isFinished.set(true); } @@ -183,7 +183,7 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish() { + protected void beforeFinish() { fail("should not be called"); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 23884afec3348..426008dd86018 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -478,7 +478,7 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish() { + protected void beforeFinish() { auditor.info(transform.getId(), "Finished indexing for data frame transform"); logger.info("Finished indexing for data frame transform [" + transform.getId() + "]"); } From 7d6d5301291346c8b2c97698e9a25e9ac91731ec Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 4 Apr 2019 16:48:25 +0200 Subject: [PATCH 2/8] fix incomplete refactoring --- .../xpack/rollup/job/RollupJobTask.java | 2 +- .../rollup/job/RollupIndexerIndexingTests.java | 2 +- .../xpack/rollup/job/RollupIndexerStateTests.java | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index f545ab049d44d..26a9614244719 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -138,7 +138,7 @@ protected void doSaveState(IndexerState indexerState, Map positi } @Override - protected void onFinish() { + protected void beforeFinish() { logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]"); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index bb3ff17dbc1db..4894bd9acf2f0 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -581,7 +581,7 @@ class SyncRollupIndexer extends RollupIndexer { } @Override - protected void onFinish() { + protected void beforeFinish() { latch.countDown(); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 37b3fd84ef145..4ed558a0ae740 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -124,7 +124,7 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish() {} + protected void beforeFinish() {} } private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer { @@ -214,7 +214,7 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish() {} + protected void beforeFinish() {} } public void testStarted() throws Exception { @@ -248,8 +248,8 @@ public void testIndexing() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { @Override - protected void onFinish() { - super.onFinish(); + protected void beforeFinish() { + super.beforeFinish(); isFinished.set(true); } }; @@ -289,8 +289,8 @@ public void testStateChangeMidTrigger() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { @Override - protected void onFinish() { - super.onFinish(); + protected void beforeFinish() { + super.beforeFinish(); isFinished.set(true); } }; @@ -318,7 +318,7 @@ public void testAbortDuringSearch() throws Exception { try { EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) { @Override - protected void onFinish() { + protected void beforeFinish() { fail("Should not have called onFinish"); } From 3a69a1ed29078be7c559652b2ae0de4e9c05c3b9 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 5 Apr 2019 11:40:52 +0200 Subject: [PATCH 3/8] refactor onStart and onFinish to take runnables --- .../core/indexing/AsyncTwoPhaseIndexer.java | 40 ++++++++++----- .../indexing/AsyncTwoPhaseIndexerTests.java | 11 ++-- .../transforms/DataFrameIndexer.java | 3 +- .../transforms/DataFrameTransformTask.java | 7 +-- .../xpack/rollup/job/RollupIndexer.java | 19 ++++++- .../xpack/rollup/job/RollupJobTask.java | 3 +- .../job/RollupIndexerIndexingTests.java | 3 +- .../rollup/job/RollupIndexerStateTests.java | 50 +++++++++++++------ 8 files changed, 97 insertions(+), 39 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index c8344288e618f..a394ed4155625 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -22,7 +22,7 @@ /** * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). - * Only one background job can run simultaneously and {@link #beforeFinish()} is called when the job + * Only one background job can run simultaneously and {@link #onFinish()} is called when the job * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. @@ -145,16 +145,18 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { case STARTED: logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]"); stats.incrementNumInvocations(1); - onStartJob(now); if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) { // fire off the search. Note this is async, the method will return from here executor.execute(() -> { try { - stats.markStartSearch(); - doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + onStart(now, () -> { + stats.markStartSearch(); + doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + }); } catch (Exception e) { - finishWithSearchFailure(e); + logger.error("Indexer failed on start", e); + doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)); } }); logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); @@ -196,9 +198,12 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the * internal state is {@link IndexerState#STARTED}. * + * Implementors MUST ensure that next.run() gets called. + * * @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)} + * @param next Runnable for the next phase */ - protected abstract void onStartJob(long now); + protected abstract void onStart(long now, Runnable next); /** * Executes the {@link SearchRequest} and calls nextPhase with the @@ -246,9 +251,12 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { /** * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to - * {@link IndexerState#STARTED} + * {@link IndexerState#STARTED}. The passed runnable trigger persisting and changing the state and can be wrapped to + * execute code after the state has changed. + * + * @param finishAndSetState Runnable for finishing and setting state */ - protected abstract void beforeFinish(); + protected abstract void onFinish(Runnable finishAndSetState); /** * Called when a background job detects that the indexer is aborted causing the @@ -314,12 +322,18 @@ private void onSearchResponse(SearchResponse searchResponse) { logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); // execute finishing tasks - beforeFinish(); + try { + onFinish(() -> { + // Change state first, then try to persist. This prevents in-progress + // STOPPING/ABORTING from + // being persisted as STARTED but then stop the job + doSaveState(finishAndSetState(), position.get(), () -> {}); + }); + } catch (Exception e) { + logger.error("Indexer failed on finish", e); + doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)); + } - // Change state first, then try to persist. This prevents in-progress - // STOPPING/ABORTING from - // being persisted as STARTED but then stop the job - doSaveState(finishAndSetState(), position.get(), () -> {}); return; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 0543388f2100c..3c7d6fb161d82 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -76,9 +76,10 @@ protected SearchRequest buildSearchRequest() { } @Override - protected void onStartJob(long now) { + protected void onStart(long now, Runnable next) { assertThat(step, equalTo(0)); ++step; + next.run(); } @Override @@ -109,10 +110,11 @@ protected void onFailure(Exception exc) { } @Override - protected void beforeFinish() { + protected void onFinish(Runnable finishAndSetState) { assertThat(step, equalTo(4)); ++step; isFinished.set(true); + finishAndSetState.run(); } @Override @@ -153,9 +155,10 @@ protected SearchRequest buildSearchRequest() { } @Override - protected void onStartJob(long now) { + protected void onStart(long now, Runnable next) { assertThat(step, equalTo(0)); ++step; + next.run(); } @Override @@ -183,7 +186,7 @@ protected void onFailure(Exception exc) { } @Override - protected void beforeFinish() { + protected void onFinish(Runnable finishAndSetState) { fail("should not be called"); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 090a9c9cfccc0..d14452714c7a2 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -51,10 +51,11 @@ public DataFrameIndexer(Executor executor, protected abstract Map getFieldMappings(); @Override - protected void onStartJob(long now) { + protected void onStart(long now, Runnable next) { QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery(); pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig()); + next.run(); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 426008dd86018..b2f9a077159c8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -27,12 +27,12 @@ import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; @@ -478,9 +478,10 @@ protected void onFailure(Exception exc) { } @Override - protected void beforeFinish() { + protected void onFinish(Runnable finishAndSetState) { auditor.info(transform.getId(), "Finished indexing for data frame transform"); logger.info("Finished indexing for data frame transform [" + transform.getId() + "]"); + finishAndSetState.run(); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 1d5f9093a29df..9eaf355ad5471 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -75,7 +75,21 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer initialState, Map initialPosition, AtomicBoolean upgradedDocumentID) { - super(executor, initialState, initialPosition, new RollupIndexerJobStats()); + this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats()); + } + + /** + * Ctr + * @param executor Executor to use to fire the first request of a background job. + * @param job The rollup job + * @param initialState Initial state for the indexer + * @param initialPosition The last indexed bucket of the task + * @param upgradedDocumentID whether job has updated IDs (for BWC) + * @param jobStats jobstats instance for collecting stats + */ + RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, + AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) { + super(executor, initialState, initialPosition, jobStats); this.job = job; this.compositeBuilder = createCompositeBuilder(job.getConfig()); this.upgradedDocumentID = upgradedDocumentID; @@ -94,7 +108,7 @@ protected String getJobId() { } @Override - protected void onStartJob(long now) { + protected void onStart(long now, Runnable next) { // this is needed to exclude buckets that can still receive new documents. DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram(); long rounded = dateHisto.createRounding().round(now); @@ -104,6 +118,7 @@ protected void onStartJob(long now) { } else { maxBoundary = rounded; } + next.run(); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 26a9614244719..44ddf535a3f04 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -138,8 +138,9 @@ protected void doSaveState(IndexerState indexerState, Map positi } @Override - protected void beforeFinish() { + protected void onFinish(Runnable finishAndSetState) { logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]"); + finishAndSetState.run(); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 4894bd9acf2f0..9b14fa9a6c198 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -581,8 +581,9 @@ class SyncRollupIndexer extends RollupIndexer { } @Override - protected void beforeFinish() { + protected void onFinish(Runnable finishAndSetState) { latch.countDown(); + finishAndSetState.run(); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 4ed558a0ae740..68d9034e9bd06 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; -import org.elasticsearch.xpack.core.rollup.job.GroupConfig; +import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import org.mockito.stubbing.Answer; @@ -44,11 +44,18 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; + public class RollupIndexerStateTests extends ESTestCase { private static class EmptyRollupIndexer extends RollupIndexer { + EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + Map initialPosition, boolean upgraded, RollupIndexerJobStats stats) { + super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats); + } + EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded) { super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded)); @@ -124,7 +131,9 @@ protected void onFailure(Exception exc) { } @Override - protected void beforeFinish() {} + protected void onFinish(Runnable finishAndSetState) { + finishAndSetState.run(); + } } private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer { @@ -140,6 +149,11 @@ private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer { super(executor, job, initialState, initialPosition, randomBoolean()); } + DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + Map initialPosition, RollupIndexerJobStats stats) { + super(executor, job, initialState, initialPosition, randomBoolean(), stats); + } + private CountDownLatch newLatch() { return latch = new CountDownLatch(1); } @@ -214,7 +228,9 @@ protected void onFailure(Exception exc) { } @Override - protected void beforeFinish() {} + protected void onFinish(Runnable finishAndSetState) { + finishAndSetState.run(); + } } public void testStarted() throws Exception { @@ -248,8 +264,8 @@ public void testIndexing() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { @Override - protected void beforeFinish() { - super.beforeFinish(); + protected void onFinish(Runnable finishAndSetState) { + super.onFinish(finishAndSetState); isFinished.set(true); } }; @@ -274,23 +290,29 @@ protected void beforeFinish() { public void testStateChangeMidTrigger() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + + RollupIndexerJobStats stats = new RollupIndexerJobStats(); + RollupIndexerJobStats spyStats = spy(stats); RollupJobConfig config = mock(RollupJobConfig.class); - // We pull the config before a final state check, so this allows us to flip the state + // We call stats before a final state check, so this allows us to flip the state // and make sure the appropriate error is thrown - when(config.getGroupConfig()).then((Answer) invocationOnMock -> { + Answer forwardAndChangeState = invocation -> { + invocation.callRealMethod(); state.set(IndexerState.STOPPED); - return ConfigTestHelpers.randomGroupConfig(random()); - }); + return null; + }; + + doAnswer(forwardAndChangeState).when(spyStats).incrementNumInvocations(1L); RollupJob job = new RollupJob(config, Collections.emptyMap()); final ExecutorService executor = Executors.newFixedThreadPool(1); try { AtomicBoolean isFinished = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) { @Override - protected void beforeFinish() { - super.beforeFinish(); + protected void onFinish(Runnable finishAndSetState) { + super.onFinish(finishAndSetState); isFinished.set(true); } }; @@ -318,7 +340,7 @@ public void testAbortDuringSearch() throws Exception { try { EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) { @Override - protected void beforeFinish() { + protected void onFinish(Runnable finishAndSetState) { fail("Should not have called onFinish"); } From f8d9ca3516c74e6b00864115162b93290a6efd49 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 5 Apr 2019 12:06:40 +0200 Subject: [PATCH 4/8] fix doc link --- .../elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index a394ed4155625..7919144bbf7c9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -22,7 +22,7 @@ /** * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). - * Only one background job can run simultaneously and {@link #onFinish()} is called when the job + * Only one background job can run simultaneously and {@link #onFinish(Runnable)} is called when the job * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. From 803860838ff4213c419c724a117fdb551ae8e42d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 5 Apr 2019 17:41:15 +0200 Subject: [PATCH 5/8] replace runables with actionlisteners --- .../core/indexing/AsyncTwoPhaseIndexer.java | 42 ++++++------------- .../indexing/AsyncTwoPhaseIndexerTests.java | 14 +++---- .../transforms/DataFrameIndexer.java | 14 ++++--- .../transforms/DataFrameTransformTask.java | 12 ++++-- .../xpack/rollup/job/RollupIndexer.java | 25 ++++++----- .../xpack/rollup/job/RollupJobTask.java | 4 +- .../job/RollupIndexerIndexingTests.java | 4 +- .../rollup/job/RollupIndexerStateTests.java | 26 +++++++----- 8 files changed, 71 insertions(+), 70 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 7919144bbf7c9..4ee6c4b8fca18 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -22,7 +22,7 @@ /** * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). - * Only one background job can run simultaneously and {@link #onFinish(Runnable)} is called when the job + * Only one background job can run simultaneously and {@link #onFinish} is called when the job * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. @@ -149,15 +149,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) { // fire off the search. Note this is async, the method will return from here executor.execute(() -> { - try { - onStart(now, () -> { - stats.markStartSearch(); - doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); - }); - } catch (Exception e) { - logger.error("Indexer failed on start", e); - doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)); - } + onStart(now, ActionListener.wrap(r -> { + stats.markStartSearch(); + doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + }, e -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)))); }); logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); return true; @@ -198,12 +193,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the * internal state is {@link IndexerState#STARTED}. * - * Implementors MUST ensure that next.run() gets called. - * * @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)} - * @param next Runnable for the next phase + * @param listener listener to call after done */ - protected abstract void onStart(long now, Runnable next); + protected abstract void onStart(long now, ActionListener listener); /** * Executes the {@link SearchRequest} and calls nextPhase with the @@ -251,12 +244,11 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { /** * Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to - * {@link IndexerState#STARTED}. The passed runnable trigger persisting and changing the state and can be wrapped to - * execute code after the state has changed. + * {@link IndexerState#STARTED}. * - * @param finishAndSetState Runnable for finishing and setting state + * @param listener listener to call after done */ - protected abstract void onFinish(Runnable finishAndSetState); + protected abstract void onFinish(ActionListener listener); /** * Called when a background job detects that the indexer is aborted causing the @@ -322,17 +314,9 @@ private void onSearchResponse(SearchResponse searchResponse) { logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); // execute finishing tasks - try { - onFinish(() -> { - // Change state first, then try to persist. This prevents in-progress - // STOPPING/ABORTING from - // being persisted as STARTED but then stop the job - doSaveState(finishAndSetState(), position.get(), () -> {}); - }); - } catch (Exception e) { - logger.error("Indexer failed on finish", e); - doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)); - } + onFinish(ActionListener.wrap( + r -> doSaveState(finishAndSetState(), position.get(), () -> {}), + e -> doSaveState(finishAndSetState(), position.get(), () -> {}))); return; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 3c7d6fb161d82..255c6ae8aee49 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -76,10 +76,10 @@ protected SearchRequest buildSearchRequest() { } @Override - protected void onStart(long now, Runnable next) { + protected void onStart(long now, ActionListener listener) { assertThat(step, equalTo(0)); ++step; - next.run(); + listener.onResponse(null); } @Override @@ -110,11 +110,11 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish(Runnable finishAndSetState) { + protected void onFinish(ActionListener listener) { assertThat(step, equalTo(4)); ++step; isFinished.set(true); - finishAndSetState.run(); + listener.onResponse(null); } @Override @@ -155,10 +155,10 @@ protected SearchRequest buildSearchRequest() { } @Override - protected void onStart(long now, Runnable next) { + protected void onStart(long now, ActionListener listener) { assertThat(step, equalTo(0)); ++step; - next.run(); + listener.onResponse(null); } @Override @@ -186,7 +186,7 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish(Runnable finishAndSetState) { + protected void onFinish(ActionListener listener) { fail("should not be called"); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index d14452714c7a2..238e531bba93d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -51,11 +52,14 @@ public DataFrameIndexer(Executor executor, protected abstract Map getFieldMappings(); @Override - protected void onStart(long now, Runnable next) { - QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery(); - - pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig()); - next.run(); + protected void onStart(long now, ActionListener listener) { + try { + QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery(); + pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig()); + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index b2f9a077159c8..a4f3df8cbe5d0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -478,10 +478,14 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish(Runnable finishAndSetState) { - auditor.info(transform.getId(), "Finished indexing for data frame transform"); - logger.info("Finished indexing for data frame transform [" + transform.getId() + "]"); - finishAndSetState.run(); + protected void onFinish(ActionListener listener) { + try { + auditor.info(transform.getId(), "Finished indexing for data frame transform"); + logger.info("Finished indexing for data frame transform [" + transform.getId() + "]"); + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 9eaf355ad5471..e051e912c482b 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.rollup.job; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; @@ -108,17 +109,21 @@ protected String getJobId() { } @Override - protected void onStart(long now, Runnable next) { - // this is needed to exclude buckets that can still receive new documents. - DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram(); - long rounded = dateHisto.createRounding().round(now); - if (dateHisto.getDelay() != null) { - // if the job has a delay we filter all documents that appear before it. - maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis(); - } else { - maxBoundary = rounded; + protected void onStart(long now, ActionListener listener) { + try { + // this is needed to exclude buckets that can still receive new documents. + DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram(); + long rounded = dateHisto.createRounding().round(now); + if (dateHisto.getDelay() != null) { + // if the job has a delay we filter all documents that appear before it. + maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis(); + } else { + maxBoundary = rounded; + } + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); } - next.run(); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 44ddf535a3f04..fecda3a2ce24b 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -138,9 +138,9 @@ protected void doSaveState(IndexerState indexerState, Map positi } @Override - protected void onFinish(Runnable finishAndSetState) { + protected void onFinish(ActionListener listener) { logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]"); - finishAndSetState.run(); + listener.onResponse(null); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 9b14fa9a6c198..743d1d94e6040 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -581,9 +581,9 @@ class SyncRollupIndexer extends RollupIndexer { } @Override - protected void onFinish(Runnable finishAndSetState) { + protected void onFinish(ActionListener listener) { latch.countDown(); - finishAndSetState.run(); + listener.onResponse(null); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 68d9034e9bd06..b529a87027ee0 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -131,8 +131,8 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish(Runnable finishAndSetState) { - finishAndSetState.run(); + protected void onFinish(ActionListener listener) { + listener.onResponse(null); } } @@ -228,8 +228,8 @@ protected void onFailure(Exception exc) { } @Override - protected void onFinish(Runnable finishAndSetState) { - finishAndSetState.run(); + protected void onFinish(ActionListener listener) { + listener.onResponse(null); } } @@ -264,9 +264,11 @@ public void testIndexing() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { @Override - protected void onFinish(Runnable finishAndSetState) { - super.onFinish(finishAndSetState); - isFinished.set(true); + protected void onFinish(ActionListener listener) { + super.onFinish(ActionListener.wrap(r -> { + isFinished.set(true); + listener.onResponse(r); + }, listener::onFailure)); } }; final CountDownLatch latch = indexer.newLatch(); @@ -311,9 +313,11 @@ public void testStateChangeMidTrigger() throws Exception { AtomicBoolean isFinished = new AtomicBoolean(false); DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) { @Override - protected void onFinish(Runnable finishAndSetState) { - super.onFinish(finishAndSetState); - isFinished.set(true); + protected void onFinish(ActionListener listener) { + super.onFinish(ActionListener.wrap(r -> { + isFinished.set(true); + listener.onResponse(r); + }, listener::onFailure)); } }; final CountDownLatch latch = indexer.newLatch(); @@ -340,7 +344,7 @@ public void testAbortDuringSearch() throws Exception { try { EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) { @Override - protected void onFinish(Runnable finishAndSetState) { + protected void onFinish(ActionListener listener) { fail("Should not have called onFinish"); } From 673150b098ef6332d1923dc2a9e0f373ca270c7b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 5 Apr 2019 17:58:43 +0200 Subject: [PATCH 6/8] do not save state if failure happens on start, call on failure if failure happens on finish --- .../xpack/core/indexing/AsyncTwoPhaseIndexer.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 4ee6c4b8fca18..da174e1c9cf57 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -152,7 +152,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { onStart(now, ActionListener.wrap(r -> { stats.markStartSearch(); doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); - }, e -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)))); + }, e -> { + finishAndSetState(); + onFailure(e); + })); }); logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); return true; @@ -316,7 +319,7 @@ private void onSearchResponse(SearchResponse searchResponse) { // execute finishing tasks onFinish(ActionListener.wrap( r -> doSaveState(finishAndSetState(), position.get(), () -> {}), - e -> doSaveState(finishAndSetState(), position.get(), () -> {}))); + e -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)))); return; } From ceb6948b143cfc26384e012d91061dfab98d94c4 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 5 Apr 2019 18:19:58 +0200 Subject: [PATCH 7/8] do not call onFailure after a failure in onFinish --- .../elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index da174e1c9cf57..e859e0db754ba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -319,7 +319,7 @@ private void onSearchResponse(SearchResponse searchResponse) { // execute finishing tasks onFinish(ActionListener.wrap( r -> doSaveState(finishAndSetState(), position.get(), () -> {}), - e -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e)))); + e -> doSaveState(finishAndSetState(), position.get(), () -> {}))); return; } From 088cafc52cc116b147b02ae3efb9bd7ba785999a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 5 Apr 2019 19:02:05 +0200 Subject: [PATCH 8/8] fix unit test --- .../xpack/core/indexing/AsyncTwoPhaseIndexerTests.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 255c6ae8aee49..9e79912f85115 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -173,14 +173,12 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - assertThat(step, equalTo(2)); - ++step; - next.run(); + fail("should not be called"); } @Override protected void onFailure(Exception exc) { - assertThat(step, equalTo(3)); + assertThat(step, equalTo(2)); ++step; isFinished.set(true); } @@ -243,8 +241,8 @@ public void testStateMachineBrokenSearch() throws InterruptedException { indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertTrue(ESTestCase.awaitBusy(() -> isFinished.get())); - assertThat(indexer.getStep(), equalTo(4)); + assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS)); + assertThat(indexer.getStep(), equalTo(3)); } finally { executor.shutdownNow();