Skip to content

Commit dfdf9fb

Browse files
committed
[ML][Transform] reset failure count when a transform aggregation page is handled successfully (elastic#76355)
Failure count should not only be reset at checkpoints. Checkpoints could have many pages of data. Consequently, we should reset the failure count once we handle a single composite aggregation page. This way, the transform won't mark itself as failed erroneously when it has actually succeeded searches + indexing results within the same checkpoint. closes elastic#76074
1 parent 3fb3fb3 commit dfdf9fb

File tree

2 files changed

+184
-67
lines changed

2 files changed

+184
-67
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 63 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -133,71 +133,72 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
133133
client,
134134
BulkAction.INSTANCE,
135135
request,
136-
ActionListener.wrap(bulkResponse -> {
137-
if (bulkResponse.hasFailures()) {
138-
int failureCount = 0;
139-
// dedup the failures by the type of the exception, as they most likely have the same cause
140-
Map<String, BulkItemResponse> deduplicatedFailures = new LinkedHashMap<>();
141-
142-
for (BulkItemResponse item : bulkResponse.getItems()) {
143-
if (item.isFailed()) {
144-
deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item);
145-
failureCount++;
146-
}
147-
}
148-
149-
// note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)}
150-
151-
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
152-
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
153-
// the indexing failure counter
154-
// and possibly retries)
155-
Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
156-
deduplicatedFailures.values()
157-
);
158-
if (irrecoverableException == null) {
159-
String failureMessage = getBulkIndexDetailedFailureMessage("Significant failures: ", deduplicatedFailures);
160-
logger.debug("[{}] Bulk index experienced [{}] failures. {}", getJobId(), failureCount, failureMessage);
161-
162-
Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause();
163-
nextPhase.onFailure(
164-
new BulkIndexingException(
165-
"Bulk index experienced [{}] failures. {}",
166-
firstException,
167-
false,
168-
failureCount,
169-
failureMessage
170-
)
171-
);
172-
} else {
173-
deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName());
174-
String failureMessage = getBulkIndexDetailedFailureMessage("Other failures: ", deduplicatedFailures);
175-
irrecoverableException = decorateBulkIndexException(irrecoverableException);
136+
ActionListener.wrap(bulkResponse -> handleBulkResponse(bulkResponse, nextPhase), nextPhase::onFailure)
137+
);
138+
}
176139

177-
logger.debug(
178-
"[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. {}",
179-
getJobId(),
180-
failureCount,
181-
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
182-
failureMessage
183-
);
140+
protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener<BulkResponse> nextPhase) {
141+
if (bulkResponse.hasFailures() == false) {
142+
// We don't know the of failures that have occurred (searching, processing, indexing, etc.),
143+
// but if we search, process and bulk index then we have
144+
// successfully processed an entire page of the transform and should reset the counter, even if we are in the middle
145+
// of a checkpoint
146+
context.resetReasonAndFailureCounter();
147+
nextPhase.onResponse(bulkResponse);
148+
return;
149+
}
150+
int failureCount = 0;
151+
// dedup the failures by the type of the exception, as they most likely have the same cause
152+
Map<String, BulkItemResponse> deduplicatedFailures = new LinkedHashMap<>();
153+
154+
for (BulkItemResponse item : bulkResponse.getItems()) {
155+
if (item.isFailed()) {
156+
deduplicatedFailures.putIfAbsent(item.getFailure().getCause().getClass().getSimpleName(), item);
157+
failureCount++;
158+
}
159+
}
184160

185-
nextPhase.onFailure(
186-
new BulkIndexingException(
187-
"Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. {}",
188-
irrecoverableException,
189-
true,
190-
failureCount,
191-
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
192-
failureMessage
193-
)
194-
);
195-
}
196-
} else {
197-
nextPhase.onResponse(bulkResponse);
198-
}
199-
}, nextPhase::onFailure)
161+
// note: bulk failures are audited/logged in {@link TransformIndexer#handleFailure(Exception)}
162+
163+
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
164+
// Determine whether the failure is irrecoverable (transform should go into failed state) or not (transform increments
165+
// the indexing failure counter
166+
// and possibly retries)
167+
Throwable irrecoverableException = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(
168+
deduplicatedFailures.values()
200169
);
170+
if (irrecoverableException == null) {
171+
String failureMessage = getBulkIndexDetailedFailureMessage("Significant failures: ", deduplicatedFailures);
172+
logger.debug("[{}] Bulk index experienced [{}] failures. {}", getJobId(), failureCount, failureMessage);
173+
174+
Exception firstException = deduplicatedFailures.values().iterator().next().getFailure().getCause();
175+
nextPhase.onFailure(
176+
new BulkIndexingException("Bulk index experienced [{}] failures. {}", firstException, false, failureCount, failureMessage)
177+
);
178+
} else {
179+
deduplicatedFailures.remove(irrecoverableException.getClass().getSimpleName());
180+
String failureMessage = getBulkIndexDetailedFailureMessage("Other failures: ", deduplicatedFailures);
181+
irrecoverableException = decorateBulkIndexException(irrecoverableException);
182+
183+
logger.debug(
184+
"[{}] Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. {}",
185+
getJobId(),
186+
failureCount,
187+
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
188+
failureMessage
189+
);
190+
191+
nextPhase.onFailure(
192+
new BulkIndexingException(
193+
"Bulk index experienced [{}] failures and at least 1 irrecoverable [{}]. {}",
194+
irrecoverableException,
195+
true,
196+
failureCount,
197+
ExceptionRootCauseFinder.getDetailedMessage(irrecoverableException),
198+
failureMessage
199+
)
200+
);
201+
}
201202
}
202203

203204
@Override

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

Lines changed: 121 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
5151
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
5252
import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
53+
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
5354
import org.junit.After;
5455
import org.junit.Before;
5556

@@ -60,6 +61,7 @@
6061
import java.util.concurrent.CountDownLatch;
6162
import java.util.concurrent.TimeUnit;
6263
import java.util.concurrent.atomic.AtomicBoolean;
64+
import java.util.concurrent.atomic.AtomicInteger;
6365
import java.util.concurrent.atomic.AtomicReference;
6466
import java.util.function.Consumer;
6567
import java.util.function.Function;
@@ -86,7 +88,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
8688
private Client client;
8789
private ThreadPool threadPool;
8890

89-
class MockedTransformIndexer extends TransformIndexer {
91+
static class MockedTransformIndexer extends ClientTransformIndexer {
9092

9193
private final Function<SearchRequest, SearchResponse> searchFunction;
9294
private final Function<BulkRequest, BulkResponse> bulkFunction;
@@ -121,11 +123,15 @@ class MockedTransformIndexer extends TransformIndexer {
121123
transformConfig,
122124
initialState,
123125
initialPosition,
126+
mock(Client.class),
124127
jobStats,
128+
transformConfig,
125129
/* TransformProgress */ null,
126130
TransformCheckpoint.EMPTY,
127131
TransformCheckpoint.EMPTY,
128-
context
132+
new SeqNoPrimaryTermAndIndex(1, 1, "foo"),
133+
context,
134+
false
129135
);
130136
this.searchFunction = searchFunction;
131137
this.bulkFunction = bulkFunction;
@@ -180,7 +186,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
180186

181187
try {
182188
BulkResponse response = bulkFunction.apply(request);
183-
nextPhase.onResponse(response);
189+
super.handleBulkResponse(response, nextPhase);
184190
} catch (Exception e) {
185191
nextPhase.onFailure(e);
186192
}
@@ -245,7 +251,7 @@ void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse>
245251
}
246252

247253
@Override
248-
void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener<BulkByScrollResponse> responseListener) {
254+
protected void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener<BulkByScrollResponse> responseListener) {
249255
try {
250256
BulkByScrollResponse response = deleteByQueryFunction.apply(deleteByQueryRequest);
251257
responseListener.onResponse(response);
@@ -255,7 +261,7 @@ void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener<B
255261
}
256262

257263
@Override
258-
void refreshDestinationIndex(ActionListener<RefreshResponse> responseListener) {
264+
protected void refreshDestinationIndex(ActionListener<RefreshResponse> responseListener) {
259265
responseListener.onResponse(new RefreshResponse(1, 1, 0, Collections.emptyList()));
260266
}
261267

@@ -700,6 +706,116 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
700706
assertEquals(1, context.getFailureCount());
701707
}
702708

709+
public void testFailureCounterIsResetOnSuccess() throws Exception {
710+
String transformId = randomAlphaOfLength(10);
711+
TransformConfig config = new TransformConfig(
712+
transformId,
713+
randomSourceConfig(),
714+
randomDestConfig(),
715+
null,
716+
null,
717+
null,
718+
randomPivotConfig(),
719+
null,
720+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
721+
null,
722+
null,
723+
null,
724+
null
725+
);
726+
727+
final SearchResponse searchResponse = new SearchResponse(
728+
new InternalSearchResponse(
729+
new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
730+
// Simulate completely null aggs
731+
null,
732+
new Suggest(Collections.emptyList()),
733+
new SearchProfileShardResults(Collections.emptyMap()),
734+
false,
735+
false,
736+
1
737+
),
738+
"",
739+
1,
740+
1,
741+
0,
742+
0,
743+
ShardSearchFailure.EMPTY_ARRAY,
744+
SearchResponse.Clusters.EMPTY
745+
);
746+
747+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
748+
Function<SearchRequest, SearchResponse> searchFunction = new Function<>() {
749+
final AtomicInteger calls = new AtomicInteger(0);
750+
751+
@Override
752+
public SearchResponse apply(SearchRequest searchRequest) {
753+
int call = calls.getAndIncrement();
754+
if (call == 0) {
755+
throw new SearchPhaseExecutionException(
756+
"query",
757+
"Partial shards failure",
758+
new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
759+
);
760+
}
761+
return searchResponse;
762+
}
763+
};
764+
765+
Function<BulkRequest, BulkResponse> bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1);
766+
767+
final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
768+
final AtomicReference<String> failureMessage = new AtomicReference<>();
769+
Consumer<String> failureConsumer = message -> {
770+
failIndexerCalled.compareAndSet(false, true);
771+
failureMessage.compareAndSet(null, message);
772+
};
773+
774+
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
775+
TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
776+
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
777+
778+
MockedTransformIndexer indexer = createMockIndexer(
779+
config,
780+
state,
781+
searchFunction,
782+
bulkFunction,
783+
null,
784+
failureConsumer,
785+
threadPool,
786+
ThreadPool.Names.GENERIC,
787+
auditor,
788+
context
789+
);
790+
791+
final CountDownLatch latch = indexer.newLatch(1);
792+
793+
indexer.start();
794+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
795+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
796+
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
797+
798+
latch.countDown();
799+
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
800+
assertFalse(failIndexerCalled.get());
801+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
802+
assertEquals(1, context.getFailureCount());
803+
804+
final CountDownLatch secondLatch = indexer.newLatch(1);
805+
806+
indexer.start();
807+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
808+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
809+
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
810+
811+
secondLatch.countDown();
812+
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
813+
assertFalse(failIndexerCalled.get());
814+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
815+
auditor.assertAllExpectationsMatched();
816+
assertEquals(0, context.getFailureCount());
817+
}
818+
703819
private MockedTransformIndexer createMockIndexer(
704820
TransformConfig config,
705821
AtomicReference<IndexerState> state,

0 commit comments

Comments
 (0)