Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ad168f7
Fix reader context leak when query response serialization fails
drempapis Mar 22, 2026
0af5d96
remove comment
drempapis Mar 22, 2026
7ced78f
[CI] Auto commit changes from spotless
Mar 22, 2026
7392231
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 22, 2026
c489112
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 22, 2026
38625df
update code to prove ci
drempapis Mar 22, 2026
31d17d1
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 22, 2026
faa8065
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 23, 2026
cbd6513
add test
drempapis Mar 23, 2026
65d2576
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 23, 2026
e7201ab
[CI] Auto commit changes from spotless
Mar 23, 2026
28bb96d
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 23, 2026
2f13bfd
update
drempapis Mar 23, 2026
2f1268f
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 23, 2026
0b43c91
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 23, 2026
f3c906d
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 24, 2026
d91aab2
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 24, 2026
3a96957
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 24, 2026
b9c9d88
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
da2502e
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
c544194
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
c46290a
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
a318fff
update after review
drempapis Mar 25, 2026
422086d
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
bbb3b05
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
7a99b23
[CI] Auto commit changes from spotless
Mar 25, 2026
5ae4e33
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
5ffde5c
update after review
drempapis Mar 25, 2026
06a53c2
Merge branch 'fix/reader_context_leak' of github.com:drempapis/elasti…
drempapis Mar 25, 2026
13be3b6
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
4d289a2
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
5337d43
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
8133e6e
[CI] Auto commit changes from spotless
Mar 25, 2026
a1a5383
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
b367d84
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 25, 2026
cf22a41
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 26, 2026
4fb1e87
simplify code
drempapis Mar 26, 2026
5d58fda
Merge branch 'fix/reader_context_leak' of github.com:drempapis/elasti…
drempapis Mar 26, 2026
acf0d40
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 26, 2026
d7d20a7
Merge branch 'main' into fix/reader_context_leak
drempapis Mar 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,6 @@ tests:
- class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/144587
- class: org.elasticsearch.action.search.TransportSearchIT
method: testCircuitBreakerReduceFail
issue: https://github.com/elastic/elasticsearch/issues/144598
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/144587
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
Expand Down Expand Up @@ -541,13 +542,28 @@ public void onFailure(Exception exc) {
);
}
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
assertBusy(MockSearchService::assertNoInFlightContext);
} finally {
updateClusterSettings(
Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
);
}
}

public void testReaderContextFreedOnSerializationFailure() throws Exception {
String coordinatingNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
indexSomeDocs("test", 1, 3);
ensureGreen("test");

updateClusterSettings(Settings.builder().put("indices.breaker.request.limit", "1b"));
try {
expectThrows(Exception.class, client(coordinatingNode).prepareSearch("test")::get);
assertBusy(MockSearchService::assertNoInFlightContext);
} finally {
updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
}
}

public void testCircuitBreakerFetchFail() throws Exception {
int numShards = randomIntBetween(1, 10);
int numDocs = numShards * 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -738,8 +739,8 @@ public void onResponse(T response) {
response.writeTo(out);
bytesRef = out.moveToBytesReference();
} catch (Exception e) {
channelListener.onFailure(e);
return;
// Propagate to caller so wrapFailureListener in SearchService can free the reader context.
throw ExceptionsHelper.convertToRuntime(e);
}
// respondAndRelease releases the bytes once the transport layer completes.
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(bytesRef, transportVersion));
Expand Down
71 changes: 45 additions & 26 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -754,11 +755,31 @@ public void executeQueryPhase(ShardSearchRequest request, CancellableTask task,
}
// TODO: i think it makes sense to always do a canMatch here and
// return an empty response (not null response) in case canMatch is false?
ensureAfterSeqNoRefreshed(shard, orig, () -> executeQueryPhase(orig, task), l);
executeQueryPhaseAsync(shard, orig, task, l);
})
);
}

private void executeQueryPhaseAsync(
IndexShard shard,
ShardSearchRequest request,
CancellableTask task,
ActionListener<SearchPhaseResult> listener
) {
// wrapFailureListener requires readerContext and markAsUsed, but those are created inside the supplier
// lambda below. The ActionListener.wrap callbacks are constructed (before the supplier runs) and must
// therefore read the listener indirectly. completionListenerRef starts as the plain listener so that any
// failure before the supplier runs is still forwarded. Once the supplier sets it to the wrapped version,
// the ActionListener.wrap callbacks will invoke the one that handles readerContext cleanup on failure.
final var completionListenerRef = new AtomicReference<>(listener);
ensureAfterSeqNoRefreshed(shard, request, () -> {
final ReaderContext readerContext = createOrGetReaderContext(request);
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(request));
completionListenerRef.set(wrapFailureListener(listener, readerContext, markAsUsed));
return executeQueryPhase(request, task, readerContext);
}, ActionListener.wrap(result -> completionListenerRef.get().onResponse(result), e -> completionListenerRef.get().onFailure(e)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code sequence is hard to read/understand for me with the atomicreference setting and the access in the next argument. Do you have any idea for simplification?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve added a comment to clarify the understanding of the code and the need of the completionListenerRef

}

private <T extends RefCounted> void ensureAfterSeqNoRefreshed(
IndexShard shard,
ShardSearchRequest request,
Expand Down Expand Up @@ -908,11 +929,10 @@ private static <T extends RefCounted> void runAsync(
* It is the responsibility of the caller to ensure that the ref count is correctly decremented
* when the object is no longer needed.
*/
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, CancellableTask task) throws Exception {
final ReaderContext readerContext = createOrGetReaderContext(request);
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, CancellableTask task, ReaderContext readerContext)
throws Exception {
try (
Releasable scope = tracer.withScope(task);
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, ResultsType.QUERY, true)
) {
tracer.startTrace("executeQueryPhase", Map.of());
Expand Down Expand Up @@ -967,7 +987,6 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
: new ElasticsearchException(e.getCause());
}
logger.trace("Query phase failed", e);
processFailure(readerContext, e);
throw e;
}
}
Expand Down Expand Up @@ -1651,36 +1670,36 @@ private <T> ActionListener<T> releaseCircuitBreakerOnResponse(
ActionListener<T> listener,
Function<T, FetchSearchResult> fetchResultExtractor
) {
return ActionListener.wrap(response -> {
try {
listener.onResponse(response);
} finally {
// Release bytes after the response handler completes
FetchSearchResult fetchResult = fetchResultExtractor.apply(response);
if (fetchResult != null) {
fetchResult.releaseCircuitBreakerBytes(circuitBreaker);
}
}
}, listener::onFailure);
}

private <T> ActionListener<T> wrapFailureListener(ActionListener<T> listener, ReaderContext context, Releasable releasable) {
return new ActionListener<>() {
@Override
public void onResponse(T resp) {
Releasables.close(releasable);
listener.onResponse(resp);
public void onResponse(T response) {
try {
listener.onResponse(response);
} finally {
// Release bytes after the response handler completes, even if it throws.
// Exceptions are intentionally allowed to propagate so that wrapFailureListener
// can observe them and free the reader context via processFailure.
FetchSearchResult fetchResult = fetchResultExtractor.apply(response);
if (fetchResult != null) {
fetchResult.releaseCircuitBreakerBytes(circuitBreaker);
}
}
}

@Override
public void onFailure(Exception exc) {
processFailure(context, exc);
Releasables.close(releasable);
listener.onFailure(exc);
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
}

private <T> ActionListener<T> wrapFailureListener(ActionListener<T> listener, ReaderContext context, Releasable releasable) {
return ActionListener.releaseAfter(ActionListener.wrap(listener::onResponse, e -> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps a personal flaw but I find it very difficult to understand what we mean by
releaseAfter ( wrap ... )

Can you please document a bit the method and code? Would it make sense to make this method package visible and unit test it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, Andrei, for the feedback.

Agreed, I documented the code and added unit tests by making the method package visible.

processFailure(context, e);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we in trouble if processFailure fails?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, processFailure cannot thro, the freeReaderContext is just a ConcurrentHashMap.remove, and failShard is already wrapped in its own try/catch, but I agree the code shouldn't silently depend on that. I've added a try/finally so listener.onFailure(e) is unconditionally called regardless of what processFailure does.

listener.onFailure(e);
}), releasable);
}

private static boolean isScrollContext(ReaderContext context) {
return context instanceof LegacyReaderContext && context.singleSession() == false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ public void testNetworkPathBytesResponseRoundTrip() throws Exception {
}
}

public void testNetworkPathSerializationFailureSendsFailure() {
var sentException = new AtomicReference<Exception>();

public void testNetworkPathSerializationFailurePropagates() {
var channel = new TestTransportChannel(
ActionListener.wrap(resp -> fail("should not succeed when serialization fails"), sentException::set)
ActionListener.wrap(
resp -> fail("should not succeed when serialization fails"),
e -> fail("should not send failure to channel; caller handles it")
)
);

ActionListener<FailingTestResponse> listener = SearchTransportService.channelListener(
Expand All @@ -103,11 +104,9 @@ public void testNetworkPathSerializationFailureSendsFailure() {
newLimitedBreaker(ByteSizeValue.ofMb(100))
);

listener.onResponse(new FailingTestResponse());

assertThat(sentException.get(), notNullValue());
assertThat(sentException.get(), instanceOf(IOException.class));
assertThat(sentException.get().getMessage(), equalTo("simulated serialization failure"));
var ex = expectThrows(RuntimeException.class, () -> listener.onResponse(new FailingTestResponse()));
assertThat(ex.getCause(), instanceOf(IOException.class));
assertThat(ex.getCause().getMessage(), equalTo("simulated serialization failure"));
}

public void testNetworkPathOnFailureForwardsFailure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.ClusterState.VERSION_INTRODUCING_TRANSPORT_VERSIONS;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.between;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
Expand Down Expand Up @@ -174,15 +175,17 @@ public static EsRelation relation() {
// Common methods / assertions
//

public static void assertNoSearchContexts(RestClient client) throws IOException {
Map<String, Object> stats = searchStats(client);
@SuppressWarnings("unchecked")
Map<String, Object> indicesStats = (Map<String, Object>) stats.get("indices");
for (String index : indicesStats.keySet()) {
if (index.startsWith(".") == false) { // We are not interested in internal indices
assertEquals(index + " should have no search contexts", 0, getOpenContexts(stats, index));
public static void assertNoSearchContexts(RestClient client) throws Exception {
assertBusy(() -> {
Map<String, Object> stats = searchStats(client);
@SuppressWarnings("unchecked")
Map<String, Object> indicesStats = (Map<String, Object>) stats.get("indices");
for (String index : indicesStats.keySet()) {
if (index.startsWith(".") == false) { // We are not interested in internal indices
assertEquals(index + " should have no search contexts", 0, getOpenContexts(stats, index));
}
}
}
});
}

public static int getNumberOfSearchContexts(RestClient client, String index) throws IOException {
Expand Down
Loading