Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/88538.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 88538
summary: Improve EQL Sequence circuit breaker precision
area: EQL
type: bug
issues:
- 88300
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ public void clear() {
private final Stats stats = new Stats();

private boolean headLimit = false;
private long totalRamBytesUsed = 0;

// circuit breaker accounting
private long prevRamBytesUsedInFlight = 0;
private long prevRamBytesUsedCompleted = 0;

@SuppressWarnings("rawtypes")
public SequenceMatcher(int stages, boolean descending, TimeValue maxSpan, Limit limit, CircuitBreaker circuitBreaker) {
Expand Down Expand Up @@ -114,9 +117,6 @@ private void trackSequence(Sequence sequence) {
* Returns false if the process needs to be stopped.
*/
boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
long ramBytesUsedInFlight = ramBytesUsedInFlight();
long ramBytesUsedCompleted = ramBytesUsedCompleted();

for (Tuple<KeyAndOrdinal, HitReference> tuple : hits) {
KeyAndOrdinal ko = tuple.v1();
HitReference hit = tuple.v2();
Expand Down Expand Up @@ -145,7 +145,7 @@ boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) {
log.trace("{}", stats);
matched = true;
}
trackMemory(ramBytesUsedInFlight, ramBytesUsedCompleted);
trackMemory();
return matched;
}

Expand Down Expand Up @@ -305,34 +305,35 @@ public void clear() {
clearCircuitBreaker();
}

private long ramBytesUsedInFlight() {
/**
 * Estimates the bytes held by the in-flight tracking structures:
 * {@code keyToSequences} plus {@code stageToKeys}.
 * Protected (rather than private) for testing purposes.
 */
protected long ramBytesUsedInFlight() {
    long keyToSequencesBytes = RamUsageEstimator.sizeOf(keyToSequences);
    long stageToKeysBytes = RamUsageEstimator.sizeOf(stageToKeys);
    return keyToSequencesBytes + stageToKeysBytes;
}

private long ramBytesUsedCompleted() {
/**
 * Estimates the bytes held by the {@code completed} collection.
 * Protected (rather than private) for testing purposes.
 */
protected long ramBytesUsedCompleted() {
    long completedBytes = RamUsageEstimator.sizeOfCollection(completed);
    return completedBytes;
}

/**
 * Reserves {@code bytes} on the circuit breaker under {@code label} and adds them to
 * {@code totalRamBytesUsed}.
 *
 * The running total is updated only after the breaker call has returned. If the breaker
 * rejects the reservation by throwing, the rejected bytes are therefore not counted, and a
 * later release of {@code totalRamBytesUsed} cannot hand back more than was accepted.
 *
 * @param bytes number of bytes to reserve (may be negative when usage shrank)
 * @param label label passed to the circuit breaker for this reservation
 */
private void addMemory(long bytes, String label) {
    // May throw; nothing must be recorded locally in that case.
    circuitBreaker.addEstimateBytesAndMaybeBreak(bytes, label);
    totalRamBytesUsed += bytes;
}

// Hands the bytes this matcher has accounted for back to the circuit breaker (via a negative
// addWithoutBreaking) and resets the local accounting counters to zero.
// NOTE(review): this span comes from a diff view and appears to show both the removed
// totalRamBytesUsed statements and their prevRamBytesUsed* replacement - confirm which pair is live.
private void clearCircuitBreaker() {
circuitBreaker.addWithoutBreaking(-totalRamBytesUsed);
totalRamBytesUsed = 0;
// release what was last reported for the in-flight structures and for the completed sequences
circuitBreaker.addWithoutBreaking(-prevRamBytesUsedInFlight - prevRamBytesUsedCompleted);
prevRamBytesUsedInFlight = 0;
prevRamBytesUsedCompleted = 0;
}

// This method is called at the end of match(), which is called for every subquery in the sequence query
// and, for each subquery, every "fetch_size" docs. Doing RAM accounting on object creation is
// expensive, so we just calculate the difference in bytes of the total memory that the matcher's
// structures occupy for the in-flight tracking of sequences, as well as for the list of completed
// sequences.
private void trackMemory(long prevRamBytesUsedInflight, long prevRamBytesUsedCompleted) {
long bytesDiff = ramBytesUsedInFlight() - prevRamBytesUsedInflight;
addMemory(bytesDiff, CB_INFLIGHT_LABEL);
bytesDiff = ramBytesUsedCompleted() - prevRamBytesUsedCompleted;
addMemory(bytesDiff, CB_COMPLETED_LABEL);
private void trackMemory() {
    // In-flight structures: report only the change since the last successful report.
    final long inFlightNow = ramBytesUsedInFlight();
    final long inFlightDelta = inFlightNow - prevRamBytesUsedInFlight;
    circuitBreaker.addEstimateBytesAndMaybeBreak(inFlightDelta, CB_INFLIGHT_LABEL);
    // Reached only if the breaker call above returned normally.
    prevRamBytesUsedInFlight = inFlightNow;

    // Completed sequences: same scheme, with its own label and its own baseline.
    final long completedNow = ramBytesUsedCompleted();
    final long completedDelta = completedNow - prevRamBytesUsedCompleted;
    circuitBreaker.addEstimateBytesAndMaybeBreak(completedDelta, CB_COMPLETED_LABEL);
    prevRamBytesUsedCompleted = completedNow;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,7 @@ public void fetchHits(Iterable<List<HitReference>> refs, ActionListener<List<Lis

public void testCircuitBreakerTumblingWindow() {
QueryClient client = new TestQueryClient();
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(stages);

for (int i = 0; i < stages; i++) {
final int j = i;
criteria.add(
new Criterion<>(
i,
new BoxedQueryRequest(
() -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j),
"@timestamp",
emptyList(),
emptySet()
),
keyExtractors,
tsExtractor,
null,
implicitTbExtractor,
false
)
);
}
List<Criterion<BoxedQueryRequest>> criteria = buildCriteria(stages);

SequenceMatcher matcher = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, CIRCUIT_BREAKER);
TumblingWindow window = new TumblingWindow(client, criteria, null, matcher);
Expand Down Expand Up @@ -187,8 +167,10 @@ public void testCircuitBreakerSequenceMatcher() {
assertEquals("sequence_inflight", e.getMessage());

// Break on second iteration
SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, new EqlTestCircuitBreaker(15000));
EqlTestCircuitBreaker breaker = new EqlTestCircuitBreaker(15000);
SequenceMatcher matcher2 = new SequenceMatcher(stages, false, TimeValue.MINUS_ONE, null, breaker);
matcher2.match(0, hits);
assertEquals(matcher2.ramBytesUsedInFlight() + matcher2.ramBytesUsedCompleted(), breaker.ramBytesUsed);
e = expectThrows(CircuitBreakingException.class, () -> matcher2.match(0, hits));
assertEquals("sequence_inflight", e.getMessage());

Expand All @@ -210,92 +192,18 @@ public void testMemoryClearedOnShardsException() {
}

private void assertMemoryCleared(int sequenceFiltersCount, BiFunction<CircuitBreaker, Integer, ESMockClient> esClientSupplier) {
final int SEARCH_REQUESTS_EXPECTED_COUNT = 2;
List<BreakerSettings> eqlBreakerSettings = Collections.singletonList(
new BreakerSettings(
CIRCUIT_BREAKER_NAME,
CIRCUIT_BREAKER_LIMIT,
CIRCUIT_BREAKER_OVERHEAD,
CircuitBreaker.Type.MEMORY,
CircuitBreaker.Durability.TRANSIENT
)
);
final int searchRequestsExpectedCount = 2;
try (
CircuitBreakerService service = new HierarchyCircuitBreakerService(
Settings.EMPTY,
eqlBreakerSettings,
breakerSettings(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), SEARCH_REQUESTS_EXPECTED_COUNT);
ESMockClient esClient = esClientSupplier.apply(service.getBreaker(CIRCUIT_BREAKER_NAME), searchRequestsExpectedCount);
) {
CircuitBreaker eqlCircuitBreaker = service.getBreaker(CIRCUIT_BREAKER_NAME);
EqlConfiguration eqlConfiguration = new EqlConfiguration(
new String[] { "test" },
org.elasticsearch.xpack.ql.util.DateUtils.UTC,
"nobody",
"cluster",
null,
emptyMap(),
null,
TimeValue.timeValueSeconds(30),
null,
123,
"",
new TaskId("test", 123),
new EqlSearchTask(
randomLong(),
"transport",
EqlSearchAction.NAME,
"",
null,
emptyMap(),
emptyMap(),
new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)),
TimeValue.timeValueDays(5)
),
x -> Collections.emptySet()
);
IndexResolver indexResolver = new IndexResolver(
esClient,
"cluster",
DefaultDataTypeRegistry.INSTANCE,
() -> { return emptySet(); }
);
EqlSession eqlSession = new EqlSession(
esClient,
eqlConfiguration,
indexResolver,
new PreAnalyzer(),
new PostAnalyzer(),
new EqlFunctionRegistry(),
new Verifier(new Metrics()),
new Optimizer(),
new Planner(),
eqlCircuitBreaker
);
QueryClient eqlClient = new PITAwareQueryClient(eqlSession);
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(sequenceFiltersCount);

for (int i = 0; i < sequenceFiltersCount; i++) {
final int j = i;
criteria.add(
new Criterion<>(
i,
new BoxedQueryRequest(
() -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(j),
"@timestamp",
emptyList(),
emptySet()
),
keyExtractors,
tsExtractor,
null,
implicitTbExtractor,
false
)
);
}

QueryClient eqlClient = buildQueryClient(esClient, eqlCircuitBreaker);
List<Criterion<BoxedQueryRequest>> criteria = buildCriteria(sequenceFiltersCount);
SequenceMatcher matcher = new SequenceMatcher(sequenceFiltersCount, false, TimeValue.MINUS_ONE, null, eqlCircuitBreaker);
TumblingWindow window = new TumblingWindow(eqlClient, criteria, null, matcher);
window.execute(wrap(p -> {}, ex -> {}));
Expand All @@ -306,6 +214,112 @@ private void assertMemoryCleared(int sequenceFiltersCount, BiFunction<CircuitBre
}
}

// test covering fix for https://github.com/elastic/elasticsearch/issues/88300
public void testEqlCBCleanedUp_on_ParentCBBreak() {
    final int stageCount = 2;
    final int expectedSearchRequests = 2;

    // Setting the total (parent) limit to zero is what lets the parent circuit breaker fail.
    Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), 0);
    Settings zeroParentLimit = settingsBuilder.build();

    try (
        CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(
            zeroParentLimit,
            breakerSettings(),
            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
        );
        ESMockClient mockClient = new SuccessfulESMockClient(breakerService.getBreaker(CIRCUIT_BREAKER_NAME), expectedSearchRequests);
    ) {
        CircuitBreaker eqlBreaker = breakerService.getBreaker(CIRCUIT_BREAKER_NAME);
        QueryClient queryClient = buildQueryClient(mockClient, eqlBreaker);
        List<Criterion<BoxedQueryRequest>> sequenceCriteria = buildCriteria(stageCount);

        SequenceMatcher sequenceMatcher = new SequenceMatcher(stageCount, false, TimeValue.MINUS_ONE, null, eqlBreaker);
        TumblingWindow tumblingWindow = new TumblingWindow(queryClient, sequenceCriteria, null, sequenceMatcher);

        // A successful response fails the test; the failure must be a CircuitBreakingException.
        tumblingWindow.execute(wrap(response -> fail(), failure -> assertTrue(failure instanceof CircuitBreakingException)));
    }
}

/**
 * Builds the single-element list of breaker settings used by these tests: one transient
 * memory breaker configured from CIRCUIT_BREAKER_NAME, CIRCUIT_BREAKER_LIMIT and
 * CIRCUIT_BREAKER_OVERHEAD.
 */
private List<BreakerSettings> breakerSettings() {
    BreakerSettings eqlBreaker = new BreakerSettings(
        CIRCUIT_BREAKER_NAME,
        CIRCUIT_BREAKER_LIMIT,
        CIRCUIT_BREAKER_OVERHEAD,
        CircuitBreaker.Type.MEMORY,
        CircuitBreaker.Durability.TRANSIENT
    );
    return Collections.singletonList(eqlBreaker);
}

/**
 * Creates one criterion per sequence stage, numbered from 0 to {@code sequenceFiltersCount - 1}.
 * Each criterion wraps a match-all search of size 10 on "@timestamp" whose terminateAfter
 * value equals the stage number.
 *
 * @param sequenceFiltersCount number of criteria to build
 * @return the criteria, in stage order
 */
private List<Criterion<BoxedQueryRequest>> buildCriteria(int sequenceFiltersCount) {
    List<Criterion<BoxedQueryRequest>> stageCriteria = new ArrayList<>(sequenceFiltersCount);
    for (int stage = 0; stage < sequenceFiltersCount; stage++) {
        // effectively-final copy so the supplier lambda can capture the stage number
        final int terminateAfter = stage;
        BoxedQueryRequest stageRequest = new BoxedQueryRequest(
            () -> SearchSourceBuilder.searchSource().size(10).query(matchAllQuery()).terminateAfter(terminateAfter),
            "@timestamp",
            emptyList(),
            emptySet()
        );
        stageCriteria.add(new Criterion<>(stage, stageRequest, keyExtractors, tsExtractor, null, implicitTbExtractor, false));
    }
    return stageCriteria;
}

/**
 * Assembles a PIT-aware query client on top of the given mock client: builds the search task,
 * the EQL configuration, the index resolver and the EQL session, then wraps the session.
 *
 * @param esClient          mock client used by both the index resolver and the session
 * @param eqlCircuitBreaker circuit breaker handed to the session
 * @return a {@code PITAwareQueryClient} backed by the new session
 */
private QueryClient buildQueryClient(ESMockClient esClient, CircuitBreaker eqlCircuitBreaker) {
    EqlSearchTask searchTask = new EqlSearchTask(
        randomLong(),
        "transport",
        EqlSearchAction.NAME,
        "",
        null,
        emptyMap(),
        emptyMap(),
        new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1)),
        TimeValue.timeValueDays(5)
    );

    EqlConfiguration configuration = new EqlConfiguration(
        new String[] { "test" },
        org.elasticsearch.xpack.ql.util.DateUtils.UTC,
        "nobody",
        "cluster",
        null,
        emptyMap(),
        null,
        TimeValue.timeValueSeconds(30),
        null,
        123,
        "",
        new TaskId("test", 123),
        searchTask,
        x -> Collections.emptySet()
    );

    IndexResolver resolver = new IndexResolver(esClient, "cluster", DefaultDataTypeRegistry.INSTANCE, Collections::emptySet);

    EqlSession session = new EqlSession(
        esClient,
        configuration,
        resolver,
        new PreAnalyzer(),
        new PostAnalyzer(),
        new EqlFunctionRegistry(),
        new Verifier(new Metrics()),
        new Optimizer(),
        new Planner(),
        eqlCircuitBreaker
    );

    QueryClient pitAwareClient = new PITAwareQueryClient(session);
    return pitAwareClient;
}

/**
* A type of internal Node client that deals with three types of requests: open PIT, close PIT and SearchRequest.
* This class is used by {@code CircuitBreakerTests#testMemoryClearedOnSuccessfulRequest()} and
Expand Down