Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8fd25d5
[Security Solution][Detection Engine] adds async ES|QL query
vitaliidm Apr 1, 2025
a7724d9
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 2, 2025
957e85b
[Security Solution][Detection Engine] test
vitaliidm Apr 2, 2025
1ee4962
[Security Solution][Detection Engine] more informative message
vitaliidm Apr 2, 2025
8a02d3c
[Security Solution][Detection Engine] alerting changes
vitaliidm Apr 2, 2025
0d3c168
Update build_esql_search_request.ts
vitaliidm Apr 2, 2025
ff2ab3b
[Security Solution][Detection Engine] refactor
vitaliidm Apr 2, 2025
4ac1d30
Merge branch 'de_9_1/async_esql' of https://github.com/vitaliidm/kiba…
vitaliidm Apr 2, 2025
de0c6d2
[Security Solution][Detection Engine] logged requests refactor
vitaliidm Apr 2, 2025
df83947
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 2, 2025
79bc275
[Security Solution][Detection Engine] alerting framework
vitaliidm Apr 2, 2025
fa753e4
Merge branch 'de_9_1/async_esql' of https://github.com/vitaliidm/kiba…
vitaliidm Apr 2, 2025
46f74e6
[Security Solution][Detection Engine] exclude DELETE from alerting wr…
vitaliidm Apr 2, 2025
27e7f84
Update build_esql_search_request.ts
vitaliidm Apr 3, 2025
c681fd5
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 4, 2025
1be54cf
[Security Solution][Detection Engine] CR feedback
vitaliidm Apr 4, 2025
9a701e9
Merge branch 'de_9_1/async_esql' of https://github.com/vitaliidm/kiba…
vitaliidm Apr 4, 2025
e874bd3
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 4, 2025
a78438d
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 14, 2025
423551c
[Security Solution][Detection Engine] cr feedback
vitaliidm Apr 14, 2025
dd6de17
[Security Solution][Detection Engine] introduce keep_alive
vitaliidm Apr 14, 2025
1200aae
[Security Solution][Detection Engine] add tests
vitaliidm Apr 14, 2025
b53a8ef
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 15, 2025
066c9e4
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 15, 2025
7e67924
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 16, 2025
236e9da
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 16, 2025
e849de0
Update x-pack/solutions/security/plugins/security_solution/server/lib…
vitaliidm Apr 17, 2025
8b75f69
Update esql_request.ts
vitaliidm Apr 17, 2025
adc5ef8
Merge branch 'main' into de_9_1/async_esql
vitaliidm Apr 17, 2025
4d37838
Update esql_request.ts
vitaliidm Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,66 @@ describe('wrapScopedClusterClient', () => {
);
expect(logger.warn).not.toHaveBeenCalled();
});

// Verifies that an abort error thrown while submitting an ES|QL async query
// (POST /_query/async) is rewritten by the wrapper into the framework's
// "cancelled execution" message.
test('throws error when es|ql async search throws abort error', async () => {
  const mocks = getMockClusterClients();

  // Simulate a cancelled execution: the controller is already aborted and the
  // underlying transport rejects with the client's abort error.
  mocks.abortController.abort();
  mocks.childClient.transport.request.mockRejectedValueOnce(
    new Error('Request has been aborted by the user')
  );

  const wrappedClient = createWrappedScopedClusterClientFactory({
    scopedClusterClient: mocks.scopedClusterClient,
    rule,
    logger,
    abortController: mocks.abortController,
  }).client();

  const asyncQueryRequest = wrappedClient.asInternalUser.transport.request({
    method: 'POST',
    path: '/_query/async',
  });

  await expect(asyncQueryRequest).rejects.toThrowErrorMatchingInlineSnapshot(
    `"ES|QL search has been aborted due to cancelled execution"`
  );

  // The request is still debug-logged before the abort surfaces, and no
  // warning should be emitted for a user-initiated cancellation.
  expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
    `executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"POST\",\"path\":\"/_query/async\"} - with options {}`
  );
  expect(logger.warn).not.toHaveBeenCalled();
});

// Verifies that an abort error raised while polling an in-flight ES|QL async
// query (GET /_query/async/<id>) is rewritten into the same cancellation
// message as the initial submit request.
test('throws error when es|ql async query poll throws abort error', async () => {
  const mocks = getMockClusterClients();
  // Opaque async-query id, as returned by the initial POST /_query/async call.
  const pollPath =
    '/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3';

  // The controller is aborted up front and the transport rejects with the
  // client's abort error, mimicking a cancelled rule execution mid-poll.
  mocks.abortController.abort();
  mocks.childClient.transport.request.mockRejectedValueOnce(
    new Error('Request has been aborted by the user')
  );

  const wrappedClient = createWrappedScopedClusterClientFactory({
    scopedClusterClient: mocks.scopedClusterClient,
    rule,
    logger,
    abortController: mocks.abortController,
  }).client();

  const pollRequest = wrappedClient.asInternalUser.transport.request({
    method: 'GET',
    path: pollPath,
  });

  await expect(pollRequest).rejects.toThrowErrorMatchingInlineSnapshot(
    `"ES|QL search has been aborted due to cancelled execution"`
  );

  // The poll request is debug-logged before failing; cancellation is not a warning.
  expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
    `executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"GET\",\"path\":\"/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3\"} - with options {}`
  );
  expect(logger.warn).not.toHaveBeenCalled();
});
});

test('uses asInternalUser when specified', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ function getWrappedTransportRequestFn(opts: WrapEsClientOpts) {
options?: TransportRequestOptions
): Promise<TResponse | TransportResult<TResponse, TContext>> {
// Wrap ES|QL requests with an abort signal
if (params.method === 'POST' && params.path === '/_query') {
if (
(params.method === 'POST' && ['/_query', '/_query/async'].includes(params.path)) ||
(params.method === 'GET' && params.path.startsWith('/_query/async'))
) {
Copy link
Copy Markdown
Contributor

@doakalexi doakalexi Apr 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really small, but in this wrapper we log the search duration. We may not want to log duration metrics for async searches bc the query times won't be accurate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have set wait_for_completion_timeout to 4m. So, the query duration can be up to 4m, which is a quite significant value.
And afterwards, we'll have polling requests that would help to build a whole picture of request timings

let requestOptions: TransportRequestOptions = {};
try {
requestOptions = options ?? {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -318,6 +319,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -493,6 +495,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -599,6 +602,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -743,6 +747,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -859,6 +864,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -976,6 +982,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -1080,6 +1087,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -1187,6 +1195,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -1254,6 +1263,47 @@ describe('RuleTypeRunner', () => {
shouldLogAlerts: true,
});
});

// When the rule type declares its own ruleTaskTimeout, that value should be
// forwarded to the executor as ruleExecutionTimeout instead of the default.
test('should call executor with custom ruleExecutionTimeout', async () => {
  const customTimeout = '15m';

  // Standard run context/services used across these tests; only the rule
  // type's task timeout is customized for this case.
  const runContext = {
    alertingEventLogger,
    flappingSettings: DEFAULT_FLAPPING_SETTINGS,
    queryDelaySec: 0,
    request: fakeRequest,
    maintenanceWindowsService,
    ruleId: RULE_ID,
    ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
    ruleRunMetricsStore,
    spaceId: 'default',
    isServerless: false,
  };
  const runServices = {
    getDataViews,
    ruleMonitoringService: publicRuleMonitoringService,
    ruleResultService: publicRuleResultService,
    savedObjectsClient,
    uiSettingsClient,
    wrappedScopedClusterClient,
    getWrappedSearchSourceClient,
  };

  await ruleTypeRunner.run({
    context: runContext,
    alertsClient,
    executionId: 'abc',
    executorServices: runServices,
    rule: mockedRule,
    ruleType: { ...ruleType, ruleTaskTimeout: customTimeout },
    startedAt: new Date(DATE_1970),
    state: mockTaskInstance().state,
    validatedParams: mockedRuleParams,
  });

  expect(ruleType.executor).toHaveBeenCalledWith(
    expect.objectContaining({ ruleExecutionTimeout: customTimeout })
  );
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ export class RuleTypeRunner<
...(startedAtOverridden ? { forceNow: startedAt } : {}),
}),
isServerless: context.isServerless,
ruleExecutionTimeout: ruleType.ruleTaskTimeout,
})
)
);
Expand Down
1 change: 1 addition & 0 deletions x-pack/platform/plugins/shared/alerting/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export interface RuleExecutorOptions<
flappingSettings: RulesSettingsFlappingProperties;
getTimeRange: (timeWindow?: string) => GetTimeRangeResult;
isServerless: boolean;
ruleExecutionTimeout?: string;
}

export interface RuleParamsAndRefs<Params extends RuleTypeParams> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export const previewRulesRoute = (
rule,
services: {
shouldWriteAlerts: () => true,
shouldStopExecution: () => false,
shouldStopExecution: () => isAborted,
alertsClient: null,
alertFactory: {
create: alertInstanceFactoryStub<
Expand Down Expand Up @@ -298,6 +298,7 @@ export const previewRulesRoute = (
return { dateStart: date, dateEnd: date };
},
isServerless,
ruleExecutionTimeout: `${PREVIEW_TIMEOUT_SECONDS}s`,
})) as { state: TState; loggedRequests: RulePreviewLoggedRequest[] });

const errors = loggedStatusChanges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface BuildEqlSearchRequestParams {
primaryTimestamp: TimestampOverride;
secondaryTimestamp: TimestampOverride | undefined;
exceptionFilter: Filter | undefined;
ruleExecutionTimeout: string | undefined;
}

export const buildEsqlSearchRequest = ({
Expand All @@ -34,6 +35,7 @@ export const buildEsqlSearchRequest = ({
secondaryTimestamp,
exceptionFilter,
size,
ruleExecutionTimeout,
}: BuildEqlSearchRequestParams) => {
const esFilter = getQueryFilter({
query: '',
Expand Down Expand Up @@ -61,5 +63,7 @@ export const buildEsqlSearchRequest = ({
filter: requestFilter,
},
},
wait_for_completion_timeout: '4m', // hard limit request timeout is 5m set by ES proxy and alerting framework. So, we should be fine to wait 4m for async query completion. If rule execution is shorter than 4m and query was not completed, it will be aborted.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting wait_for_completion_timeout this high makes this effectively a synchronous query. Should this be keep_alive instead? Some keep_alive value longer than the rule timeout would be sufficient. I think we want wait_for_completion_timeout to be some number of seconds, like 5 or 10 seconds.

Setting keep_alive will help ensure that the results are deleted quickly even if the cleanup DELETE request fails.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the idea was not to start polling until the rule times out. If the rule timeout is greater than 5m, where we can hit the ES request timeout limitation, we would stop waiting for the query to complete (4m) and start polling.

Setting keep_alive will help ensure that the results are deleted quickly even if the cleanup DELETE request fails.

We don't have access to rule timeout within executor - only to shouldStopExecution. Which can't be used to set up keep_alive beforehand. I am not sure we want to expose this to executor and in future to rely on that value while running rule.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense. Is 4m the right value for both ECH and serverless? The default rule timeout is different for serverless, right? Is the default connection timeout different?

I think it would be worth following up on this PR to see if we can get access to the rule timeout or maybe have the framework wrapper inject a keep_alive value depending on the rule timeout. There could be some use case eventually for async requests initiated by one rule execution and retrieved by a later one, so maybe not every async request needs to have keep_alive == rule timeout, but I think it's the typical scenario for us. It'll just help with system resilience to set it since there could be hundreds of ESQL rules running every few minutes and we'll never need those results for 5 days.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense. Is 4m the right value for both ECH and serverless? The default rule timeout is different for serverless, right? Is the default connection timeout different?

Default rule timeout for serverless is 1m. If the async query request runs longer than this, it would be aborted and the query cancelled/deleted in ES.
It's 5m for ECH. If the query takes longer than this to finish, it would be cancelled and deleted as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marshallmain , issue to expose rule timeout to rule executor: #218072

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, we'll just have to be aware of any timeout discrepancies in the future. If the rule timeout increases in serverless (which is a desirable change for us) but the connection timeout values are different (e.g. lower in serverless), we could start seeing environment specific failures.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think errors would be in the same category - timeout related. Can be just for different requests(query or poll) and different times. But, we already have different timeout for different envs

...(ruleExecutionTimeout ? { keep_alive: ruleExecutionTimeout } : {}),
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import { fetchSourceDocuments } from './fetch_source_documents';
import { buildReasonMessageForEsqlAlert } from '../utils/reason_formatters';
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
import type { SecurityRuleServices, SecuritySharedParams, SignalSource } from '../types';
import { logEsqlRequest } from '../utils/logged_requests';
import { getDataTierFilter } from '../utils/get_data_tier_filter';
import { checkErrorDetails } from '../utils/check_error_details';
import * as i18n from '../translations';

import {
addToSearchAfterReturn,
Expand All @@ -52,12 +50,14 @@ export const esqlExecutor = async ({
state,
licensing,
scheduleNotificationResponseActionsService,
ruleExecutionTimeout,
}: {
sharedParams: SecuritySharedParams<EsqlRuleParams>;
services: SecurityRuleServices;
state: Record<string, unknown>;
licensing: LicensingPluginSetup;
scheduleNotificationResponseActionsService: ScheduleNotificationResponseActionsService;
ruleExecutionTimeout?: string;
}) => {
const {
completeRule,
Expand Down Expand Up @@ -99,16 +99,10 @@ export const esqlExecutor = async ({
primaryTimestamp,
secondaryTimestamp,
exceptionFilter,
ruleExecutionTimeout,
});
const esqlQueryString = { drop_null_columns: true };

if (isLoggedRequestsEnabled) {
loggedRequests.push({
request: logEsqlRequest(esqlRequest, esqlQueryString),
description: i18n.ESQL_SEARCH_REQUEST_DESCRIPTION,
});
}

ruleExecutionLogger.debug(`ES|QL query request: ${JSON.stringify(esqlRequest)}`);
const exceptionsWarning = getUnprocessedExceptionsWarnings(unprocessedExceptions);
if (exceptionsWarning) {
Expand All @@ -121,15 +115,14 @@ export const esqlExecutor = async ({
esClient: services.scopedClusterClient.asCurrentUser,
requestBody: esqlRequest,
requestQueryParams: esqlQueryString,
shouldStopExecution: services.shouldStopExecution,
ruleExecutionLogger,
loggedRequests: isLoggedRequestsEnabled ? loggedRequests : undefined,
});

const esqlSearchDuration = performance.now() - esqlSignalSearchStart;
result.searchAfterTimes.push(makeFloatString(esqlSearchDuration));

if (isLoggedRequestsEnabled && loggedRequests[0]) {
loggedRequests[0].duration = Math.round(esqlSearchDuration);
}

ruleExecutionLogger.debug(`ES|QL query request took: ${esqlSearchDuration}ms`);

const isRuleAggregating = computeIsESQLQueryAggregating(completeRule.ruleParams.query);
Expand Down
Loading