Skip to content

Commit e10f779

Browse files
author
Hendrik Muhs
authored
refactor onStart and onFinish to take runnables and executed them guarded by state (#40855)
refactor onStart and onFinish to take action listeners and execute them when indexer is in indexing state.
1 parent 59e9721 commit e10f779

File tree

8 files changed

+131
-68
lines changed

8 files changed

+131
-68
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/**
2323
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
2424
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
25-
* Only one background job can run simultaneously and {@link #onFinish()} is called when the job
25+
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
2626
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
2727
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
2828
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
@@ -85,13 +85,10 @@ public synchronized IndexerState start() {
8585

8686
/**
8787
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
88-
* running in the background and in such case {@link #onFinish()} will be called
89-
* as soon as the background job detects that the indexer is stopped. If there
90-
* is no job running when this function is called, the state is directly set to
91-
* {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
88+
* running in the background. If there is no job running when this function is
89+
* called, the state is directly set to {@link IndexerState#STOPPED}.
9290
*
93-
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the
94-
* job was already aborted).
91+
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
9592
*/
9693
public synchronized IndexerState stop() {
9794
IndexerState currentState = state.updateAndGet(previousState -> {
@@ -148,17 +145,17 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
148145
case STARTED:
149146
logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
150147
stats.incrementNumInvocations(1);
151-
onStartJob(now);
152148

153149
if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
154150
// fire off the search. Note this is async, the method will return from here
155151
executor.execute(() -> {
156-
try {
152+
onStart(now, ActionListener.wrap(r -> {
157153
stats.markStartSearch();
158154
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
159-
} catch (Exception e) {
160-
finishWithSearchFailure(e);
161-
}
155+
}, e -> {
156+
finishAndSetState();
157+
onFailure(e);
158+
}));
162159
});
163160
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
164161
return true;
@@ -200,8 +197,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
200197
* internal state is {@link IndexerState#STARTED}.
201198
*
202199
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
200+
* @param listener listener to call after done
203201
*/
204-
protected abstract void onStartJob(long now);
202+
protected abstract void onStart(long now, ActionListener<Void> listener);
205203

206204
/**
207205
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
@@ -248,9 +246,12 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
248246
protected abstract void onFailure(Exception exc);
249247

250248
/**
251-
* Called when a background job finishes.
249+
* Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to
250+
* {@link IndexerState#STARTED}.
251+
*
252+
* @param listener listener to call after done
252253
*/
253-
protected abstract void onFinish();
254+
protected abstract void onFinish(ActionListener<Void> listener);
254255

255256
/**
256257
* Called when a background job detects that the indexer is aborted causing the
@@ -315,10 +316,11 @@ private void onSearchResponse(SearchResponse searchResponse) {
315316
if (iterationResult.isDone()) {
316317
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
317318

318-
// Change state first, then try to persist. This prevents in-progress
319-
// STOPPING/ABORTING from
320-
// being persisted as STARTED but then stop the job
321-
doSaveState(finishAndSetState(), position.get(), this::onFinish);
319+
// execute finishing tasks
320+
onFinish(ActionListener.wrap(
321+
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
322+
e -> doSaveState(finishAndSetState(), position.get(), () -> {})));
323+
322324
return;
323325
}
324326

@@ -337,6 +339,8 @@ private void onSearchResponse(SearchResponse searchResponse) {
337339
logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
338340
}
339341
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
342+
343+
// check if indexer has been asked to stop, state {@link IndexerState#STOPPING}
340344
if (checkState(getState()) == false) {
341345
return;
342346
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ protected SearchRequest buildSearchRequest() {
7676
}
7777

7878
@Override
79-
protected void onStartJob(long now) {
79+
protected void onStart(long now, ActionListener<Void> listener) {
8080
assertThat(step, equalTo(0));
8181
++step;
82+
listener.onResponse(null);
8283
}
8384

8485
@Override
@@ -98,7 +99,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
9899

99100
@Override
100101
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
101-
assertThat(step, equalTo(4));
102+
assertThat(step, equalTo(5));
102103
++step;
103104
next.run();
104105
}
@@ -109,10 +110,11 @@ protected void onFailure(Exception exc) {
109110
}
110111

111112
@Override
112-
protected void onFinish() {
113-
assertThat(step, equalTo(5));
113+
protected void onFinish(ActionListener<Void> listener) {
114+
assertThat(step, equalTo(4));
114115
++step;
115116
isFinished.set(true);
117+
listener.onResponse(null);
116118
}
117119

118120
@Override
@@ -153,9 +155,10 @@ protected SearchRequest buildSearchRequest() {
153155
}
154156

155157
@Override
156-
protected void onStartJob(long now) {
158+
protected void onStart(long now, ActionListener<Void> listener) {
157159
assertThat(step, equalTo(0));
158160
++step;
161+
listener.onResponse(null);
159162
}
160163

161164
@Override
@@ -170,20 +173,18 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
170173

171174
@Override
172175
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
173-
assertThat(step, equalTo(2));
174-
++step;
175-
next.run();
176+
fail("should not be called");
176177
}
177178

178179
@Override
179180
protected void onFailure(Exception exc) {
180-
assertThat(step, equalTo(3));
181+
assertThat(step, equalTo(2));
181182
++step;
182183
isFinished.set(true);
183184
}
184185

185186
@Override
186-
protected void onFinish() {
187+
protected void onFinish(ActionListener<Void> listener) {
187188
fail("should not be called");
188189
}
189190

@@ -240,8 +241,8 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
240241
indexer.start();
241242
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
242243
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
243-
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get()));
244-
assertThat(indexer.getStep(), equalTo(4));
244+
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
245+
assertThat(indexer.getStep(), equalTo(3));
245246

246247
} finally {
247248
executor.shutdownNow();

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.index.IndexRequest;
1213
import org.elasticsearch.action.search.SearchRequest;
1314
import org.elasticsearch.action.search.SearchResponse;
@@ -51,10 +52,14 @@ public DataFrameIndexer(Executor executor,
5152
protected abstract Map<String, String> getFieldMappings();
5253

5354
@Override
54-
protected void onStartJob(long now) {
55-
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
56-
57-
pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
55+
protected void onStart(long now, ActionListener<Void> listener) {
56+
try {
57+
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
58+
pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
59+
listener.onResponse(null);
60+
} catch (Exception e) {
61+
listener.onFailure(e);
62+
}
5863
}
5964

6065
@Override

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
import org.elasticsearch.xpack.core.common.notifications.Auditor;
2828
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
2929
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
30-
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
31-
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
3230
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
3331
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
3432
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
33+
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
3534
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
35+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
3636
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
3737
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
3838
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
@@ -478,9 +478,14 @@ protected void onFailure(Exception exc) {
478478
}
479479

480480
@Override
481-
protected void onFinish() {
482-
auditor.info(transform.getId(), "Finished indexing for data frame transform");
483-
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
481+
protected void onFinish(ActionListener<Void> listener) {
482+
try {
483+
auditor.info(transform.getId(), "Finished indexing for data frame transform");
484+
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
485+
listener.onResponse(null);
486+
} catch (Exception e) {
487+
listener.onFailure(e);
488+
}
484489
}
485490

486491
@Override

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.rollup.job;
77

8+
import org.elasticsearch.action.ActionListener;
89
import org.elasticsearch.action.search.SearchRequest;
910
import org.elasticsearch.action.search.SearchResponse;
1011
import org.elasticsearch.common.unit.TimeValue;
@@ -75,7 +76,21 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
7576
*/
7677
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
7778
AtomicBoolean upgradedDocumentID) {
78-
super(executor, initialState, initialPosition, new RollupIndexerJobStats());
79+
this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
80+
}
81+
82+
/**
83+
* Ctr
84+
* @param executor Executor to use to fire the first request of a background job.
85+
* @param job The rollup job
86+
* @param initialState Initial state for the indexer
87+
* @param initialPosition The last indexed bucket of the task
88+
* @param upgradedDocumentID whether job has updated IDs (for BWC)
89+
* @param jobStats jobstats instance for collecting stats
90+
*/
91+
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
92+
AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) {
93+
super(executor, initialState, initialPosition, jobStats);
7994
this.job = job;
8095
this.compositeBuilder = createCompositeBuilder(job.getConfig());
8196
this.upgradedDocumentID = upgradedDocumentID;
@@ -94,15 +109,20 @@ protected String getJobId() {
94109
}
95110

96111
@Override
97-
protected void onStartJob(long now) {
98-
// this is needed to exclude buckets that can still receive new documents.
99-
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
100-
long rounded = dateHisto.createRounding().round(now);
101-
if (dateHisto.getDelay() != null) {
102-
// if the job has a delay we filter all documents that appear before it.
103-
maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
104-
} else {
105-
maxBoundary = rounded;
112+
protected void onStart(long now, ActionListener<Void> listener) {
113+
try {
114+
// this is needed to exclude buckets that can still receive new documents.
115+
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
116+
long rounded = dateHisto.createRounding().round(now);
117+
if (dateHisto.getDelay() != null) {
118+
// if the job has a delay we filter all documents that appear before it.
119+
maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
120+
} else {
121+
maxBoundary = rounded;
122+
}
123+
listener.onResponse(null);
124+
} catch (Exception e) {
125+
listener.onFailure(e);
106126
}
107127
}
108128

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,9 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
138138
}
139139

140140
@Override
141-
protected void onFinish() {
141+
protected void onFinish(ActionListener<Void> listener) {
142142
logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]");
143+
listener.onResponse(null);
143144
}
144145

145146
@Override

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,8 +581,9 @@ class SyncRollupIndexer extends RollupIndexer {
581581
}
582582

583583
@Override
584-
protected void onFinish() {
584+
protected void onFinish(ActionListener<Void> listener) {
585585
latch.countDown();
586+
listener.onResponse(null);
586587
}
587588

588589
@Override

0 commit comments

Comments
 (0)