Skip to content

Commit 2e405af

Browse files
author
David Roberts
authored
[Transform] Don't search if all indices are unchanged between checkpoints (#77204) (#77245)
When every index that a transform is configured to search has remained completely unchanged between checkpoints, the transform should not do a search at all. Following #75839, there was a problem where the scenario of all indices being unchanged between checkpoints could cause an empty list of indices to be searched, which Elasticsearch treats as meaning _all_ indices. This change should prevent that from happening in the future. Fixes #77137
1 parent 4d853fb commit 2e405af

File tree

5 files changed

+125
-59
lines changed

5 files changed

+125
-59
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,16 @@ private void onSearchResponse(SearchResponse searchResponse) {
462462
return;
463463
}
464464

465+
// searchResponse may be null if the search was optimized to a noop
466+
if (searchResponse == null) {
467+
logger.debug("No indexing necessary for job [{}], saving state and shutting down.", getJobId());
468+
// execute finishing tasks
469+
onFinish(ActionListener.wrap(
470+
r -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure),
471+
e -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure)));
472+
return;
473+
}
474+
465475
// allowPartialSearchResults is set to false, so we should never see shard failures here
466476
assert (searchResponse.getShardFailures().length == 0);
467477
stats.markStartProcessing();

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242

4343
public class AsyncTwoPhaseIndexerTests extends ESTestCase {
4444

45-
AtomicBoolean isFinished = new AtomicBoolean(false);
46-
AtomicBoolean isStopped = new AtomicBoolean(false);
45+
private final AtomicBoolean isFinished = new AtomicBoolean(false);
46+
private final AtomicBoolean isStopped = new AtomicBoolean(false);
4747

4848
@Before
4949
public void reset() {
@@ -57,12 +57,14 @@ private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
5757
// test the execution order
5858
private volatile int step;
5959
private final boolean stoppedBeforeFinished;
60+
private final boolean noIndices;
6061

6162
protected MockIndexer(ThreadPool threadPool, AtomicReference<IndexerState> initialState,
62-
Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) {
63+
Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished, boolean noIndices) {
6364
super(threadPool, initialState, initialPosition, new MockJobStats());
6465
this.latch = latch;
6566
this.stoppedBeforeFinished = stoppedBeforeFinished;
67+
this.noIndices = noIndices;
6668
}
6769

6870
@Override
@@ -97,12 +99,19 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
9799
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
98100
assertThat(step, equalTo(1));
99101
++step;
100-
final SearchResponseSections sections = new SearchResponseSections(
101-
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
102-
null, false, null, null, 1);
103102

104103
// block till latch has been counted down, simulating network latency
105104
awaitForLatch();
105+
106+
if (noIndices) {
107+
// simulate no indices being searched due to optimizations
108+
nextPhase.onResponse(null);
109+
return;
110+
}
111+
112+
final SearchResponseSections sections = new SearchResponseSections(
113+
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
114+
null, false, null, null, 1);
106115
nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
107116
}
108117

@@ -115,7 +124,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
115124
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
116125
// for stop before finished we do not know if its stopped before are after the search
117126
if (stoppedBeforeFinished == false) {
118-
assertThat(step, equalTo(4));
127+
assertThat(step, equalTo(noIndices ? 3 : 4));
119128
}
120129
++step;
121130
next.run();
@@ -128,7 +137,7 @@ protected void onFailure(Exception exc) {
128137

129138
@Override
130139
protected void onFinish(ActionListener<Void> listener) {
131-
assertThat(step, equalTo(3));
140+
assertThat(step, equalTo(noIndices ? 2 : 3));
132141
++step;
133142
listener.onResponse(null);
134143
assertTrue(isFinished.compareAndSet(false, true));
@@ -356,9 +365,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
356365
}
357366
}
358367

359-
private class MockThreadPool extends TestThreadPool {
368+
private static class MockThreadPool extends TestThreadPool {
360369

361-
private List<TimeValue> delays = new ArrayList<>();
370+
private final List<TimeValue> delays = new ArrayList<>();
362371

363372
MockThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
364373
super(name, Settings.EMPTY, customBuilders);
@@ -381,7 +390,7 @@ public void testStateMachine() throws Exception {
381390
final ThreadPool threadPool = new TestThreadPool(getTestName());
382391
try {
383392
CountDownLatch countDownLatch = new CountDownLatch(1);
384-
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false);
393+
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false, false);
385394
indexer.start();
386395
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
387396
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -419,12 +428,39 @@ public void testStateMachineBrokenSearch() throws Exception {
419428
}
420429
}
421430

431+
public void testZeroIndicesWhileIndexing() throws Exception {
432+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
433+
final ThreadPool threadPool = new TestThreadPool(getTestName());
434+
try {
435+
CountDownLatch countDownLatch = new CountDownLatch(1);
436+
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false, true);
437+
indexer.start();
438+
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
439+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
440+
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
441+
assertBusy(() -> assertThat(indexer.getPosition(), equalTo(2)));
442+
443+
countDownLatch.countDown();
444+
assertBusy(() -> assertTrue(isFinished.get()));
445+
assertThat(indexer.getPosition(), equalTo(2));
446+
447+
assertFalse(isStopped.get());
448+
assertThat(indexer.getStep(), equalTo(4));
449+
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
450+
assertThat(indexer.getStats().getNumPages(), equalTo(0L));
451+
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
452+
assertTrue(indexer.abort());
453+
} finally {
454+
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
455+
}
456+
}
457+
422458
public void testStop_WhileIndexing() throws Exception {
423459
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
424460
final ThreadPool threadPool = new TestThreadPool(getTestName());
425461
try {
426462
CountDownLatch countDownLatch = new CountDownLatch(1);
427-
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, true);
463+
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, true, false);
428464
indexer.start();
429465
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
430466
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
127127

128128
injectPointInTimeIfNeeded(
129129
buildSearchRequest(),
130-
ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure)
130+
ActionListener.wrap(pitSearchRequest -> doSearch(pitSearchRequest, nextPhase), nextPhase::onFailure)
131131
);
132132
}
133133

@@ -393,12 +393,12 @@ private void injectPointInTimeIfNeeded(
393393
Tuple<String, SearchRequest> namedSearchRequest,
394394
ActionListener<Tuple<String, SearchRequest>> listener
395395
) {
396-
if (disablePit) {
396+
SearchRequest searchRequest = namedSearchRequest.v2();
397+
if (disablePit || searchRequest.indices().length == 0) {
397398
listener.onResponse(namedSearchRequest);
398399
return;
399400
}
400401

401-
SearchRequest searchRequest = namedSearchRequest.v2();
402402
PointInTimeBuilder pit = namedPits.get(namedSearchRequest.v1());
403403
if (pit != null) {
404404
searchRequest.source().pointInTimeBuilder(pit);
@@ -455,22 +455,30 @@ private void injectPointInTimeIfNeeded(
455455
);
456456
}
457457

458-
private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
459-
logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", namedSearchRequest.v2()));
458+
void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
459+
String name = namedSearchRequest.v1();
460+
SearchRequest searchRequest = namedSearchRequest.v2();
461+
// We want to treat a request to search 0 indices as a request to do nothing, not a request to search all indices
462+
if (searchRequest.indices().length == 0) {
463+
logger.debug("[{}] Search request [{}] optimized to noop; searchRequest [{}]", getJobId(), name, searchRequest);
464+
listener.onResponse(null);
465+
return;
466+
}
467+
logger.trace("searchRequest: [{}]", searchRequest);
460468

461-
PointInTimeBuilder pit = namedSearchRequest.v2().pointInTimeBuilder();
469+
PointInTimeBuilder pit = searchRequest.pointInTimeBuilder();
462470

463471
ClientHelper.executeWithHeadersAsync(
464472
transformConfig.getHeaders(),
465473
ClientHelper.TRANSFORM_ORIGIN,
466474
client,
467475
SearchAction.INSTANCE,
468-
namedSearchRequest.v2(),
476+
searchRequest,
469477
ActionListener.wrap(response -> {
470478
// did the pit change?
471-
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) {
472-
namedPits.put(namedSearchRequest.v1(), new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
473-
logger.trace("point in time handle has changed; request [{}]", namedSearchRequest.v1());
479+
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId().equals(pit.getEncodedId())) == false) {
480+
namedPits.put(name, new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
481+
logger.trace("point in time handle has changed; request [{}]", name);
474482
}
475483

476484
listener.onResponse(response);
@@ -484,18 +492,18 @@ private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionLis
484492
new ParameterizedMessage(
485493
"[{}] Search context missing, falling back to normal search; request [{}]",
486494
getJobId(),
487-
namedSearchRequest.v1()
495+
name
488496
),
489497
e
490498
);
491-
namedPits.remove(namedSearchRequest.v1());
492-
namedSearchRequest.v2().source().pointInTimeBuilder(null);
499+
namedPits.remove(name);
500+
searchRequest.source().pointInTimeBuilder(null);
493501
ClientHelper.executeWithHeadersAsync(
494502
transformConfig.getHeaders(),
495503
ClientHelper.TRANSFORM_ORIGIN,
496504
client,
497505
SearchAction.INSTANCE,
498-
namedSearchRequest.v2(),
506+
searchRequest,
499507
listener
500508
);
501509
return;

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -523,10 +523,8 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {
523523
}
524524

525525
if (lastCheckpoint != null) {
526-
long docsIndexed = 0;
527-
long docsProcessed = 0;
528-
docsIndexed = progress.getDocumentsIndexed();
529-
docsProcessed = progress.getDocumentsProcessed();
526+
long docsIndexed = progress.getDocumentsIndexed();
527+
long docsProcessed = progress.getDocumentsProcessed();
530528

531529
long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp();
532530
getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed);
@@ -1131,7 +1129,7 @@ private SearchRequest buildQueryToFindChanges() {
11311129
// TODO: if buildChangesQuery changes the query it gets overwritten
11321130
sourceBuilder.query(filteredQuery);
11331131

1134-
logger.debug("[{}] Querying for changes: {}", getJobId(), sourceBuilder);
1132+
logger.debug("[{}] Querying {} for changes: {}", getJobId(), request.indices(), sourceBuilder);
11351133
return request.source(sourceBuilder);
11361134
}
11371135

@@ -1168,7 +1166,7 @@ private SearchRequest buildQueryToUpdateDestinationIndex() {
11681166
}
11691167

11701168
sourceBuilder.query(queryBuilder);
1171-
logger.debug(() -> new ParameterizedMessage("[{}] Querying for data: {}", getJobId(), sourceBuilder));
1169+
logger.debug("[{}] Querying {} for data: {}", getJobId(), request.indices(), sourceBuilder);
11721170

11731171
return request.source(sourceBuilder)
11741172
.allowPartialSearchResults(false) // shard failures should fail the request

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -68,32 +68,8 @@
6868

6969
public class ClientTransformIndexerTests extends ESTestCase {
7070

71-
public void testAudiOnFinishFrequency() {
72-
ThreadPool threadPool = mock(ThreadPool.class);
73-
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
74-
75-
ClientTransformIndexer indexer = new ClientTransformIndexer(
76-
mock(ThreadPool.class),
77-
new TransformServices(
78-
mock(IndexBasedTransformConfigManager.class),
79-
mock(TransformCheckpointService.class),
80-
mock(TransformAuditor.class),
81-
mock(SchedulerEngine.class)
82-
),
83-
mock(CheckpointProvider.class),
84-
new AtomicReference<>(IndexerState.STOPPED),
85-
null,
86-
mock(Client.class),
87-
mock(TransformIndexerStats.class),
88-
mock(TransformConfig.class),
89-
null,
90-
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
91-
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
92-
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
93-
mock(TransformContext.class),
94-
false
95-
);
96-
71+
public void testAuditOnFinishFrequency() {
72+
ClientTransformIndexer indexer = createTestIndexer();
9773
List<Boolean> shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList());
9874

9975
// Audit every checkpoint for the first 10
@@ -125,6 +101,17 @@ public void testAudiOnFinishFrequency() {
125101
assertFalse(shouldAudit.get(11_999));
126102
}
127103

104+
public void testDoSearchGivenNoIndices() {
105+
ClientTransformIndexer indexer = createTestIndexer();
106+
SearchRequest searchRequest = new SearchRequest(new String[0]);
107+
Tuple<String, SearchRequest> namedSearchRequest = new Tuple<>("test", searchRequest);
108+
indexer.doSearch(namedSearchRequest, ActionListener.wrap(
109+
// A search of zero indices should return null rather than attempt to search all indices
110+
ESTestCase::assertNull,
111+
e -> fail(e.getMessage())
112+
));
113+
}
114+
128115
public void testPitInjection() throws InterruptedException {
129116
TransformConfig config = TransformConfigTests.randomTransformConfig();
130117

@@ -334,7 +321,7 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer {
334321

335322
@Override
336323
protected Tuple<String, SearchRequest> buildSearchRequest() {
337-
return new Tuple<>("mock", new SearchRequest().source(new SearchSourceBuilder()));
324+
return new Tuple<>("mock", new SearchRequest("source_index").source(new SearchSourceBuilder()));
338325
}
339326
}
340327

@@ -427,4 +414,31 @@ private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> f
427414
function.accept(listener);
428415
assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
429416
}
417+
418+
private ClientTransformIndexer createTestIndexer() {
419+
ThreadPool threadPool = mock(ThreadPool.class);
420+
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
421+
422+
return new ClientTransformIndexer(
423+
mock(ThreadPool.class),
424+
new TransformServices(
425+
mock(IndexBasedTransformConfigManager.class),
426+
mock(TransformCheckpointService.class),
427+
mock(TransformAuditor.class),
428+
mock(SchedulerEngine.class)
429+
),
430+
mock(CheckpointProvider.class),
431+
new AtomicReference<>(IndexerState.STOPPED),
432+
null,
433+
mock(Client.class),
434+
mock(TransformIndexerStats.class),
435+
mock(TransformConfig.class),
436+
null,
437+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
438+
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
439+
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
440+
mock(TransformContext.class),
441+
false
442+
);
443+
}
430444
}

0 commit comments

Comments
 (0)