Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -25,6 +23,7 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
Expand All @@ -50,31 +49,46 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,

protected final DataFrameAuditor auditor;

protected final DataFrameTransformConfig transformConfig;
protected volatile DataFrameTransformProgress progress;
private final Map<String, String> fieldMappings;

private Pivot pivot;
private int pageSize = 0;

public DataFrameIndexer(Executor executor,
DataFrameAuditor auditor,
DataFrameTransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerTransformStats jobStats) {
DataFrameIndexerTransformStats jobStats,
DataFrameTransformProgress transformProgress) {
super(executor, initialState, initialPosition, jobStats);
this.auditor = Objects.requireNonNull(auditor);
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
}

protected abstract DataFrameTransformConfig getConfig();

protected abstract Map<String, String> getFieldMappings();

@Nullable
protected abstract DataFrameTransformProgress getProgress();

protected abstract void failIndexer(String message);

public int getPageSize() {
return pageSize;
}

public DataFrameTransformConfig getConfig() {
return transformConfig;
}

public Map<String, String> getFieldMappings() {
return fieldMappings;
}

public DataFrameTransformProgress getProgress() {
return progress;
}

/**
* Request a checkpoint
*/
Expand Down Expand Up @@ -119,8 +133,8 @@ protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchRe
IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
agg.afterKey(),
agg.getBuckets().isEmpty());
if (getProgress() != null) {
getProgress().docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
if (progress != null) {
progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
}
return result;
}
Expand Down Expand Up @@ -215,14 +229,14 @@ protected boolean handleCircuitBreakingException(Exception e) {
*/
private static CircuitBreakingException getCircuitBreakingException(Exception e) {
// circuit breaking exceptions are at the bottom
Throwable unwrappedThrowable = ExceptionsHelper.unwrapCause(e);
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e);

if (unwrappedThrowable instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedThrowable;
} else if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e;
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
Throwable unwrappedShardFailure = ExceptionsHelper.unwrapCause(shardFailure.getCause());
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());

if (unwrappedShardFailure instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedShardFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,7 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final DataFrameAuditor auditor;
private final DataFrameTransformTask transformTask;
private final Map<String, String> fieldMappings;
private final DataFrameTransformConfig transformConfig;
private volatile DataFrameTransformProgress progress;
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
private final AtomicInteger failureCount;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
Expand All @@ -470,19 +466,18 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
.threadPool
.executor(ThreadPool.Names.GENERIC),
ExceptionsHelper.requireNonNull(auditor, "auditor"),
transformConfig,
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
initialPosition,
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats);
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats,
transformProgress);
this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId");
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService,
"transformsCheckpointService");
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.auditor = auditor;
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.transformTask = parentTask;
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.failureCount = new AtomicInteger(0);
}

Expand Down Expand Up @@ -510,21 +505,6 @@ protected void onStart(long now, ActionListener<Void> listener) {
}
}

@Override
protected DataFrameTransformConfig getConfig() {
return transformConfig;
}

@Override
protected Map<String, String> getFieldMappings() {
return fieldMappings;
}

@Override
protected DataFrameTransformProgress getProgress() {
return progress;
}

@Override
protected String getJobId() {
return transformId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import org.junit.Before;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand All @@ -48,13 +46,9 @@
public class DataFrameIndexerTests extends ESTestCase {

private Client client;
private static final String TEST_ORIGIN = "test_origin";
private static final String TEST_INDEX = "test_index";

class MockedDataFrameIndexer extends DataFrameIndexer {

private final DataFrameTransformConfig transformConfig;
private final Map<String, String> fieldMappings;
private final Function<SearchRequest, SearchResponse> searchFunction;
private final Function<BulkRequest, BulkResponse> bulkFunction;
private final Consumer<Exception> failureConsumer;
Expand All @@ -73,9 +67,8 @@ class MockedDataFrameIndexer extends DataFrameIndexer {
Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction,
Consumer<Exception> failureConsumer) {
super(executor, auditor, initialState, initialPosition, jobStats);
this.transformConfig = Objects.requireNonNull(transformConfig);
this.fieldMappings = Objects.requireNonNull(fieldMappings);
super(executor, auditor, transformConfig, fieldMappings, initialState, initialPosition, jobStats,
/* DataFrameTransformProgress */ null);
this.searchFunction = searchFunction;
this.bulkFunction = bulkFunction;
this.failureConsumer = failureConsumer;
Expand All @@ -85,21 +78,6 @@ public CountDownLatch newLatch(int count) {
return latch = new CountDownLatch(count);
}

@Override
protected DataFrameTransformConfig getConfig() {
return transformConfig;
}

@Override
protected Map<String, String> getFieldMappings() {
return fieldMappings;
}

@Override
protected DataFrameTransformProgress getProgress() {
return null;
}

@Override
protected void createCheckpoint(ActionListener<Void> listener) {
listener.onResponse(null);
Expand Down