Fix reader context leak when query response serialization fails #144708
drempapis merged 40 commits into elastic:main from
@elasticmachine test this please
@elasticmachine run elasticsearch-ci/pr-upgrade-part-1
@elasticmachine test this please
@elasticmachine run elasticsearch-ci/part-3
Pinging @elastic/es-search-foundations (Team:Search Foundations)
    final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(request));
    completionListenerRef.set(wrapFailureListener(listener, readerContext, markAsUsed));
    return executeQueryPhase(request, task, readerContext);
}, ActionListener.wrap(result -> completionListenerRef.get().onResponse(result), e -> completionListenerRef.get().onFailure(e)));
This code sequence is hard for me to read and understand: the AtomicReference is set in one argument and accessed in the next. Do you have any ideas for simplifying it?
I've added a comment that explains the code and why completionListenerRef is needed.
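The pattern under discussion can be illustrated with a minimal, self-contained sketch. Everything here is hypothetical: SimpleListener, run, and demo are simplified stand-ins rather than Elasticsearch APIs, and initialising the reference with the plain listener is an assumption, not something the PR states. The point is only that the cleanup-aware listener can be built solely inside the task (where the resource is acquired), while the outer listener must exist before the task runs, so it resolves its delegate late through the reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical two-callback listener; not the Elasticsearch ActionListener.
interface SimpleListener<T> {
    void onResponse(T result);
    void onFailure(Exception e);
}

class DeferredListenerSketch {
    // Runs the task and reports its outcome (synchronously, to keep the sketch short).
    static <T> void run(Callable<T> task, SimpleListener<T> listener) {
        T result;
        try {
            result = task.call();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(result);
    }

    static List<String> demo(boolean fail) {
        List<String> events = new ArrayList<>();
        SimpleListener<String> original = new SimpleListener<String>() {
            public void onResponse(String r) { events.add("response:" + r); }
            public void onFailure(Exception e) { events.add("failure:" + e.getMessage()); }
        };
        // Assumption: start with the plain listener so a failure before the swap is still reported.
        AtomicReference<SimpleListener<String>> ref = new AtomicReference<>(original);

        run(() -> {
            // The resource is acquired inside the task...
            events.add("acquire");
            // ...so the listener that releases it can only be created here.
            ref.set(new SimpleListener<String>() {
                public void onResponse(String r) {
                    try { original.onResponse(r); } finally { events.add("release"); }
                }
                public void onFailure(Exception e) {
                    try { original.onFailure(e); } finally { events.add("release"); }
                }
            });
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        }, new SimpleListener<String>() {
            // Created before the task runs, so the delegate is looked up at completion time.
            public void onResponse(String r) { ref.get().onResponse(r); }
            public void onFailure(Exception e) { ref.get().onFailure(e); }
        });
        return events;
    }
}
```

Calling DeferredListenerSketch.demo(true) records the acquisition, the failure notification, and the release, in that order.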
spinscale left a comment
Left one comment to think about, but LGTM otherwise.
andreidan left a comment
Thanks for fixing this Dimi. Left some minor comments.
private <T> ActionListener<T> wrapFailureListener(ActionListener<T> listener, ReaderContext context, Releasable releasable) {
    return ActionListener.releaseAfter(ActionListener.wrap(listener::onResponse, e -> {
        processFailure(context, e);
Are we in trouble if processFailure fails?
In practice, processFailure cannot throw: freeReaderContext is just a ConcurrentHashMap.remove, and failShard is already wrapped in its own try/catch. Still, I agree the code shouldn't silently depend on that. I've added a try/finally so that listener.onFailure(e) is called unconditionally, regardless of what processFailure does.
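The try/finally guarantee described in this reply can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from the PR: guarded, cleanup, and downstream are hypothetical names, with cleanup playing the role of processFailure and downstream the role of listener.onFailure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FailureCleanupSketch {
    // Wraps a failure callback so the downstream callback always fires, even if cleanup throws.
    static Consumer<Exception> guarded(Runnable cleanup, Consumer<Exception> downstream) {
        return e -> {
            try {
                cleanup.run();
            } finally {
                downstream.accept(e);
            }
        };
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Consumer<Exception> onFailure = guarded(
            () -> { throw new IllegalStateException("cleanup failed"); }, // worst case: cleanup itself fails
            e -> events.add("notified:" + e.getMessage())
        );
        try {
            onFailure.accept(new RuntimeException("query failed"));
        } catch (IllegalStateException cleanupError) {
            // The cleanup exception still reaches the caller, but only after the notification.
            events.add("propagated:" + cleanupError.getMessage());
        }
        return events;
    }
}
```

Note that the original failure is still delivered downstream; the exception from the cleanup step is not swallowed, it simply surfaces after the notification has happened.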
}

private <T> ActionListener<T> wrapFailureListener(ActionListener<T> listener, ReaderContext context, Releasable releasable) {
    return ActionListener.releaseAfter(ActionListener.wrap(listener::onResponse, e -> {
Perhaps it's a personal flaw, but I find it very difficult to understand what we mean by releaseAfter(wrap(...)). Can you please document the method and the code a bit? Would it make sense to make this method package-visible and unit test it?
Thank you, Andrei, for the feedback.
Agreed. I documented the code, made the method package-visible, and added unit tests.
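For readers with the same question, the releaseAfter(wrap(...)) composition can be unpacked with a small sketch. MiniListener below is a hypothetical miniature that only mimics the shape of the two factory methods; it is not the Elasticsearch ActionListener, and the event strings are illustrative. The inner wrap builds a listener from two callbacks (pass responses through, run cleanup on failure), and the outer releaseAfter runs a release step once that listener has been notified either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature listener API; it only mimics the shape of wrap(...) and releaseAfter(...).
interface MiniListener<T> {
    void onResponse(T result);
    void onFailure(Exception e);

    // Builds a listener from a response handler and a failure handler.
    static <T> MiniListener<T> wrap(Consumer<T> responseHandler, Consumer<Exception> failureHandler) {
        return new MiniListener<T>() {
            public void onResponse(T result) { responseHandler.accept(result); }
            public void onFailure(Exception e) { failureHandler.accept(e); }
        };
    }

    // Notifies the delegate first, then always runs the release step.
    static <T> MiniListener<T> releaseAfter(MiniListener<T> delegate, Runnable release) {
        return new MiniListener<T>() {
            public void onResponse(T result) {
                try { delegate.onResponse(result); } finally { release.run(); }
            }
            public void onFailure(Exception e) {
                try { delegate.onFailure(e); } finally { release.run(); }
            }
        };
    }
}

class ReleaseAfterSketch {
    static List<String> demo(boolean fail) {
        List<String> events = new ArrayList<>();
        MiniListener<String> caller = MiniListener.<String>wrap(
            r -> events.add("caller got " + r),
            e -> events.add("caller failed: " + e.getMessage()));

        MiniListener<String> wrapped = MiniListener.releaseAfter(
            // Inner layer: responses pass through, failures trigger cleanup before the caller hears of them.
            MiniListener.<String>wrap(caller::onResponse, e -> {
                try { events.add("free reader context"); } finally { caller.onFailure(e); }
            }),
            // Outer layer: runs after either outcome.
            () -> events.add("release markAsUsed"));

        if (fail) {
            wrapped.onFailure(new RuntimeException("boom"));
        } else {
            wrapped.onResponse("hits");
        }
        return events;
    }
}
```

Read inside out: wrap decides what happens on each outcome, and releaseAfter adds one step that happens after both.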
…csearch into fix/reader_context_leak
…csearch into fix/reader_context_leak
@elasticmachine run elasticsearch-ci/part-3
Solves #144598

Problem

PR #143136 introduced NetworkPathListener in SearchTransportService to defer circuit breaker release until the Netty transport write completes. As part of this, query response serialization was moved inside NetworkPathListener.onResponse().

If serialization threw an exception, it was caught inside NetworkPathListener and forwarded directly to the transport channel's failure handler, bypassing SearchService's wrapFailureListener. That listener is the one responsible for calling processFailure(context, exc), which frees the ReaderContext associated with the in-flight search.

The result: the ReaderContext is never freed, causing a leak of in-flight search contexts. This surfaced as intermittent failures in TransportSearchIT.testCircuitBreakerReduceFail with the error

Solution

The fix ensures that serialization failures are propagated through the full listener chain rather than short-circuiting to the channel directly.

SearchTransportService: Instead of catching the serialization exception and swallowing it into the channel, the exception is rethrown so that it propagates to listener.onFailure(), which is the wrapFailureListener that knows how to clean up the ReaderContext.

SearchService: Refactored wrapFailureListener to use ActionListener.releaseAfter() for cleaner resource management, and extracted executeQueryPhaseAsync() to make the async execution boundary explicit. The onFailure path now correctly invokes processFailure(context, exc) regardless of whether the failure originated in the search execution phase or the serialization phase.
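The difference between the two behaviours can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual SearchTransportService code: Channel, serialize, and the respond* methods are hypothetical names, and the way the rethrown exception reaches the cleanup-aware path is reduced to a plain try/catch.

```java
import java.util.ArrayList;
import java.util.List;

class SerializationFailureSketch {
    // Hypothetical stand-in for a transport channel.
    interface Channel {
        void sendResponse(String bytes);
        void sendFailure(Exception e);
    }

    // Hypothetical serializer that fails for an empty result.
    static String serialize(String result) {
        if (result.isEmpty()) {
            throw new IllegalArgumentException("cannot serialize");
        }
        return "bytes(" + result + ")";
    }

    // Before: the failure is handled locally and goes straight to the channel, so cleanup is bypassed.
    static void respondSwallowing(String result, Channel channel) {
        try {
            channel.sendResponse(serialize(result));
        } catch (RuntimeException e) {
            channel.sendFailure(e);
        }
    }

    // After, step 1: the sending step lets the serialization exception escape.
    static void respondRethrowing(String result, Channel channel) {
        channel.sendResponse(serialize(result));
    }

    // After, step 2: the cleanup-aware failure path frees the context, then reports the failure.
    static void respondWithCleanup(String result, Channel channel, Runnable freeContext) {
        try {
            respondRethrowing(result, channel);
        } catch (RuntimeException e) {
            try { freeContext.run(); } finally { channel.sendFailure(e); }
        }
    }

    static List<String> demo(boolean propagate) {
        List<String> events = new ArrayList<>();
        Channel channel = new Channel() {
            public void sendResponse(String bytes) { events.add("sent " + bytes); }
            public void sendFailure(Exception e) { events.add("channel failure: " + e.getMessage()); }
        };
        if (propagate) {
            respondWithCleanup("", channel, () -> events.add("free reader context"));
        } else {
            respondSwallowing("", channel);
        }
        return events;
    }
}
```

In both variants the client sees the same failure on the channel; only the propagating variant also frees the context first.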