diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index f3d081d3cb792..413b07b7b7d15 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -142,71 +142,72 @@ protected void doNextBulk(BulkRequest request, ActionListener next client, BulkAction.INSTANCE, request, - ActionListener.wrap(bulkResponse -> { - if (bulkResponse.hasFailures()) { - int failureCount = 0; - // dedup the failures by the type of the exception, as they most likely have the same cause - Map deduplicatedFailures = new LinkedHashMap<>(); - - for (BulkItemResponse item : bulkResponse.getItems()) { - if (item.isFailed()) { - deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item); - failureCount++; - } - } - - // note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)} - - // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure - // Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments - // the indexing failure counter - // and possibly retries) - Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses( - deduplicatedFailures.values() - ); - if (irrecoverableException == null) { - String failureMessage = getBulkIndexDetailedFailureMessage("Significant failures: ", deduplicatedFailures); - logger.debug("[{}] Bulk index experienced [{}] failures. 
{}", getJobId(), failureCount, failureMessage); - - Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause(); - nextPhase.onFailure( - new BulkIndexingException( - "Bulk index experienced [{}] failures. {}", - firstException, - false, - failureCount, - failureMessage - ) - ); - } else { - deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName()); - String failureMessage = getBulkIndexDetailedFailureMessage("Other failures: ", deduplicatedFailures); - irrecoverableException = decorateBulkIndexException(irrecoverableException); + ActionListener.wrap(bulkResponse -> handleBulkResponse(bulkResponse, nextPhase), nextPhase::onFailure) + ); + } - logger.debug( - "[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. {}", - getJobId(), - failureCount, - ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), - failureMessage - ); + protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener<BulkResponse> nextPhase) { + if (bulkResponse.hasFailures() == false) { + // We don't know the type of failures that have occurred (searching, processing, indexing, etc.), + // but if we search, process and bulk index then we have + // successfully processed an entire page of the transform and should reset the counter, even if we are in the middle + // of a checkpoint + context.resetReasonAndFailureCounter(); + nextPhase.onResponse(bulkResponse); + return; + } + int failureCount = 0; + // dedup the failures by the type of the exception, as they most likely have the same cause + Map<String, BulkItemResponse> deduplicatedFailures = new LinkedHashMap<>(); + + for (BulkItemResponse item : bulkResponse.getItems()) { + if (item.isFailed()) { + deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item); + failureCount++; + } + } - nextPhase.onFailure( - new BulkIndexingException( - "Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. 
{}", - irrecoverableException, - true, - failureCount, - ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), - failureMessage - ) - ); - } - } else { - nextPhase.onResponse(bulkResponse); - } - }, nextPhase::onFailure) + // note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)} + + // This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure + // Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments + // the indexing failure counter + // and possibly retries) + Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses( + deduplicatedFailures.values() ); + if (irrecoverableException == null) { + String failureMessage = getBulkIndexDetailedFailureMessage("Significant failures: ", deduplicatedFailures); + logger.debug("[{}] Bulk index experienced [{}] failures. {}", getJobId(), failureCount, failureMessage); + + Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause(); + nextPhase.onFailure( + new BulkIndexingException("Bulk index experienced [{}] failures. {}", firstException, false, failureCount, failureMessage) + ); + } else { + deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName()); + String failureMessage = getBulkIndexDetailedFailureMessage("Other failures: ", deduplicatedFailures); + irrecoverableException = decorateBulkIndexException(irrecoverableException); + + logger.debug( + "[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. {}", + getJobId(), + failureCount, + ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), + failureMessage + ); + + nextPhase.onFailure( + new BulkIndexingException( + "Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. 
{}", + irrecoverableException, + true, + failureCount, + ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException), + failureMessage + ) + ); + } } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index ba5cbf943109b..798950f57bada 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -146,7 +146,7 @@ public TransformIndexer( this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); this.auditor = transformServices.getAuditor(); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); - this.progress = progress != null ? progress : new TransformProgress(); + this.progress = transformProgress != null ? 
transformProgress : new TransformProgress(); this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint"); this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint"); this.context = ExceptionsHelper.requireNonNull(context, "context"); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index d8225e93a428a..ec19616095ee3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; +import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.junit.After; import org.junit.Before; @@ -64,6 +65,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -91,7 +93,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { private Client client; private ThreadPool threadPool; - class MockedTransformIndexer extends TransformIndexer { + static class MockedTransformIndexer extends ClientTransformIndexer { private final Function searchFunction; private final Function bulkFunction; @@ -127,14 +129,17 @@ class MockedTransformIndexer extends TransformIndexer { 
mock(SchedulerEngine.class) ), checkpointProvider, - transformConfig, initialState, initialPosition, + mock(Client.class), jobStats, + transformConfig, /* TransformProgress */ null, TransformCheckpoint.EMPTY, TransformCheckpoint.EMPTY, - context + new SeqNoPrimaryTermAndIndex(1, 1, "foo"), + context, + false ); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; @@ -189,7 +194,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next try { BulkResponse response = bulkFunction.apply(request); - nextPhase.onResponse(response); + super.handleBulkResponse(response, nextPhase); } catch (Exception e) { nextPhase.onFailure(e); } @@ -254,7 +259,7 @@ void doGetInitialProgress(SearchRequest request, ActionListener } @Override - void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener responseListener) { + protected void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener responseListener) { try { BulkByScrollResponse response = deleteByQueryFunction.apply(deleteByQueryRequest); responseListener.onResponse(response); @@ -264,7 +269,7 @@ void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener responseListener) { + protected void refreshDestinationIndex(ActionListener responseListener) { responseListener.onResponse(new RefreshResponse(1, 1, 0, Collections.emptyList())); } @@ -274,7 +279,7 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene } @Override - void persistState(TransformState state, ActionListener listener) { + protected void persistState(TransformState state, ActionListener listener) { listener.onResponse(null); } } @@ -711,6 +716,116 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce assertEquals(1, context.getFailureCount()); } + public void testFailureCounterIsResetOnSuccess() throws Exception { + String transformId = randomAlphaOfLength(10); + TransformConfig config = new TransformConfig( + transformId, + 
randomSourceConfig(), + randomDestConfig(), + null, + null, + null, + randomPivotConfig(), + null, + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + null, + null, + null, + null + ); + + final SearchResponse searchResponse = new SearchResponse( + new InternalSearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + new SearchProfileShardResults(Collections.emptyMap()), + false, + false, + 1 + ), + "", + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = new Function<>() { + final AtomicInteger calls = new AtomicInteger(0); + + @Override + public SearchResponse apply(SearchRequest searchRequest) { + int call = calls.getAndIncrement(); + if (call == 0) { + throw new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) } + ); + } + return searchResponse; + } + }; + + Function bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1); + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + Consumer failureConsumer = message -> { + failIndexerCalled.compareAndSet(false, true); + failureMessage.compareAndSet(null, message); + }; + + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + TransformContext.Listener contextListener = mock(TransformContext.Listener.class); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + null, + failureConsumer, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context + ); + 
+ final CountDownLatch latch = indexer.newLatch(1); + + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertFalse(failIndexerCalled.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertEquals(1, context.getFailureCount()); + + final CountDownLatch secondLatch = indexer.newLatch(1); + + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + + secondLatch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertFalse(failIndexerCalled.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + auditor.assertAllExpectationsMatched(); + assertEquals(0, context.getFailureCount()); + } + private MockedTransformIndexer createMockIndexer( TransformConfig config, AtomicReference state,