Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
* Only one background job can run simultaneously and {@link #onFinish()} is called when the job
* Only one background job can run simultaneously and {@link #onFinish(Runnable)} is called when the job
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
* aborted while a job is running. The indexer must be started ({@link #start()}) to allow a background job to run when
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
Expand Down Expand Up @@ -85,13 +85,10 @@ public synchronized IndexerState start() {

/**
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background and in such case {@link #onFinish()} will be called
* as soon as the background job detects that the indexer is stopped. If there
* is no job running when this function is called, the state is directly set to
* {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
* running in the background. If there is no job running when this function is
* called, the state is directly set to {@link IndexerState#STOPPED}.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the
* job was already aborted).
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
IndexerState currentState = state.updateAndGet(previousState -> {
Expand Down Expand Up @@ -148,16 +145,18 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
case STARTED:
logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
stats.incrementNumInvocations(1);
onStartJob(now);

if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
try {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
onStart(now, () -> {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
});
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should catch the exception here. If an error occurs in onStart, we should provide an ActionListener to let the caller deal with the error (e.g. call listener.onFailure(e)):

onStart(now,  ActionListener.wrap((o) -> {
                            stats.markStartSearch();
                            doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
                        }, (e) -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e))));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we should call onFailure before we save the state, to be consistent with onFinish: onFailure(Exception e, Runnable finishAndSaveState)?

finishWithSearchFailure(e);
logger.error("Indexer failed on start", e);
doSaveState(finishAndSetState(), position.get(), () -> onFailure(e));
}
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
Expand Down Expand Up @@ -199,9 +198,12 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
* Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
* internal state is {@link IndexerState#STARTED}.
*
* Implementors MUST ensure that next.run() gets called.
*
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
* @param next Runnable for the next phase
*/
protected abstract void onStartJob(long now);
protected abstract void onStart(long now, Runnable next);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're expecting async calls to be fired in onStart, shouldn't we provide an ActionListener<Void> rather than a Runnable? That way the implementation can fail the job nicely if an error occurs.


/**
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
Expand Down Expand Up @@ -248,9 +250,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
protected abstract void onFailure(Exception exc);

/**
* Called when a background job finishes.
* Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to
* {@link IndexerState#STARTED}. The passed runnable triggers persisting and changing the state and can be wrapped to
* execute code after the state has changed.
*
* @param finishAndSetState Runnable for finishing and setting state
*/
protected abstract void onFinish();
protected abstract void onFinish(Runnable finishAndSetState);

/**
* Called when a background job detects that the indexer is aborted causing the
Expand Down Expand Up @@ -315,10 +321,19 @@ private void onSearchResponse(SearchResponse searchResponse) {
if (iterationResult.isDone()) {
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");

// Change state first, then try to persist. This prevents in-progress
// STOPPING/ABORTING from
// being persisted as STARTED but then stop the job
doSaveState(finishAndSetState(), position.get(), this::onFinish);
// execute finishing tasks
try {
onFinish(() -> {
// Change state first, then try to persist. This prevents in-progress
// STOPPING/ABORTING from
// being persisted as STARTED but then stop the job
doSaveState(finishAndSetState(), position.get(), () -> {});
});
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rather document the fact that the Runnable provided with onFinish (and maybe onFailure) should always run (even on errors). If we don't expect async calls in onFinish we can also keep the current code and just switch the execution order of onFinish and doSaveState.

logger.error("Indexer failed on finish", e);
doSaveState(finishAndSetState(), position.get(), () -> onFailure(e));
}

return;
}

Expand All @@ -337,6 +352,8 @@ private void onSearchResponse(SearchResponse searchResponse) {
logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
}
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);

// check if indexer has been asked to stop, state {@link IndexerState#STOPPING}
if (checkState(getState()) == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ protected SearchRequest buildSearchRequest() {
}

@Override
protected void onStartJob(long now) {
protected void onStart(long now, Runnable next) {
assertThat(step, equalTo(0));
++step;
next.run();
}

@Override
Expand All @@ -98,7 +99,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next

@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(4));
assertThat(step, equalTo(5));
++step;
next.run();
}
Expand All @@ -109,10 +110,11 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {
assertThat(step, equalTo(5));
protected void onFinish(Runnable finishAndSetState) {
assertThat(step, equalTo(4));
++step;
isFinished.set(true);
finishAndSetState.run();
}

@Override
Expand Down Expand Up @@ -153,9 +155,10 @@ protected SearchRequest buildSearchRequest() {
}

@Override
protected void onStartJob(long now) {
protected void onStart(long now, Runnable next) {
assertThat(step, equalTo(0));
++step;
next.run();
}

@Override
Expand Down Expand Up @@ -183,7 +186,7 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {
protected void onFinish(Runnable finishAndSetState) {
fail("should not be called");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ public DataFrameIndexer(Executor executor,
protected abstract Map<String, String> getFieldMappings();

@Override
protected void onStartJob(long now) {
protected void onStart(long now, Runnable next) {
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();

pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
next.run();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
Expand Down Expand Up @@ -478,9 +478,10 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {
protected void onFinish(Runnable finishAndSetState) {
auditor.info(transform.getId(), "Finished indexing for data frame transform");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
finishAndSetState.run();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,21 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
*/
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
AtomicBoolean upgradedDocumentID) {
super(executor, initialState, initialPosition, new RollupIndexerJobStats());
this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
}

/**
* Constructor.
* @param executor Executor to use to fire the first request of a background job.
* @param job The rollup job
* @param initialState Initial state for the indexer
* @param initialPosition The last indexed bucket of the task
* @param upgradedDocumentID whether job has updated IDs (for BWC)
* @param jobStats jobstats instance for collecting stats
*/
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) {
super(executor, initialState, initialPosition, jobStats);
this.job = job;
this.compositeBuilder = createCompositeBuilder(job.getConfig());
this.upgradedDocumentID = upgradedDocumentID;
Expand All @@ -94,7 +108,7 @@ protected String getJobId() {
}

@Override
protected void onStartJob(long now) {
protected void onStart(long now, Runnable next) {
// this is needed to exclude buckets that can still receive new documents.
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
long rounded = dateHisto.createRounding().round(now);
Expand All @@ -104,6 +118,7 @@ protected void onStartJob(long now) {
} else {
maxBoundary = rounded;
}
next.run();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
}

@Override
protected void onFinish() {
protected void onFinish(Runnable finishAndSetState) {
logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]");
finishAndSetState.run();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,9 @@ class SyncRollupIndexer extends RollupIndexer {
}

@Override
protected void onFinish() {
protected void onFinish(Runnable finishAndSetState) {
latch.countDown();
finishAndSetState.run();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.mockito.stubbing.Answer;
Expand All @@ -44,11 +44,18 @@

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;


public class RollupIndexerStateTests extends ESTestCase {
private static class EmptyRollupIndexer extends RollupIndexer {
EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, boolean upgraded, RollupIndexerJobStats stats) {
super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats);
}

EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, boolean upgraded) {
super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded));
Expand Down Expand Up @@ -124,7 +131,9 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {}
protected void onFinish(Runnable finishAndSetState) {
finishAndSetState.run();
}
}

private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer {
Expand All @@ -140,6 +149,11 @@ private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer {
super(executor, job, initialState, initialPosition, randomBoolean());
}

DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, RollupIndexerJobStats stats) {
super(executor, job, initialState, initialPosition, randomBoolean(), stats);
}

private CountDownLatch newLatch() {
return latch = new CountDownLatch(1);
}
Expand Down Expand Up @@ -214,7 +228,9 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {}
protected void onFinish(Runnable finishAndSetState) {
finishAndSetState.run();
}
}

public void testStarted() throws Exception {
Expand Down Expand Up @@ -248,8 +264,8 @@ public void testIndexing() throws Exception {
AtomicBoolean isFinished = new AtomicBoolean(false);
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
@Override
protected void onFinish() {
super.onFinish();
protected void onFinish(Runnable finishAndSetState) {
super.onFinish(finishAndSetState);
isFinished.set(true);
}
};
Expand All @@ -274,23 +290,29 @@ protected void onFinish() {

public void testStateChangeMidTrigger() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);

RollupIndexerJobStats stats = new RollupIndexerJobStats();
RollupIndexerJobStats spyStats = spy(stats);
RollupJobConfig config = mock(RollupJobConfig.class);

// We pull the config before a final state check, so this allows us to flip the state
// We call stats before a final state check, so this allows us to flip the state
// and make sure the appropriate error is thrown
when(config.getGroupConfig()).then((Answer<GroupConfig>) invocationOnMock -> {
Answer<?> forwardAndChangeState = invocation -> {
invocation.callRealMethod();
state.set(IndexerState.STOPPED);
return ConfigTestHelpers.randomGroupConfig(random());
});
return null;
};

doAnswer(forwardAndChangeState).when(spyStats).incrementNumInvocations(1L);
RollupJob job = new RollupJob(config, Collections.emptyMap());

final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicBoolean isFinished = new AtomicBoolean(false);
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) {
@Override
protected void onFinish() {
super.onFinish();
protected void onFinish(Runnable finishAndSetState) {
super.onFinish(finishAndSetState);
isFinished.set(true);
}
};
Expand Down Expand Up @@ -318,7 +340,7 @@ public void testAbortDuringSearch() throws Exception {
try {
EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) {
@Override
protected void onFinish() {
protected void onFinish(Runnable finishAndSetState) {
fail("Should not have called onFinish");
}

Expand Down