-
Notifications
You must be signed in to change notification settings - Fork 25.8k
Fix SearchContext CB memory accounting #138002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
19ae0a7
15e8e90
4b32644
0bb77a0
7b40abe
9ed0188
38e68f0
4103834
b36faf0
47c45c3
ddfbaf0
51ea8a6
c15fa9f
801d6d0
221f947
93775d8
bbf097a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 138002 | ||
| summary: Fix `SearchContext` CB memory accounting | ||
| area: Aggregations | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
| package org.elasticsearch.search.aggregations.metrics; | ||
|
|
||
| import org.apache.logging.log4j.util.Strings; | ||
| import org.elasticsearch.action.index.IndexRequestBuilder; | ||
| import org.elasticsearch.action.search.SearchRequestBuilder; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.rest.RestStatus; | ||
| import org.elasticsearch.search.aggregations.bucket.terms.Terms; | ||
| import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode; | ||
| import org.elasticsearch.search.sort.SortBuilders; | ||
| import org.elasticsearch.search.sort.SortOrder; | ||
| import org.elasticsearch.test.ESIntegTestCase; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
|
|
||
| import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; | ||
| import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits; | ||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; | ||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; | ||
| import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; | ||
| import static org.hamcrest.Matchers.containsString; | ||
| import static org.hamcrest.Matchers.notNullValue; | ||
|
|
||
| @ESIntegTestCase.SuiteScopeTestCase() | ||
| public class LargeTopHitsIT extends ESIntegTestCase { | ||
|
|
||
| private static final String TERMS_AGGS_FIELD_1 = "terms1"; | ||
| private static final String TERMS_AGGS_FIELD_2 = "terms2"; | ||
| private static final String TERMS_AGGS_FIELD_3 = "terms3"; | ||
| private static final String SORT_FIELD = "sort"; | ||
|
|
||
| @Override | ||
| protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { | ||
| return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("indices.breaker.request.type", "memory").build(); | ||
| } | ||
|
|
||
| public static String randomExecutionHint() { | ||
| return randomBoolean() ? null : randomFrom(ExecutionMode.values()).toString(); | ||
| } | ||
|
|
||
| @Override | ||
| public void setupSuiteScopeCluster() throws Exception { | ||
| initSmallIdx(); | ||
| ensureSearchable(); | ||
| } | ||
|
|
||
| private void initSmallIdx() throws IOException { | ||
| createIndex("small_idx"); | ||
| ensureGreen("small_idx"); | ||
| populateIndex("small_idx", 5, 40_000); | ||
| } | ||
|
|
||
| private void initLargeIdx() throws IOException { | ||
| createIndex("large_idx"); | ||
| ensureGreen("large_idx"); | ||
| populateIndex("large_idx", 70, 50_000); | ||
| } | ||
|
|
||
| public void testSimple() { | ||
| assertNoFailuresAndResponse(query("small_idx"), response -> { | ||
| Terms terms = response.getAggregations().get("terms"); | ||
| assertThat(terms, notNullValue()); | ||
| }); | ||
| } | ||
|
|
||
| public void test500Queries() { | ||
| for (int i = 0; i < 500; i++) { | ||
| // make sure we are not leaking memory over multiple queries | ||
| assertNoFailuresAndResponse(query("small_idx"), response -> { | ||
| Terms terms = response.getAggregations().get("terms"); | ||
| assertThat(terms, notNullValue()); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // This works most of the time, but it's not consistent: it still triggers OOM sometimes. | ||
| // The test env is too small and non-deterministic to hold all these data and results. | ||
| @AwaitsFix(bugUrl = "see comment above") | ||
| public void testBreakAndRecover() throws IOException { | ||
| initLargeIdx(); | ||
| assertNoFailuresAndResponse(query("small_idx"), response -> { | ||
| Terms terms = response.getAggregations().get("terms"); | ||
| assertThat(terms, notNullValue()); | ||
| }); | ||
|
|
||
| assertFailures(query("large_idx"), RestStatus.TOO_MANY_REQUESTS, containsString("Data too large")); | ||
|
|
||
| assertNoFailuresAndResponse(query("small_idx"), response -> { | ||
| Terms terms = response.getAggregations().get("terms"); | ||
| assertThat(terms, notNullValue()); | ||
| }); | ||
| } | ||
|
|
||
| private void createIndex(String idxName) { | ||
| assertAcked( | ||
| prepareCreate(idxName).setMapping( | ||
| TERMS_AGGS_FIELD_1, | ||
| "type=keyword", | ||
| TERMS_AGGS_FIELD_2, | ||
| "type=keyword", | ||
| TERMS_AGGS_FIELD_3, | ||
| "type=keyword", | ||
| "text", | ||
| "type=text,store=true", | ||
| "large_text_1", | ||
| "type=text,store=false", | ||
| "large_text_2", | ||
| "type=text,store=false", | ||
| "large_text_3", | ||
| "type=text,store=false", | ||
| "large_text_4", | ||
| "type=text,store=false", | ||
| "large_text_5", | ||
| "type=text,store=false" | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| private void populateIndex(String idxName, int nDocs, int size) throws IOException { | ||
| for (int i = 0; i < nDocs; i++) { | ||
| List<IndexRequestBuilder> builders = new ArrayList<>(); | ||
| builders.add( | ||
| prepareIndex(idxName).setId(Integer.toString(i)) | ||
| .setSource( | ||
| jsonBuilder().startObject() | ||
| .field(TERMS_AGGS_FIELD_1, "val" + i % 53) | ||
| .field(TERMS_AGGS_FIELD_2, "val" + i % 23) | ||
| .field(TERMS_AGGS_FIELD_3, "val" + i % 10) | ||
| .field(SORT_FIELD, i) | ||
| .field("text", "some text to entertain") | ||
| .field("large_text_1", Strings.repeat("this is a text field 1 ", size)) | ||
| .field("large_text_2", Strings.repeat("this is a text field 2 ", size)) | ||
| .field("large_text_3", Strings.repeat("this is a text field 3 ", size)) | ||
| .field("large_text_4", Strings.repeat("this is a text field 4 ", size)) | ||
| .field("large_text_5", Strings.repeat("this is a text field 5 ", size)) | ||
| .field("field1", 5) | ||
| .field("field2", 2.71) | ||
| .endObject() | ||
| ) | ||
| ); | ||
|
|
||
| indexRandom(true, builders); | ||
| } | ||
| } | ||
|
|
||
| private static SearchRequestBuilder query(String indexName) { | ||
| return prepareSearch(indexName).addAggregation( | ||
| terms("terms").executionHint(randomExecutionHint()) | ||
| .field(TERMS_AGGS_FIELD_1) | ||
| .subAggregation( | ||
| terms("terms").executionHint(randomExecutionHint()) | ||
| .field(TERMS_AGGS_FIELD_2) | ||
| .subAggregation( | ||
| terms("terms").executionHint(randomExecutionHint()) | ||
| .field(TERMS_AGGS_FIELD_2) | ||
| .subAggregation(topHits("hits").sort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC))) | ||
| ) | ||
| ) | ||
| ); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.IntConsumer; | ||
|
|
||
| class TopHitsAggregator extends MetricsAggregator { | ||
|
|
||
|
|
@@ -198,7 +199,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE | |
| for (int i = 0; i < topDocs.scoreDocs.length; i++) { | ||
| docIdsToLoad[i] = topDocs.scoreDocs[i].doc; | ||
| } | ||
| FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad); | ||
| FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad, this::addRequestCircuitBreakerBytes); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we batch here and avoid invoking the CB for every document? I suspect that fetching source is way more expensive than invoking the CB, so I'm not sure we want more complication here.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe worth checking an esrally benchmark here?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll run some aggs nightlies and see what happens
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran nyc-taxis track, and I see no slowdown (actually it seems faster). |
||
| if (fetchProfiles != null) { | ||
| fetchProfiles.add(fetchResult.profileResult()); | ||
| } | ||
|
|
@@ -222,7 +223,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE | |
| ); | ||
| } | ||
|
|
||
| private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) { | ||
| private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad, IntConsumer memoryChecker) { | ||
| // Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet. | ||
| SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext()); | ||
| // InnerHitSubContext is not thread-safe, so we fork it as well to support concurrent execution | ||
|
|
@@ -242,7 +243,7 @@ public InnerHitsContext innerHits() { | |
| } | ||
| }; | ||
|
|
||
| fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null); | ||
| fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null, memoryChecker); | ||
| return fetchSubSearchContext.fetchResult(); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| import org.apache.lucene.index.LeafReaderContext; | ||
| import org.apache.lucene.search.TotalHits; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader; | ||
| import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; | ||
| import org.elasticsearch.index.mapper.IdLoader; | ||
|
|
@@ -47,6 +48,7 @@ | |
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.function.IntConsumer; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static org.elasticsearch.index.get.ShardGetService.maybeExcludeVectorFields; | ||
|
|
@@ -67,6 +69,17 @@ public FetchPhase(List<FetchSubPhase> fetchSubPhases) { | |
| } | ||
|
|
||
| public void execute(SearchContext context, int[] docIdsToLoad, RankDocShardInfo rankDocs) { | ||
| execute(context, docIdsToLoad, rankDocs, null); | ||
| } | ||
|
|
||
| /** | ||
| * | ||
| * @param context | ||
| * @param docIdsToLoad | ||
| * @param rankDocs | ||
| * @param memoryChecker if not provided, the fetch phase will use the circuit breaker to check memory usage | ||
| */ | ||
| public void execute(SearchContext context, int[] docIdsToLoad, RankDocShardInfo rankDocs, @Nullable IntConsumer memoryChecker) { | ||
| if (LOGGER.isTraceEnabled()) { | ||
| LOGGER.trace("{}", new SearchContextSourcePrinter(context)); | ||
| } | ||
|
|
@@ -88,7 +101,7 @@ public void execute(SearchContext context, int[] docIdsToLoad, RankDocShardInfo | |
| : Profilers.startProfilingFetchPhase(); | ||
| SearchHits hits = null; | ||
| try { | ||
| hits = buildSearchHits(context, docIdsToLoad, profiler, rankDocs); | ||
| hits = buildSearchHits(context, docIdsToLoad, profiler, rankDocs, memoryChecker); | ||
| } finally { | ||
| try { | ||
| // Always finish profiling | ||
|
|
@@ -116,7 +129,13 @@ public Source getSource(LeafReaderContext ctx, int doc) { | |
| } | ||
| } | ||
|
|
||
| private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Profiler profiler, RankDocShardInfo rankDocs) { | ||
| private SearchHits buildSearchHits( | ||
| SearchContext context, | ||
| int[] docIdsToLoad, | ||
| Profiler profiler, | ||
| RankDocShardInfo rankDocs, | ||
| IntConsumer memoryChecker | ||
| ) { | ||
| var lookup = context.getSearchExecutionContext().getMappingLookup(); | ||
|
|
||
| // Optionally remove sparse and dense vector fields early to: | ||
|
|
@@ -180,6 +199,14 @@ private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Pr | |
| SourceLoader.Leaf leafSourceLoader; | ||
| IdLoader.Leaf leafIdLoader; | ||
|
|
||
| IntConsumer memChecker = memoryChecker != null ? memoryChecker : bytes -> { | ||
| locallyAccumulatedBytes[0] += bytes; | ||
| if (context.checkCircuitBreaker(locallyAccumulatedBytes[0], "fetch source")) { | ||
| addRequestBreakerBytes(locallyAccumulatedBytes[0]); | ||
| locallyAccumulatedBytes[0] = 0; | ||
| } | ||
| }; | ||
|
|
||
| @Override | ||
| protected void setNextReader(LeafReaderContext ctx, int[] docsInLeaf) throws IOException { | ||
| Timer timer = profiler.startNextReader(); | ||
|
|
@@ -206,10 +233,6 @@ protected SearchHit nextDoc(int doc) throws IOException { | |
| if (context.isCancelled()) { | ||
| throw new TaskCancelledException("cancelled"); | ||
| } | ||
| if (context.checkRealMemoryCB(locallyAccumulatedBytes[0], "fetch source")) { | ||
| // if we checked the real memory breaker, we restart our local accounting | ||
| locallyAccumulatedBytes[0] = 0; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of the time these batches were too small, so this didn't trigger. |
||
| } | ||
|
|
||
| HitContext hit = prepareHitContext( | ||
| context, | ||
|
|
@@ -233,7 +256,9 @@ protected SearchHit nextDoc(int doc) throws IOException { | |
|
|
||
| BytesReference sourceRef = hit.hit().getSourceRef(); | ||
| if (sourceRef != null) { | ||
| locallyAccumulatedBytes[0] += sourceRef.length(); | ||
| // This is an empirical value that seems to work well. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To simplify the logic, we could create an and simply call We should double check if that works
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @drempapis, I'll give it a try
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made this change and yes, the code looks much better now 👍 |
||
| // Deserializing a large source would also mean serializing it to HTTP response later on, so x2 seems reasonable | ||
| memChecker.accept(sourceRef.length() * 2); | ||
| } | ||
| success = true; | ||
| return hit.hit(); | ||
|
|
@@ -245,24 +270,31 @@ protected SearchHit nextDoc(int doc) throws IOException { | |
| } | ||
| }; | ||
|
|
||
| SearchHit[] hits = docsIterator.iterate( | ||
| context.shardTarget(), | ||
| context.searcher().getIndexReader(), | ||
| docIdsToLoad, | ||
| context.request().allowPartialSearchResults(), | ||
| context.queryResult() | ||
| ); | ||
| try { | ||
| SearchHit[] hits = docsIterator.iterate( | ||
| context.shardTarget(), | ||
| context.searcher().getIndexReader(), | ||
| docIdsToLoad, | ||
| context.request().allowPartialSearchResults(), | ||
| context.queryResult() | ||
| ); | ||
|
|
||
| if (context.isCancelled()) { | ||
| for (SearchHit hit : hits) { | ||
| // release all hits that would otherwise become owned and eventually released by SearchHits below | ||
| hit.decRef(); | ||
| if (context.isCancelled()) { | ||
| for (SearchHit hit : hits) { | ||
| // release all hits that would otherwise become owned and eventually released by SearchHits below | ||
| hit.decRef(); | ||
| } | ||
| throw new TaskCancelledException("cancelled"); | ||
| } | ||
| throw new TaskCancelledException("cancelled"); | ||
| } | ||
|
|
||
| TotalHits totalHits = context.getTotalHits(); | ||
| return new SearchHits(hits, totalHits, context.getMaxScore()); | ||
| TotalHits totalHits = context.getTotalHits(); | ||
| return new SearchHits(hits, totalHits, context.getMaxScore()); | ||
| } finally { | ||
| long bytes = docsIterator.getRequestBreakerBytes(); | ||
| if (bytes > 0L) { | ||
| context.circuitBreaker().addWithoutBreaking(-bytes); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @luigidellaquila @drempapis Sorry for the late comment here. Thanks for iterating on this. I think we're releasing the bytes from the circuit breaker too soon here? The response has not been sent to the client yet so these hits are still in memory.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andreidan I agree, we should do better here.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andreidan, we missed it! I’ll work on it in parallel and make it a priority, given the complexity of the alternative.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the additional context. While #139124 (which looks like an excellent way to move forward) is under development, how confident are we with this change ? Do we have an overview of which queries and scenarios it improves over the previous implementation? and perhaps more importantly where it might fall a bit short to what we had previously? From the PR description it seems that the thing that we fixed was: but we didn't have to replace the real CB to achieve this. Did we consider triggering the real memory CB on a doc count too? (i.e. accumulated 1MiB in source size or retrieved 1024 documents)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andreidan looking at previous discussions (eg. this and the internal ES-12944), my impression is that we don't want to rely too much on the real memory CB. I started this PR mostly to address memory problems with TopHits (as for the comment you mentioned above); my original implementation was intended to use the request breaker when the fetch phase was part of Aggs, but based on the discussions with @drempapis (again, see the internal ES-12944) we ended up using it also for the general case (ie. for Search). I'll let @drempapis comment on this and on the possible regressions, as he probably has more visibility the Search side. For Aggs we are definitely in a much better position now.
Not that I know
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In ES-12944 we analyzed a core issue: on nodes with small For Even when the request breaker is active, its default limit (60 percent of heap) is often too high to protect a small-heap node once baseline memory usage is included. The parent-memory breaker only fires at very high global heap occupancy and usually reacts too late, after most of the large objects are already live. I opened a PR to mitigate the issue of releasing bytes from the circuit breaker too early while the searchHits are still resident in memory. This is a short-term fix until a more complete solution will be implemented.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah excellent, thanks for working on #139243 Dimi ❤️ Awesome we're jumping right on it to fix it! |
||
| } | ||
| } | ||
| } | ||
|
|
||
| List<FetchSubPhaseProcessor> getProcessors(SearchShardTarget target, FetchContext context, Profiler profiler) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,20 @@ | |
| */ | ||
| abstract class FetchPhaseDocsIterator { | ||
|
|
||
| /** | ||
| * Accounts for FetchPhase memory usage. | ||
| * It gets cleaned up after each fetch phase and should not be accessed/modified by subclasses. | ||
| */ | ||
| private long requestBreakerBytes; | ||
|
|
||
| public void addRequestBreakerBytes(long delta) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It’d be good to document that this field is strictly for fetch-phase memory accounting, will be reversed at the end of the FetchPhase, and shouldn’t be accessed or modified by subclasses. (if any added in the future) |
||
| requestBreakerBytes += delta; | ||
| } | ||
|
|
||
| public long getRequestBreakerBytes() { | ||
| return requestBreakerBytes; | ||
| } | ||
|
|
||
| /** | ||
| * Called when a new leaf reader is reached | ||
| * @param ctx the leaf reader for this set of doc ids | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -386,15 +386,15 @@ public Query rewrittenQuery() { | |
| public abstract long memAccountingBufferSize(); | ||
|
|
||
| /** | ||
| * Checks if the accumulated bytes are greater than the buffer size and if so, checks the available memory in the parent breaker | ||
| * (the real memory breaker). | ||
| * Checks if the accumulated bytes are greater than the buffer size and if so, checks the circuit breaker. | ||
| * IMPORTANT: the caller is responsible for cleaning up the circuit breaker. | ||
| * @param locallyAccumulatedBytes the number of bytes accumulated locally | ||
| * @param label the label to use in the breaker | ||
| * @return true if the real memory breaker is called and false otherwise | ||
| * @return true if the circuit breaker is called and false otherwise | ||
| */ | ||
| public final boolean checkRealMemoryCB(int locallyAccumulatedBytes, String label) { | ||
| public final boolean checkCircuitBreaker(int locallyAccumulatedBytes, String label) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is no longer RealMemory, but rather normal (Request) CB
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just in case, is there something else using this, that could not be freeing the accounted memory?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's the only usage for now
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add to the Javadoc that callers are responsible for decrementing the breaker by the same amount at some point. |
||
| if (locallyAccumulatedBytes >= memAccountingBufferSize()) { | ||
| circuitBreaker().addEstimateBytesAndMaybeBreak(0, label); | ||
| circuitBreaker().addEstimateBytesAndMaybeBreak(locallyAccumulatedBytes, label); | ||
| return true; | ||
| } | ||
| return false; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd really like to have this test working, but I couldn't find a way to make it pass deterministically; depending on the system conditions sometimes it OOMs before CB, and if I reduce the memory further, it doesn't CB anymore.
We have unit tests for FetchPhase CB, but having an integration test would be really really good.
I don't know if we should keep this here or delete it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I had a similar problem with the aggs reduction phase memory test, which is now muted: #134667
Test works, but it fails sometimes. I tried tweaking it to have an exact number of nodes of each kind, as well as docs, query limits and CB settings, and it improved, but still flaky.
Maybe you could try forcing a set amount of nodes, like in here:
elasticsearch/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
Line 44 in c6ddf5d
Less random, but less flaky (luckily 🤞 )