diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 5fde9a1cac60e..8fd170520d34d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -8,13 +8,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.Nullable; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -25,6 +23,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; +import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -50,31 +49,46 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer fieldMappings; + private Pivot pivot; private int pageSize = 0; public DataFrameIndexer(Executor executor, DataFrameAuditor auditor, + DataFrameTransformConfig transformConfig, + Map fieldMappings, AtomicReference initialState, Map initialPosition, - DataFrameIndexerTransformStats jobStats) { + DataFrameIndexerTransformStats jobStats, + DataFrameTransformProgress transformProgress) { super(executor, initialState, initialPosition, jobStats); this.auditor = Objects.requireNonNull(auditor); + this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); + this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); + this.progress = transformProgress; } - protected abstract DataFrameTransformConfig getConfig(); - - protected abstract Map getFieldMappings(); - - @Nullable - protected abstract DataFrameTransformProgress getProgress(); - protected abstract void failIndexer(String message); public int getPageSize() { return pageSize; } + public DataFrameTransformConfig getConfig() { + return transformConfig; + } + + public Map getFieldMappings() { + return fieldMappings; + } + + public DataFrameTransformProgress getProgress() { + return progress; + } + /** * Request a checkpoint */ @@ -119,8 +133,8 @@ protected IterationResult> doProcess(SearchResponse searchRe IterationResult> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), agg.afterKey(), agg.getBuckets().isEmpty()); - if (getProgress() != null) { - getProgress().docsProcessed(getStats().getNumDocuments() - docsBeforeProcess); + if (progress != null) { + progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess); } return result; } @@ -215,14 +229,14 @@ protected boolean handleCircuitBreakingException(Exception e) { */ private static CircuitBreakingException getCircuitBreakingException(Exception e) { // circuit breaking exceptions are at the bottom - Throwable unwrappedThrowable = ExceptionsHelper.unwrapCause(e); + Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e); if (unwrappedThrowable instanceof CircuitBreakingException) { return (CircuitBreakingException) unwrappedThrowable; } else if (unwrappedThrowable instanceof SearchPhaseExecutionException) { SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e; for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) { - Throwable unwrappedShardFailure = ExceptionsHelper.unwrapCause(shardFailure.getCause()); + Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause()); if (unwrappedShardFailure instanceof CircuitBreakingException) { return (CircuitBreakingException) unwrappedShardFailure; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 15a555da48859..2020300a0cf77 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -444,11 +444,7 @@ static class ClientDataFrameIndexer extends DataFrameIndexer { private final DataFrameTransformsConfigManager transformsConfigManager; private final DataFrameTransformsCheckpointService transformsCheckpointService; private final String transformId; - private final DataFrameAuditor auditor; private final DataFrameTransformTask transformTask; - private final Map fieldMappings; - private final DataFrameTransformConfig transformConfig; - private volatile DataFrameTransformProgress progress; private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null; private final AtomicInteger failureCount; // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index @@ -470,19 +466,18 @@ static class ClientDataFrameIndexer extends DataFrameIndexer { .threadPool .executor(ThreadPool.Names.GENERIC), ExceptionsHelper.requireNonNull(auditor, "auditor"), + transformConfig, + fieldMappings, ExceptionsHelper.requireNonNull(initialState, "initialState"), initialPosition, - initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats); + initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats, + transformProgress); this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId"); this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService, "transformsCheckpointService"); this.client = ExceptionsHelper.requireNonNull(client, "client"); - this.auditor = auditor; - this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); this.transformTask = parentTask; - this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); - this.progress = transformProgress; this.failureCount = new AtomicInteger(0); } @@ -510,21 +505,6 @@ protected void onStart(long now, ActionListener listener) { } } - @Override - protected DataFrameTransformConfig getConfig() { - return transformConfig; - } - - @Override - protected Map getFieldMappings() { - return fieldMappings; - } - - @Override - protected DataFrameTransformProgress getProgress() { - return progress; - } - @Override protected String getJobId() { return transformId; diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index 015eb4b65e3b0..f3f3255f07a6d 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; @@ -32,7 +31,6 @@ import java.util.Collections; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -48,13 +46,9 @@ public class DataFrameIndexerTests extends ESTestCase { private Client client; - private static final String TEST_ORIGIN = "test_origin"; - private static final String TEST_INDEX = "test_index"; class MockedDataFrameIndexer extends DataFrameIndexer { - private final DataFrameTransformConfig transformConfig; - private final Map fieldMappings; private final Function searchFunction; private final Function bulkFunction; private final Consumer failureConsumer; @@ -73,9 +67,8 @@ class MockedDataFrameIndexer extends DataFrameIndexer { Function searchFunction, Function bulkFunction, Consumer failureConsumer) { - super(executor, auditor, initialState, initialPosition, jobStats); - this.transformConfig = Objects.requireNonNull(transformConfig); - this.fieldMappings = Objects.requireNonNull(fieldMappings); + super(executor, auditor, transformConfig, fieldMappings, initialState, initialPosition, jobStats, + /* DataFrameTransformProgress */ null); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.failureConsumer = failureConsumer; @@ -85,21 +78,6 @@ public CountDownLatch newLatch(int count) { return latch = new CountDownLatch(count); } - @Override - protected DataFrameTransformConfig getConfig() { - return transformConfig; - } - - @Override - protected Map getFieldMappings() { - return fieldMappings; - } - - @Override - protected DataFrameTransformProgress getProgress() { - return null; - } - @Override protected void createCheckpoint(ActionListener listener) { listener.onResponse(null);