Delay circuit breaker release until fetch response is sent#139243
Delay circuit breaker release until fetch response is sent#139243drempapis merged 34 commits intoelastic:mainfrom
Conversation
|
Pinging @elastic/es-search-foundations (Team:Search Foundations) |
|
@elasticmachine run elasticsearch-ci/part-1 |
|
@elasticmachine run elasticsearch-ci/part-2 |
|
@elasticmachine run elasticsearch-ci/part-1 |
andreidan
left a comment
There was a problem hiding this comment.
Thanks for working on this Dimi (and wow, sorry for the late review 🙏 )
I had an initial look over this and left a few comments.
Did you test this locally with a running Elasticsearch? (i.e. issue some requests without the fix and see Elasticsearch OOM, then the same with the fix in place and see Elasticsearch 429 instead?)
| private void createSmallIndex() throws IOException { | ||
| createIndex(INDEX_SMALL); | ||
| populateIndex(INDEX_SMALL, 50, 10_000); | ||
| } | ||
|
|
||
| private void createLargeIndex() throws IOException { | ||
| createIndex(INDEX_LARGE); | ||
| populateIndex(INDEX_LARGE, 50, 100_000); | ||
| } | ||
|
|
||
| private void createIndex(String indexName) { | ||
| assertAcked( | ||
| prepareCreate(indexName).setMapping( | ||
| SORT_FIELD, | ||
| "type=long", | ||
| "text", | ||
| "type=text,store=true", | ||
| "large_text_1", | ||
| "type=text,store=false", | ||
| "large_text_2", | ||
| "type=text,store=false", | ||
| "large_text_3", | ||
| "type=text,store=false", | ||
| "keyword", | ||
| "type=keyword" | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| private void populateIndex(String indexName, int nDocs, int textSize) throws IOException { |
There was a problem hiding this comment.
I think we usually have private methods at the bottom to enhance readability
| ensureSearchable(INDEX_SMALL); | ||
| } | ||
|
|
||
| private void createSmallIndex() throws IOException { |
There was a problem hiding this comment.
nit: I think this rather shallow method hinders readability (and it's only used once)
Would it be easier to just call the code directly?
| populateIndex(INDEX_SMALL, 50, 10_000); | ||
| } | ||
|
|
||
| private void createLargeIndex() throws IOException { |
| } | ||
|
|
||
| private long getRequestBreakerUsed() { | ||
| CircuitBreakerService breakerService = internalCluster().getInstance(CircuitBreakerService.class); |
There was a problem hiding this comment.
I think this gets the CircuitBreakerService from a random node in the cluster so it might create test flakiness?
Do we need to be more specific about which CircuitBreakerService are we retrieving?
There was a problem hiding this comment.
That's a good point!!
I updated the code to use
@ClusterScope(scope = TEST, numDataNodes = 0, numClientNodes = 0)
to start with an empty cluster and explicitly start a data node and coordinator node per test.
So now I pass the data node name to getRequestBreakerUsed(String node) to retrieve the circuit breaker from the specific node where shards are allocated and fetch phase executes.
|
|
||
| private SearchHits hits; | ||
|
|
||
| private transient long circuitBreakerBytes = 0L; |
There was a problem hiding this comment.
Shall we name this something that indicates what the field is? (i.e. searchHitsSizeBytes or something like that? - it's currently named based on what it is used for, not what it is)
| private static class SearchHitsWithBreakerBytes { | ||
| final SearchHits hits; | ||
| final long circuitBreakerBytes; | ||
|
|
||
| SearchHitsWithBreakerBytes(SearchHits hits, long circuitBreakerBytes) { | ||
| this.hits = hits; | ||
| this.circuitBreakerBytes = circuitBreakerBytes; | ||
| } |
There was a problem hiding this comment.
should we rename this to SearchHitsAndBytesSize ?
| ? Profiler.NOOP | ||
| : Profilers.startProfilingFetchPhase(); | ||
| SearchHits hits = null; | ||
| long circuitBreakerBytes = 0L; |
There was a problem hiding this comment.
| long circuitBreakerBytes = 0L; | |
| long searchHitsBytesSize = 0L; |
There was a problem hiding this comment.
That's a good suggestion, updated!
| /** | ||
| * Test the circuit breaker release helper for QueryFetchSearchResult. | ||
| */ |
There was a problem hiding this comment.
nit: these comments are a bit confusing for me (what helper?) I'd prefer if we remove them and bake the logic of what we're testing in the test name
There was a problem hiding this comment.
Removed the comments
| assertThat( | ||
| "Circuit breaker should not grow after multiple searches (no leaks)", | ||
| getRequestBreakerUsed(), | ||
| lessThanOrEqualTo(initialBreaker + ByteSizeValue.ofKb(100).getBytes()) // Allow small variance |
There was a problem hiding this comment.
would the variance not mask a leak here?
There was a problem hiding this comment.
You’re right; I removed that and added it within an assertBusy block
| /** | ||
| * Test circuit breaker release with scroll search (scroll fetch path). | ||
| */ |
There was a problem hiding this comment.
test name is descriptive enough IMO
There was a problem hiding this comment.
I added the comment to stay consistent with the rest of the tests. Do you think we should remove these comments across all tests for consistency? The tests code is fairly straightforward to read.
There was a problem hiding this comment.
Yes please, unless you find them useful (they currently mostly just restate what the test method names are saying)
@andreidan thank you for the review. I tested this locally against a running Elasticsearch cluster during implementation. Before applying the request circuit breaker, I was able to reproduce the problematic behavior by issuing large fetch requests, which caused the data node to run out of memory during the fetch phase. I used a profiler to monitor heap usage while the fetch was executing and observed the OOM. With the fix applied, the same requests fail with a CircuitBreakingException and are mapped to 429, without destabilizing the node. In this PR, I added integration tests that intentionally lower indices.breaker.request.limit and assert that large fetches now trip the request breaker and return 429s, as well as that breaker usage is properly released afterward. |
andreidan
left a comment
There was a problem hiding this comment.
Thanks for working on this Dimi.
LGTM
server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Andrei Dan <andrei.dan@elastic.co>
Circuit breaker bytes were being released too early in the
fetch phase. The bytes were released immediately after buildingSearchHitsinFetchPhase.buildSearchHits(), but before the response was serialized and sent to the coordinator node. This created a timing window where:This PR fixes the timing issue by delaying circuit breaker release until after the response has been successfully sent to the coordinator. We achieve this by:
FetchSearchResultwhen hits are builtlistener.onResponse()completes