@@ -22,7 +22,7 @@
/**
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
* Only one background job can run simultaneously and {@link #onFinish()} is called when the job
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
* aborted while a job is running. The indexer must be started ({@link #start()}) to allow a background job to run when
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
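Note: the lifecycle this javadoc describes is easiest to see as a small state machine. The sketch below is a self-contained simplification, not the real class: the state names mirror IndexerState, but the job runs synchronously here, whereas the real indexer hands it off to an executor.

import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the documented lifecycle (illustration only).
class IndexerLifecycleSketch {
    enum State { STOPPED, STARTED, INDEXING, STOPPING, ABORTING }

    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);

    // start() only makes a job possible; it does not run one.
    State start() {
        state.compareAndSet(State.STOPPED, State.STARTED);
        return state.get();
    }

    // Only one job at a time: the STARTED -> INDEXING CAS acts as the lock.
    boolean maybeTriggerAsyncJob() {
        if (state.compareAndSet(State.STARTED, State.INDEXING) == false) {
            return false; // a job is already running, or the indexer is not started
        }
        try {
            runJob();     // asynchronous in the real class
            onFinish();
        } catch (Exception e) {
            onFailure(e);
        } finally {
            state.compareAndSet(State.INDEXING, State.STARTED);
        }
        return true;
    }

    protected void runJob() {}
    protected void onFinish() {}
    protected void onFailure(Exception e) {}
}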
@@ -85,13 +85,10 @@ public synchronized IndexerState start() {

/**
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background and in such case {@link #onFinish()} will be called
* as soon as the background job detects that the indexer is stopped. If there
* is no job running when this function is called, the state is directly set to
* {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
* running in the background. If there is no job running when this function is
* called, the state is directly set to {@link IndexerState#STOPPED}.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the
* job was already aborted).
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
IndexerState currentState = state.updateAndGet(previousState -> {
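Note: the rest of the updateAndGet body is collapsed in this view. A minimal sketch of the transition the new javadoc describes, assuming the same IndexerState values (the hidden branches for other states may differ in detail):

// Atomic transition performed by stop() (sketch):
IndexerState newState = state.updateAndGet(previousState -> {
    if (previousState == IndexerState.INDEXING) {
        return IndexerState.STOPPING;  // a job is running; it will observe this and wind down
    }
    if (previousState == IndexerState.STARTED) {
        return IndexerState.STOPPED;   // nothing is running; stop immediately
    }
    return previousState;              // STOPPED, STOPPING, ABORTING stay as they are
});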
@@ -148,17 +145,17 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
case STARTED:
logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
stats.incrementNumInvocations(1);
onStartJob(now);

if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
try {
onStart(now, ActionListener.wrap(r -> {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} catch (Exception e) {
finishWithSearchFailure(e);
}
}, e -> {
finishAndSetState();
onFailure(e);
}));
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
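Note: ActionListener.wrap(onResponse, onFailure), used above, is the stock Elasticsearch helper that builds a listener from two lambdas; an exception thrown inside the response consumer is forwarded to the failure consumer rather than propagating to the caller. A minimal usage sketch:

// Requires org.elasticsearch.action.ActionListener.
ActionListener<Void> listener = ActionListener.wrap(
    r -> { throw new IllegalStateException("boom"); },
    e -> System.err.println("handled: " + e.getMessage()));
listener.onResponse(null); // prints "handled: boom" instead of throwing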
@@ -200,8 +197,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
* internal state is {@link IndexerState#STARTED}.
*
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
* @param listener listener to call when done
*/
protected abstract void onStartJob(long now);
protected abstract void onStart(long now, ActionListener<Void> listener);

/**
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
@@ -248,9 +246,12 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
protected abstract void onFailure(Exception exc);

/**
* Called when a background job finishes.
* Called when a background job finishes, before the internal state changes from {@link IndexerState#INDEXING} back to
* {@link IndexerState#STARTED}.
*
* @param listener listener to call when done
*/
protected abstract void onFinish();
protected abstract void onFinish(ActionListener<Void> listener);
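Note: with the listener-based signature, implementations report completion explicitly instead of simply returning. A hedged sketch of the expected shape (the finishing work itself is hypothetical; the try/catch-into-listener pattern matches the data frame and rollup implementations later in this diff):

@Override
protected void onFinish(ActionListener<Void> listener) {
    try {
        // ... synchronous finishing work would go here (hypothetical) ...
        listener.onResponse(null); // success: the final state is persisted and the job ends
    } catch (Exception e) {
        listener.onFailure(e);     // failure: the state is still persisted, then onFailure runs
    }
}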

/**
* Called when a background job detects that the indexer is aborted causing the
@@ -315,10 +316,11 @@ private void onSearchResponse(SearchResponse searchResponse) {
if (iterationResult.isDone()) {
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");

// Change state first, then try to persist. This prevents in-progress
// STOPPING/ABORTING from
// being persisted as STARTED but then stop the job
doSaveState(finishAndSetState(), position.get(), this::onFinish);
// execute finishing tasks
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
e -> doSaveState(finishAndSetState(), position.get(), () -> onFailure(e))));
A contributor commented on the lines above: "since we already called onFinish I don't think we should call onFailure if saving the state throws an error. It's not ideal but I'd prefer that we ignore the error and just finish the job."


return;
}

@@ -337,6 +339,8 @@ private void onSearchResponse(SearchResponse searchResponse) {
logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
}
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);

// check whether the indexer has been asked to stop (state {@link IndexerState#STOPPING})
if (checkState(getState()) == false) {
return;
}
@@ -76,9 +76,10 @@ protected SearchRequest buildSearchRequest() {
}

@Override
protected void onStartJob(long now) {
protected void onStart(long now, ActionListener<Void> listener) {
assertThat(step, equalTo(0));
++step;
listener.onResponse(null);
}

@Override
@@ -98,7 +99,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next

@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(4));
assertThat(step, equalTo(5));
++step;
next.run();
}
@@ -109,10 +110,11 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {
assertThat(step, equalTo(5));
protected void onFinish(ActionListener<Void> listener) {
assertThat(step, equalTo(4));
++step;
isFinished.set(true);
listener.onResponse(null);
}

@Override
@@ -153,9 +155,10 @@ protected SearchRequest buildSearchRequest() {
}

@Override
protected void onStartJob(long now) {
protected void onStart(long now, ActionListener<Void> listener) {
assertThat(step, equalTo(0));
++step;
listener.onResponse(null);
}

@Override
Expand Down Expand Up @@ -183,7 +186,7 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
fail("should not be called");
}

@@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -51,10 +52,14 @@ public DataFrameIndexer(Executor executor,
protected abstract Map<String, String> getFieldMappings();

@Override
protected void onStartJob(long now) {
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();

pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
protected void onStart(long now, ActionListener<Void> listener) {
try {
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
@@ -27,12 +27,12 @@
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
@@ -478,9 +478,14 @@ protected void onFailure(Exception exc) {
}

@Override
protected void onFinish() {
auditor.info(transform.getId(), "Finished indexing for data frame transform");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
protected void onFinish(ActionListener<Void> listener) {
try {
auditor.info(transform.getId(), "Finished indexing for data frame transform");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.rollup.job;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
@@ -75,7 +76,21 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
*/
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
AtomicBoolean upgradedDocumentID) {
super(executor, initialState, initialPosition, new RollupIndexerJobStats());
this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
}

/**
* Ctor
* @param executor Executor to use to fire the first request of a background job.
* @param job The rollup job
* @param initialState Initial state for the indexer
* @param initialPosition The last indexed bucket of the task
* @param upgradedDocumentID whether the job has upgraded document IDs (for BWC)
* @param jobStats job stats instance for collecting stats
*/
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) {
super(executor, initialState, initialPosition, jobStats);
this.job = job;
this.compositeBuilder = createCompositeBuilder(job.getConfig());
this.upgradedDocumentID = upgradedDocumentID;
@@ -94,15 +109,20 @@ protected String getJobId() {
}

@Override
protected void onStartJob(long now) {
// this is needed to exclude buckets that can still receive new documents.
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
long rounded = dateHisto.createRounding().round(now);
if (dateHisto.getDelay() != null) {
// if the job has a delay we filter all documents that appear before it.
maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
} else {
maxBoundary = rounded;
protected void onStart(long now, ActionListener<Void> listener) {
try {
// this is needed to exclude buckets that can still receive new documents.
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
long rounded = dateHisto.createRounding().round(now);
if (dateHisto.getDelay() != null) {
// if the job has a delay we filter all documents that appear before it.
maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
} else {
maxBoundary = rounded;
}
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}
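Note: the boundary arithmetic above is worth seeing without the config plumbing. A self-contained sketch assuming a fixed interval and delay in milliseconds (the real code goes through DateHistogramGroupConfig and Rounding, which also handle calendar intervals and time zones):

// Round "now" down to the start of its bucket, then pull the boundary back by
// the configured delay so buckets that may still receive documents are excluded.
static long maxBoundary(long nowMillis, long intervalMillis, Long delayMillis) {
    long rounded = (nowMillis / intervalMillis) * intervalMillis;
    return delayMillis == null ? rounded : rounded - delayMillis;
}
// e.g. now = 10:07 with hourly buckets rounds down to 10:00; a 15-minute delay
// pulls the boundary back to 09:45.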

@@ -138,8 +138,9 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
}

@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]");
listener.onResponse(null);
}

@Override
@@ -581,8 +581,9 @@ class SyncRollupIndexer extends RollupIndexer {
}

@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
latch.countDown();
listener.onResponse(null);
}

@Override