Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,66 @@ describe('wrapScopedClusterClient', () => {
);
expect(logger.warn).not.toHaveBeenCalled();
});

test('throws error when es|ql async search throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();

abortController.abort();
childClient.transport.request.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);

const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();

await expect(
abortableSearchClient.asInternalUser.transport.request({
method: 'POST',
path: '/_query/async',
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"ES|QL search has been aborted due to cancelled execution"`
);

expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"POST\",\"path\":\"/_query/async\"} - with options {}`
);
expect(logger.warn).not.toHaveBeenCalled();
});

test('throws error when es|ql async query poll throws abort error', async () => {
const { abortController, scopedClusterClient, childClient } = getMockClusterClients();

abortController.abort();
childClient.transport.request.mockRejectedValueOnce(
new Error('Request has been aborted by the user')
);

const abortableSearchClient = createWrappedScopedClusterClientFactory({
scopedClusterClient,
rule,
logger,
abortController,
}).client();

await expect(
abortableSearchClient.asInternalUser.transport.request({
method: 'GET',
path: '/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3',
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"ES|QL search has been aborted due to cancelled execution"`
);

expect(loggingSystemMock.collect(logger).debug[0][0]).toEqual(
`executing ES|QL query for rule .test-rule-type:abcdefg in space my-space - {\"method\":\"GET\",\"path\":\"/_query/async/FjhHTHlyRVltUm5xck1tV0RFN18wREEeOUxMcnkxZ3NTd0MzOTNabm1NQW9TUTozMjY1NjQ3\"} - with options {}`
);
expect(logger.warn).not.toHaveBeenCalled();
});
});

test('uses asInternalUser when specified', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ function getWrappedTransportRequestFn(opts: WrapEsClientOpts) {
options?: TransportRequestOptions
): Promise<TResponse | TransportResult<TResponse, TContext>> {
// Wrap ES|QL requests with an abort signal
if (params.method === 'POST' && params.path === '/_query') {
if (
(params.method === 'POST' && ['/_query', '/_query/async'].includes(params.path)) ||
(params.method === 'GET' && params.path.startsWith('/_query/async'))
) {
let requestOptions: TransportRequestOptions = {};
try {
requestOptions = options ?? {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -319,6 +320,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -492,6 +494,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -596,6 +599,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -738,6 +742,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -853,6 +858,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -969,6 +975,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -1075,6 +1082,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -1181,6 +1189,7 @@ describe('RuleTypeRunner', () => {

expect(ruleType.executor).toHaveBeenCalledWith({
executionId: 'abc',
ruleExecutionTimeout: '5m',
services: {
alertFactory: alertsClient.factory(),
alertsClient: alertsClient.client(),
Expand Down Expand Up @@ -1247,6 +1256,47 @@ describe('RuleTypeRunner', () => {
shouldLogAlerts: true,
});
});

test('should call executor with custom ruleExecutionTimeout', async () => {
const mockRuleExecutionTimeout = '15m';

await ruleTypeRunner.run({
context: {
alertingEventLogger,
flappingSettings: DEFAULT_FLAPPING_SETTINGS,
queryDelaySec: 0,
request: fakeRequest,
maintenanceWindowsService,
ruleId: RULE_ID,
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
alertsClient,
executionId: 'abc',
executorServices: {
getDataViews,
ruleMonitoringService: publicRuleMonitoringService,
ruleResultService: publicRuleResultService,
savedObjectsClient,
uiSettingsClient,
wrappedScopedClusterClient,
getWrappedSearchSourceClient,
},
rule: mockedRule,
ruleType: { ...ruleType, ruleTaskTimeout: mockRuleExecutionTimeout },
startedAt: new Date(DATE_1970),
state: mockTaskInstance().state,
validatedParams: mockedRuleParams,
});

expect(ruleType.executor).toHaveBeenCalledWith(
expect.objectContaining({
ruleExecutionTimeout: mockRuleExecutionTimeout,
})
);
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ export class RuleTypeRunner<
...(startedAtOverridden ? { forceNow: startedAt } : {}),
}),
isServerless: context.isServerless,
ruleExecutionTimeout: ruleType.ruleTaskTimeout,
})
)
);
Expand Down
1 change: 1 addition & 0 deletions x-pack/platform/plugins/shared/alerting/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ export interface RuleExecutorOptions<
flappingSettings: RulesSettingsFlappingProperties;
getTimeRange: (timeWindow?: string) => GetTimeRangeResult;
isServerless: boolean;
ruleExecutionTimeout?: string;
}

export interface RuleParamsAndRefs<Params extends RuleTypeParams> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export const previewRulesRoute = (
rule,
services: {
shouldWriteAlerts,
shouldStopExecution: () => false,
shouldStopExecution: () => isAborted,
alertsClient: null,
alertFactory,
savedObjectsClient: coreContext.savedObjects.client,
Expand Down Expand Up @@ -322,6 +322,7 @@ export const previewRulesRoute = (
return { dateStart: date, dateEnd: date };
},
isServerless,
ruleExecutionTimeout: `${PREVIEW_TIMEOUT_SECONDS}s`,
})) as { state: TState; loggedRequests: RulePreviewLoggedRequest[] });

const errors = loggedStatusChanges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface BuildEqlSearchRequestParams {
primaryTimestamp: TimestampOverride;
secondaryTimestamp: TimestampOverride | undefined;
exceptionFilter: Filter | undefined;
ruleExecutionTimeout: string | undefined;
}

export const buildEsqlSearchRequest = ({
Expand All @@ -34,6 +35,7 @@ export const buildEsqlSearchRequest = ({
secondaryTimestamp,
exceptionFilter,
size,
ruleExecutionTimeout,
}: BuildEqlSearchRequestParams) => {
const esFilter = getQueryFilter({
query: '',
Expand Down Expand Up @@ -61,5 +63,7 @@ export const buildEsqlSearchRequest = ({
filter: requestFilter,
},
},
wait_for_completion_timeout: '4m', // hard limit request timeout is 5m set by ES proxy and alerting framework. So, we should be fine to wait 4m for async query completion. If rule execution is shorter than 4m and query was not completed, it will be aborted.
...(ruleExecutionTimeout ? { keep_alive: ruleExecutionTimeout } : {}),
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import { fetchSourceDocuments } from './fetch_source_documents';
import { buildReasonMessageForEsqlAlert } from '../utils/reason_formatters';
import type { RulePreviewLoggedRequest } from '../../../../../common/api/detection_engine/rule_preview/rule_preview.gen';
import type { CreateRuleOptions, SecuritySharedParams, SignalSource } from '../types';
import { logEsqlRequest } from '../utils/logged_requests';
import { getDataTierFilter } from '../utils/get_data_tier_filter';
import { checkErrorDetails } from '../utils/check_error_details';
import * as i18n from '../translations';

import {
addToSearchAfterReturn,
Expand All @@ -54,13 +52,15 @@ export const esqlExecutor = async ({
experimentalFeatures,
licensing,
scheduleNotificationResponseActionsService,
ruleExecutionTimeout,
}: {
sharedParams: SecuritySharedParams<EsqlRuleParams>;
services: RuleExecutorServices<AlertInstanceState, AlertInstanceContext, 'default'>;
state: Record<string, unknown>;
experimentalFeatures: ExperimentalFeatures;
licensing: LicensingPluginSetup;
scheduleNotificationResponseActionsService: CreateRuleOptions['scheduleNotificationResponseActionsService'];
ruleExecutionTimeout?: string;
}) => {
const {
completeRule,
Expand Down Expand Up @@ -108,16 +108,10 @@ export const esqlExecutor = async ({
primaryTimestamp,
secondaryTimestamp,
exceptionFilter,
ruleExecutionTimeout,
});
const esqlQueryString = { drop_null_columns: true };

if (isLoggedRequestsEnabled) {
loggedRequests.push({
request: logEsqlRequest(esqlRequest, esqlQueryString),
description: i18n.ESQL_SEARCH_REQUEST_DESCRIPTION,
});
}

ruleExecutionLogger.debug(`ES|QL query request: ${JSON.stringify(esqlRequest)}`);
const exceptionsWarning = getUnprocessedExceptionsWarnings(unprocessedExceptions);
if (exceptionsWarning) {
Expand All @@ -130,15 +124,14 @@ export const esqlExecutor = async ({
esClient: services.scopedClusterClient.asCurrentUser,
requestBody: esqlRequest,
requestQueryParams: esqlQueryString,
shouldStopExecution: services.shouldStopExecution,
ruleExecutionLogger,
loggedRequests: isLoggedRequestsEnabled ? loggedRequests : undefined,
});

const esqlSearchDuration = performance.now() - esqlSignalSearchStart;
result.searchAfterTimes.push(makeFloatString(esqlSearchDuration));

if (isLoggedRequestsEnabled && loggedRequests[0]) {
loggedRequests[0].duration = Math.round(esqlSearchDuration);
}

ruleExecutionLogger.debug(`ES|QL query request took: ${esqlSearchDuration}ms`);

const isRuleAggregating = computeIsESQLQueryAggregating(completeRule.ruleParams.query);
Expand Down
Loading