Skip to content

Commit 890b8f4

Browse files
Improve EQL Sequence circuit breaker precision (#88538) (#88976)
Fixes #88300
1 parent c8dd9ae commit 890b8f4

File tree

3 files changed

+141
-120
lines changed

3 files changed

+141
-120
lines changed

docs/changelog/88538.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 88538
2+
summary: Improve EQL Sequence circuit breaker precision
3+
area: EQL
4+
type: bug
5+
issues:
6+
- 88300

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ public void clear() {
8282
private final Stats stats = new Stats();
8383

8484
private boolean headLimit = false;
85-
private long totalRamBytesUsed = 0;
85+
86+
// circuit breaker accounting
87+
private long prevRamBytesUsedInFlight = 0;
88+
private long prevRamBytesUsedCompleted = 0;
8689

8790
@SuppressWarnings("rawtypes")
8891
public SequenceMatcher(int stages, boolean descending, TimeValue maxSpan, Limit limit, CircuitBreaker circuitBreaker) {
@@ -114,9 +117,6 @@ private void trackSequence(Sequence sequence) {
114117
* Returns false if the process needs to be stopped.
115118
*/
116119
boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
117-
long ramBytesUsedInFlight = ramBytesUsedInFlight();
118-
long ramBytesUsedCompleted = ramBytesUsedCompleted();
119-
120120
for (Tuple<KeyAndOrdinal, HitReference> tuple : hits) {
121121
KeyAndOrdinal ko = tuple.v1();
122122
HitReference hit = tuple.v2();
@@ -145,7 +145,7 @@ boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
145145
log.trace("{}", stats);
146146
matched = true;
147147
}
148-
trackMemory(ramBytesUsedInFlight, ramBytesUsedCompleted);
148+
trackMemory();
149149
return matched;
150150
}
151151

@@ -305,34 +305,35 @@ public void clear() {
305305
clearCircuitBreaker();
306306
}
307307

308-
private long ramBytesUsedInFlight() {
308+
// protected for testing purposes
309+
protected long ramBytesUsedInFlight() {
309310
return RamUsageEstimator.sizeOf(keyToSequences) + RamUsageEstimator.sizeOf(stageToKeys);
310311
}
311312

312-
private long ramBytesUsedCompleted() {
313+
// protected for testing purposes
314+
protected long ramBytesUsedCompleted() {
313315
return RamUsageEstimator.sizeOfCollection(completed);
314316
}
315317

316-
private void addMemory(long bytes, String label) {
317-
totalRamBytesUsed += bytes;
318-
circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label);
319-
}
320-
321318
private void clearCircuitBreaker() {
322-
circuitBreaker.addWithoutBreaking(-totalRamBytesUsed);
323-
totalRamBytesUsed = 0;
319+
circuitBreaker.addWithoutBreaking(-prevRamBytesUsedInFlight - prevRamBytesUsedCompleted);
320+
prevRamBytesUsedInFlight = 0;
321+
prevRamBytesUsedCompleted = 0;
324322
}
325323

326324
// The method is called at the end of match(), which is called for every subquery in the sequence query
327325
// and for each subquery every "fetch_size" docs. Doing RAM accounting on object creation is
328326
// expensive, so we just calculate the difference in bytes of the total memory that the matcher's
329327
// structures occupy for the in-flight tracking of sequences, as well as for the list of completed
330328
// sequences.
331-
private void trackMemory(long prevRamBytesUsedInflight, long prevRamBytesUsedCompleted) {
332-
long bytesDiff = ramBytesUsedInFlight() - prevRamBytesUsedInflight;
333-
addMemory(bytesDiff, CB_INFLIGHT_LABEL);
334-
bytesDiff = ramBytesUsedCompleted() - prevRamBytesUsedCompleted;
335-
addMemory(bytesDiff, CB_COMPLETED_LABEL);
329+
private void trackMemory() {
330+
long newRamBytesUsedInFlight = ramBytesUsedInFlight();
331+
circuitBreaker.addEstimateBytesAndMaybeBreak(newRamBytesUsedInFlight - prevRamBytesUsedInFlight, CB_INFLIGHT_LABEL);
332+
prevRamBytesUsedInFlight = newRamBytesUsedInFlight;
333+
334+
long newRamBytesUsedCompleted = ramBytesUsedCompleted();
335+
circuitBreaker.addEstimateBytesAndMaybeBreak(newRamBytesUsedCompleted - prevRamBytesUsedCompleted, CB_COMPLETED_LABEL);
336+
prevRamBytesUsedCompleted = newRamBytesUsedCompleted;
336337
}
337338

338339
@Override

x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java

Lines changed: 115 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -132,27 +132,7 @@ public void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<Lis
132132

133133
public void testCircuitBreakerTumblingWindow() {
134134
QueryClient client = new TestQueryClient();
135-
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(stages);
136-
137-
for (int i = 0; i < stages; i++) {
138-
final int j = i;
139-
criteria.add(
140-
new Criterion<>(
141-
i,
142-
new BoxedQueryRequest(
143-
() -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j),
144-
"@timestamp",
145-
emptyList(),
146-
emptySet()
147-
),
148-
keyExtractors,
149-
tsExtractor,
150-
null,
151-
implicitTbExtractor,
152-
false
153-
)
154-
);
155-
}
135+
List<Criterion<BoxedQueryRequest>> criteria = buildCriteria(stages);
156136

157137
SequenceMatcher matcher = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, CIRCUIT_BREAKER);
158138
TumblingWindow window = new TumblingWindow(client, criteria, null, matcher);
@@ -187,8 +167,10 @@ public void testCircuitBreakerSequenceMatcher() {
187167
assertEquals("sequence_inflight", e.getMessage());
188168

189169
// Break on second iteration
190-
SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, new EqlTestCircuitBreaker(15000));
170+
EqlTestCircuitBreaker breaker = new EqlTestCircuitBreaker(15000);
171+
SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, breaker);
191172
matcher2.match(0, hits);
173+
assertEquals(matcher2.ramBytesUsedInFlight() + matcher2.ramBytesUsedCompleted(), breaker.ramBytesUsed);
192174
e = expectThrows(CircuitBreakingException.class, () -> matcher2.match(0, hits));
193175
assertEquals("sequence_inflight", e.getMessage());
194176

@@ -210,92 +192,18 @@ public void testMemoryClearedOnShardsException() {
210192
}
211193

212194
private void assertMemoryCleared(int sequenceFiltersCount, BiFunction<CircuitBreaker, Integer, ESMockClient> esClientSupplier) {
213-
final int SEARCH_REQUESTS_EXPECTED_COUNT = 2;
214-
List<BreakerSettings> eqlBreakerSettings = Collections.singletonList(
215-
new BreakerSettings(
216-
CIRCUIT_BREAKER_NAME,
217-
CIRCUIT_BREAKER_LIMIT,
218-
CIRCUIT_BREAKER_OVERHEAD,
219-
CircuitBreaker.Type.MEMORY,
220-
CircuitBreaker.Durability.TRANSIENT
221-
)
222-
);
195+
final int searchRequestsExpectedCount = 2;
223196
try (
224197
CircuitBreakerService service = new HierarchyCircuitBreakerService(
225198
Settings.EMPTY,
226-
eqlBreakerSettings,
199+
breakerSettings(),
227200
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
228201
);
229-
ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), SEARCH_REQUESTS_EXPECTED_COUNT);
202+
ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), searchRequestsExpectedCount);
230203
) {
231204
CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME);
232-
EqlConfiguration eqlConfiguration = new EqlConfiguration(
233-
new String[] { "test" },
234-
org.elasticsearch.xpack.ql.util.DateUtils.UTC,
235-
"nobody",
236-
"cluster",
237-
null,
238-
emptyMap(),
239-
null,
240-
TimeValue.timeValueSeconds(30),
241-
null,
242-
123,
243-
"",
244-
new TaskId("test", 123),
245-
new EqlSearchTask(
246-
randomLong(),
247-
"transport",
248-
EqlSearchAction.NAME,
249-
"",
250-
null,
251-
emptyMap(),
252-
emptyMap(),
253-
new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)),
254-
TimeValue.timeValueDays(5)
255-
),
256-
x -> Collections.emptySet()
257-
);
258-
IndexResolver indexResolver = new IndexResolver(
259-
esClient,
260-
"cluster",
261-
DefaultDataTypeRegistry.INSTANCE,
262-
() -> { return emptySet(); }
263-
);
264-
EqlSession eqlSession = new EqlSession(
265-
esClient,
266-
eqlConfiguration,
267-
indexResolver,
268-
new PreAnalyzer(),
269-
new PostAnalyzer(),
270-
new EqlFunctionRegistry(),
271-
new Verifier(new Metrics()),
272-
new Optimizer(),
273-
new Planner(),
274-
eqlCircuitBreaker
275-
);
276-
QueryClient eqlClient = new PITAwareQueryClient(eqlSession);
277-
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(sequenceFiltersCount);
278-
279-
for (int i = 0; i < sequenceFiltersCount; i++) {
280-
final int j = i;
281-
criteria.add(
282-
new Criterion<>(
283-
i,
284-
new BoxedQueryRequest(
285-
() -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j),
286-
"@timestamp",
287-
emptyList(),
288-
emptySet()
289-
),
290-
keyExtractors,
291-
tsExtractor,
292-
null,
293-
implicitTbExtractor,
294-
false
295-
)
296-
);
297-
}
298-
205+
QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker);
206+
List<Criterion<BoxedQueryRequest>> criteria = buildCriteria(sequenceFiltersCount);
299207
SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker);
300208
TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher);
301209
window.execute(wrap(p -> {}, ex -> {}));
@@ -306,6 +214,112 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction<CircuitBre
306214
}
307215
}
308216

217+
// test covering fix for https://github.com/elastic/elasticsearch/issues/88300
218+
public void testEqlCBCleanedUp_on_ParentCBBreak() {
219+
final int sequenceFiltersCount = 2;
220+
final int searchRequestsExpectedCount = 2;
221+
222+
// let the parent circuit breaker fail, setting its limit to zero
223+
Settings settings = Settings.builder().put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), 0).build();
224+
225+
try (
226+
CircuitBreakerService service = new HierarchyCircuitBreakerService(
227+
settings,
228+
breakerSettings(),
229+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
230+
);
231+
ESMockClient esClient = new SuccessfulESMockClient(service.getBreaker(CIRCUIT_BREAKER_NAME), searchRequestsExpectedCount);
232+
) {
233+
CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME);
234+
QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker);
235+
List<Criterion<BoxedQueryRequest>> criteria = buildCriteria(sequenceFiltersCount);
236+
237+
SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker);
238+
TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher);
239+
window.execute(wrap(p -> fail(), ex -> assertTrue(ex instanceof CircuitBreakingException)));
240+
}
241+
}
242+
243+
private List<BreakerSettings> breakerSettings() {
244+
List<BreakerSettings> eqlBreakerSettings = Collections.singletonList(
245+
new BreakerSettings(
246+
CIRCUIT_BREAKER_NAME,
247+
CIRCUIT_BREAKER_LIMIT,
248+
CIRCUIT_BREAKER_OVERHEAD,
249+
CircuitBreaker.Type.MEMORY,
250+
CircuitBreaker.Durability.TRANSIENT
251+
)
252+
);
253+
return eqlBreakerSettings;
254+
}
255+
256+
private List<Criterion<BoxedQueryRequest>> buildCriteria(int sequenceFiltersCount) {
257+
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(sequenceFiltersCount);
258+
for (int i = 0; i < sequenceFiltersCount; i++) {
259+
final int j = i;
260+
criteria.add(
261+
new Criterion<>(
262+
i,
263+
new BoxedQueryRequest(
264+
() -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j),
265+
"@timestamp",
266+
emptyList(),
267+
emptySet()
268+
),
269+
keyExtractors,
270+
tsExtractor,
271+
null,
272+
implicitTbExtractor,
273+
false
274+
)
275+
);
276+
}
277+
return criteria;
278+
}
279+
280+
private QueryClient buildQueryClient(ESMockClient esClient, CircuitBreaker eqlCircuitBreaker) {
281+
EqlConfiguration eqlConfiguration = new EqlConfiguration(
282+
new String[] { "test" },
283+
org.elasticsearch.xpack.ql.util.DateUtils.UTC,
284+
"nobody",
285+
"cluster",
286+
null,
287+
emptyMap(),
288+
null,
289+
TimeValue.timeValueSeconds(30),
290+
null,
291+
123,
292+
"",
293+
new TaskId("test", 123),
294+
new EqlSearchTask(
295+
randomLong(),
296+
"transport",
297+
EqlSearchAction.NAME,
298+
"",
299+
null,
300+
emptyMap(),
301+
emptyMap(),
302+
new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)),
303+
TimeValue.timeValueDays(5)
304+
),
305+
x -> Collections.emptySet()
306+
);
307+
IndexResolver indexResolver = new IndexResolver(esClient, "cluster", DefaultDataTypeRegistry.INSTANCE, Collections::emptySet);
308+
EqlSession eqlSession = new EqlSession(
309+
esClient,
310+
eqlConfiguration,
311+
indexResolver,
312+
new PreAnalyzer(),
313+
new PostAnalyzer(),
314+
new EqlFunctionRegistry(),
315+
new Verifier(new Metrics()),
316+
new Optimizer(),
317+
new Planner(),
318+
eqlCircuitBreaker
319+
);
320+
return new PITAwareQueryClient(eqlSession);
321+
}
322+
309323
/**
310324
* A type of internal Node client that deals with three types of requests: open PIT, close PIT and SearchRequest.
311325
* This class is used by {@code CircuitBreakerTests#testMemoryClearedOnSuccessfulRequest()} and

0 commit comments

Comments
 (0)