Skip to content

Commit bc65be2

Browse files
authored
Reindex: wait for cleanup before responding (#23677)
Changes reindex and friends to wait until the entire request has been "cleaned up" before responding. "Clean up" in this context is clearing the scroll and (for reindex-from-remote) shutting down the client. Failures to clean up are still only logged, not returned to the user. Closes #23653
1 parent f8453ac commit bc65be2

File tree

5 files changed

+55
-21
lines changed

5 files changed

+55
-21
lines changed

core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -465,14 +465,18 @@ protected void finishHim(Exception failure) {
465465
* @param searchFailures any search failures accumulated during the request
466466
* @param timedOut have any of the sub-requests timed out?
467467
*/
468-
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
469-
scrollSource.close();
470-
if (failure == null) {
471-
listener.onResponse(
472-
buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures, timedOut));
473-
} else {
474-
listener.onFailure(failure);
475-
}
468+
protected void finishHim(Exception failure, List<Failure> indexingFailures,
469+
List<SearchFailure> searchFailures, boolean timedOut) {
470+
scrollSource.close(() -> {
471+
if (failure == null) {
472+
BulkByScrollResponse response = buildResponse(
473+
timeValueNanos(System.nanoTime() - startTime.get()),
474+
indexingFailures, searchFailures, timedOut);
475+
listener.onResponse(response);
476+
} else {
477+
listener.onFailure(failure);
478+
}
479+
});
476480
}
477481

478482
/**

core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ public void onFailure(Exception e) {
113113
}
114114

115115
@Override
116-
protected void cleanup() {
117-
// Nothing to do
116+
protected void cleanup(Runnable onCompletion) {
117+
onCompletion.run();
118118
}
119119

120120
/**

core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
/**
4848
* A scrollable source of results.
4949
*/
50-
public abstract class ScrollableHitSource implements Closeable {
50+
public abstract class ScrollableHitSource {
5151
private final AtomicReference<String> scrollId = new AtomicReference<>();
5252

5353
protected final Logger logger;
@@ -82,25 +82,31 @@ public final void startNextScroll(TimeValue extraKeepAlive, Consumer<Response> o
8282
}
8383
protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse);
8484

85-
@Override
86-
public final void close() {
85+
public final void close(Runnable onCompletion) {
8786
String scrollId = this.scrollId.get();
8887
if (Strings.hasLength(scrollId)) {
89-
clearScroll(scrollId, this::cleanup);
88+
clearScroll(scrollId, () -> cleanup(onCompletion));
9089
} else {
91-
cleanup();
90+
cleanup(onCompletion);
9291
}
9392
}
93+
9494
/**
9595
* Called to clear a scroll id.
96+
*
9697
* @param scrollId the id to clear
97-
* @param onCompletion implementers must call this after completing the clear whether they are successful or not
98+
* @param onCompletion implementers must call this after completing the clear whether they are
99+
* successful or not
98100
*/
99101
protected abstract void clearScroll(String scrollId, Runnable onCompletion);
100102
/**
101-
* Called after the process has been totally finished to clean up any resources the process needed like remote connections.
103+
* Called after the process has been totally finished to clean up any resources the process
104+
* needed like remote connections.
105+
*
106+
* @param onCompletion implementers must call this after completing the cleanup whether they are
107+
* successful or not
102108
*/
103-
protected abstract void cleanup();
109+
protected abstract void cleanup(Runnable onCompletion);
104110

105111
/**
106112
* Set the id of the last scroll. Used for debugging.

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,18 @@ private void logFailure(Exception e) {
141141
}
142142

143143
@Override
144-
protected void cleanup() {
145-
/* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to
146-
* close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */
144+
protected void cleanup(Runnable onCompletion) {
145+
/* This is called on the RestClient's thread pool and attempting to close the client on its
146+
* own threadpool causes it to fail to close. So we always shutdown the RestClient
147+
* asynchronously on a thread in Elasticsearch's generic thread pool. */
147148
threadPool.generic().submit(() -> {
148149
try {
149150
client.close();
150151
logger.debug("Shut down remote connection");
151152
} catch (IOException e) {
152153
logger.error("Failed to shutdown the remote connection", e);
154+
} finally {
155+
onCompletion.run();
153156
}
154157
});
155158
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@
8080
import static org.hamcrest.Matchers.hasSize;
8181
import static org.hamcrest.Matchers.instanceOf;
8282
import static org.mockito.Matchers.any;
83+
import static org.mockito.Mockito.doThrow;
8384
import static org.mockito.Mockito.mock;
85+
import static org.mockito.Mockito.verify;
8486
import static org.mockito.Mockito.when;
8587

8688
public class RemoteScrollableHitSourceTests extends ESTestCase {
@@ -478,6 +480,25 @@ public void testUnexpectedJsonThinksRemoveIsNotES() throws IOException {
478480
e.getCause().getCause().getCause().getMessage());
479481
}
480482

483+
public void testCleanupSuccessful() throws Exception {
484+
AtomicBoolean cleanupCallbackCalled = new AtomicBoolean();
485+
RestClient client = mock(RestClient.class);
486+
TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client);
487+
hitSource.cleanup(() -> cleanupCallbackCalled.set(true));
488+
verify(client).close();
489+
assertTrue(cleanupCallbackCalled.get());
490+
}
491+
492+
public void testCleanupFailure() throws Exception {
493+
AtomicBoolean cleanupCallbackCalled = new AtomicBoolean();
494+
RestClient client = mock(RestClient.class);
495+
doThrow(new RuntimeException("test")).when(client).close();
496+
TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client);
497+
hitSource.cleanup(() -> cleanupCallbackCalled.set(true));
498+
verify(client).close();
499+
assertTrue(cleanupCallbackCalled.get());
500+
}
501+
481502
private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception {
482503
return sourceWithMockedRemoteCall(true, ContentType.APPLICATION_JSON, paths);
483504
}

0 commit comments

Comments
 (0)