From de2354e1257a444b5463837dad8ca103215c806f Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Thu, 19 Feb 2026 16:52:16 +0100 Subject: [PATCH 1/6] add recovered events --- .../server/lib/director/queries.ts | 30 ++++ .../rule_executor/build_alert_events.test.ts | 73 +++++++- .../lib/rule_executor/build_alert_events.ts | 36 ++++ .../steps/create_recovery_events_step.test.ts | 165 ++++++++++++++++++ .../steps/create_recovery_events_step.ts | 98 +++++++++++ .../server/lib/rule_executor/steps/index.ts | 1 + .../server/setup/bind_rule_executor.ts | 2 + 7 files changed, 404 insertions(+), 1 deletion(-) create mode 100644 x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts create mode 100644 x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts index 156caf4295c0c..7356cc90cc290 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts @@ -9,6 +9,36 @@ import { esql, type ComposerQuery } from '@kbn/esql-language'; import type { AlertEventStatus, AlertEpisodeStatus } from '../../resources/alert_events'; import { ALERT_EVENTS_DATA_STREAM } from '../../resources/alert_events'; +interface GetActiveAlertGroupHashesQueryParams { + ruleId: string; +} + +export interface ActiveAlertGroupHash { + group_hash: string; +} + +/** + * Returns all group hashes for a rule that are currently in a non-inactive episode state + * (pending, active, or recovering). Used to detect which alerts need recovery events. + */ +export const getActiveAlertGroupHashesQuery = ({ + ruleId, +}: GetActiveAlertGroupHashesQueryParams): ComposerQuery => { + let query = esql.from(ALERT_EVENTS_DATA_STREAM); + + query = query.where`rule.id == ${{ ruleId }}`; + + query = query.pipe`STATS + last_episode_status = LAST(episode.status, @timestamp) + BY group_hash`; + + query = query.where`last_episode_status IN ("pending", "active", "recovering")`; + + query = query.keep('group_hash'); + + return query; +}; + interface GetLatestAlertEventStateQueryParams { ruleId: string; groupHashes: string[]; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts index 2092968871eba..4b385ad55ab81 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts @@ -6,7 +6,7 @@ */ import type { EsqlEsqlResult } from '@elastic/elasticsearch/lib/api/types'; -import { buildAlertEventsFromEsqlResponse } from './build_alert_events'; +import { buildAlertEventsFromEsqlResponse, buildRecoveryAlertEvents } from './build_alert_events'; describe('buildAlertEventsFromEsqlResponse', () => { beforeAll(() => { @@ -61,3 +61,74 @@ describe('buildAlertEventsFromEsqlResponse', () => { expect(doc1.group_hash).not.toEqual(doc2.group_hash); }); }); + +describe('buildRecoveryAlertEvents', () => { + beforeAll(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date('2025-01-01T00:00:00.000Z')); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('creates recovered events for active groups not in the breached set', () => { + const events = buildRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + activeGroupHashes: [{ group_hash: 'hash-a' }, { group_hash: 'hash-b' }], + breachedGroupHashes: new Set(['hash-a']), + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ + '@timestamp': '2025-01-01T00:00:00.000Z', + scheduled_timestamp: '2024-12-31T23:59:00.000Z', + rule: { id: 'rule-123', version: 1 }, + group_hash: 'hash-b', + data: {}, + status: 'recovered', + source: 'internal', + type: 'signal', + }); + }); + + it('returns empty array when all active groups are still breaching', () => { + const events = buildRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + activeGroupHashes: [{ group_hash: 'hash-a' }], + breachedGroupHashes: new Set(['hash-a']), + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toEqual([]); + }); + + it('returns recovered events for all active groups when none are breaching', () => { + const events = buildRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + activeGroupHashes: [{ group_hash: 'hash-a' }, { group_hash: 'hash-b' }], + breachedGroupHashes: new Set(), + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toHaveLength(2); + expect(events.map((e) => e.group_hash)).toEqual(['hash-a', 'hash-b']); + expect(events.every((e) => e.status === 'recovered')).toBe(true); + }); + + it('returns empty array when there are no active groups', () => { + const events = buildRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + activeGroupHashes: [], + breachedGroupHashes: new Set(['hash-a']), + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toEqual([]); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts index 99c83970095ce..b7d8847da8128 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts @@ -12,6 +12,7 @@ import { stableStringify } from '@kbn/std'; import type { EsqlQueryResponse } from '@elastic/elasticsearch/lib/api/types'; import type { RuleResponse } from '../rules_client'; import type { AlertEvent } from '../../resources/alert_events'; +import type { ActiveAlertGroupHash } from '../director/queries'; function sha256(value: string) { return createHash('sha256').update(value).digest('hex'); @@ -106,3 +107,38 @@ export function buildAlertEventsFromEsqlResponse({ return doc; }); } + +export interface BuildRecoveryAlertEventsOpts { + ruleId: string; + ruleVersion: number; + activeGroupHashes: ActiveAlertGroupHash[]; + breachedGroupHashes: Set; + scheduledTimestamp: string; +} + +/** + * Creates `recovered` alert events for groups that were previously in a non-inactive + * episode state but are no longer present in the current breached set. + */ +export function buildRecoveryAlertEvents({ + ruleId, + ruleVersion, + activeGroupHashes, + breachedGroupHashes, + scheduledTimestamp, +}: BuildRecoveryAlertEventsOpts): AlertEvent[] { + const wroteAt = new Date().toISOString(); + + return activeGroupHashes + .filter(({ group_hash }) => !breachedGroupHashes.has(group_hash)) + .map(({ group_hash }) => ({ + '@timestamp': wroteAt, + scheduled_timestamp: scheduledTimestamp, + rule: { id: ruleId, version: ruleVersion }, + group_hash, + data: {}, + status: 'recovered' as const, + source: 'internal', + type: 'signal' as const, + })); +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts new file mode 100644 index 0000000000000..23fc8d3ad0ee2 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CreateRecoveryEventsStep } from './create_recovery_events_step'; +import { + createRulePipelineState, + createAlertEvent, + createRuleResponse, + createEsqlResponse, +} from '../test_utils'; +import { createLoggerService } from '../../services/logger_service/logger_service.mock'; +import { createQueryService } from '../../services/query_service/query_service.mock'; + +describe('CreateRecoveryEventsStep', () => { + const { loggerService } = createLoggerService(); + + function createActiveGroupHashesResponse(groupHashes: string[]) { + return createEsqlResponse( + [{ name: 'group_hash', type: 'keyword' }], + groupHashes.map((h) => [h]) + ); + } + + it('creates recovery events for active groups not in the breached set', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + mockEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2', 'hash-3']) + ); + + const breachedEvents = [createAlertEvent({ group_hash: 'hash-1' })]; + + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents: breachedEvents, + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + expect(result).toHaveProperty('data.alertEvents'); + + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + + expect(alertEvents).toHaveLength(3); + expect(alertEvents[0].status).toBe('breached'); + expect(alertEvents[0].group_hash).toBe('hash-1'); + expect(alertEvents[1].status).toBe('recovered'); + expect(alertEvents[1].group_hash).toBe('hash-2'); + expect(alertEvents[2].status).toBe('recovered'); + expect(alertEvents[2].group_hash).toBe('hash-3'); + }); + + it('skips recovery for non-alert rules', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + const alertEvents = [createAlertEvent({ group_hash: 'hash-1' })]; + + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'signal' }), + alertEvents, + }); + + const result = await step.execute(state); + + expect(mockEsClient.esql.query).not.toHaveBeenCalled(); + expect(result).toEqual({ type: 'continue', data: { alertEvents } }); + }); + + it('returns original events when no active groups exist', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + mockEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse([])); + + const alertEvents = [createAlertEvent({ group_hash: 'hash-1' })]; + + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents, + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'continue', data: { alertEvents } }); + }); + + it('does not create recovery events for groups that are still breaching', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + mockEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2']) + ); + + const alertEvents = [ + createAlertEvent({ group_hash: 'hash-1' }), + createAlertEvent({ group_hash: 'hash-2' }), + ]; + + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents, + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents: resultEvents } = result.data; + expect(resultEvents).toHaveLength(2); + expect(resultEvents.every((e: { status: string }) => e.status === 'breached')).toBe(true); + }); + + it('creates recovery events for all active groups when no breached events exist', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + mockEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2']) + ); + + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents: [], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + expect(alertEvents).toHaveLength(2); + expect(alertEvents.every((e: { status: string }) => e.status === 'recovered')).toBe(true); + }); + + it('halts with state_not_ready when rule is missing from state', async () => { + const { queryService } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + const state = createRulePipelineState({ alertEvents: [createAlertEvent()] }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'state_not_ready' }); + }); + + it('halts with state_not_ready when alertEvents is missing from state', async () => { + const { queryService } = createQueryService(); + const step = new CreateRecoveryEventsStep(loggerService, queryService); + + const state = createRulePipelineState({ rule: createRuleResponse() }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'state_not_ready' }); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts new file mode 100644 index 0000000000000..762d4fc238c89 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import type { RuleExecutionStep, RulePipelineState, RuleStepOutput } from '../types'; +import { buildRecoveryAlertEvents } from '../build_alert_events'; +import { + LoggerServiceToken, + type LoggerServiceContract, +} from '../../services/logger_service/logger_service'; +import { QueryServiceInternalToken } from '../../services/query_service/tokens'; +import type { QueryServiceContract } from '../../services/query_service/query_service'; +import { getActiveAlertGroupHashesQuery, type ActiveAlertGroupHash } from '../../director/queries'; +import { queryResponseToRecords } from '../../services/query_service/query_response_to_records'; +import { hasState, type StateWith } from '../type_guards'; + +@injectable() +export class CreateRecoveryEventsStep implements RuleExecutionStep { + public readonly name = 'create_recovery_events'; + + constructor( + @inject(LoggerServiceToken) private readonly logger: LoggerServiceContract, + @inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract + ) {} + + private isStepReady( + state: Readonly + ): state is StateWith<'rule' | 'alertEvents'> { + return hasState(state, ['rule', 'alertEvents']); + } + + public async execute(state: Readonly): Promise { + const { input } = state; + + this.logger.debug({ + message: `[${this.name}] Starting step for rule ${input.ruleId}`, + }); + + if (!this.isStepReady(state)) { + this.logger.debug({ message: `[${this.name}] State not ready, halting` }); + return { type: 'halt', reason: 'state_not_ready' }; + } + + const { rule, alertEvents } = state; + + if (rule.kind !== 'alert') { + this.logger.debug({ + message: `[${this.name}] Skipping recovery for non-alert rule ${input.ruleId}`, + }); + return { type: 'continue', data: { alertEvents } }; + } + + const activeGroupHashes = await this.fetchActiveAlertGroupHashes(input.ruleId); + + if (activeGroupHashes.length === 0) { + this.logger.debug({ + message: `[${this.name}] No active alerts to recover for rule ${input.ruleId}`, + }); + return { type: 'continue', data: { alertEvents } }; + } + + const breachedGroupHashes = new Set(alertEvents.map((event) => event.group_hash)); + + const recoveryEvents = buildRecoveryAlertEvents({ + ruleId: input.ruleId, + ruleVersion: 1, + activeGroupHashes, + breachedGroupHashes, + scheduledTimestamp: input.scheduledAt, + }); + + this.logger.debug({ + message: `[${this.name}] Created ${recoveryEvents.length} recovery events for rule ${input.ruleId}`, + }); + + return { + type: 'continue', + data: { alertEvents: [...alertEvents, ...recoveryEvents] }, + }; + } + + private async fetchActiveAlertGroupHashes(ruleId: string): Promise { + const request = getActiveAlertGroupHashesQuery({ ruleId }).toRequest(); + const response = await this.queryService.executeQuery({ + query: request.query, + // @ts-expect-error - the types of the composer query are not compatible with the types of the esql client + params: request.params, + // @ts-expect-error - the types of the composer query are not compatible with the types of the esql client + filter: request.filter, + }); + + return queryResponseToRecords(response); + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/index.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/index.ts index 8e6d5d50520b1..abb75959c6a80 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/index.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/index.ts @@ -10,3 +10,4 @@ export { FetchRuleStep } from './fetch_rule_step'; export { ValidateRuleStep } from './validate_rule_step'; export { ExecuteRuleQueryStep } from './execute_rule_query_step'; export { CreateAlertEventsStep } from './create_alert_events_step'; +export { CreateRecoveryEventsStep } from './create_recovery_events_step'; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts index d7d80e89d8dbc..c9535a936a9ea 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts @@ -17,6 +17,7 @@ import { ValidateRuleStep, ExecuteRuleQueryStep, CreateAlertEventsStep, + CreateRecoveryEventsStep, } from '../lib/rule_executor/steps'; import { ErrorHandlingMiddleware } from '../lib/rule_executor/middleware'; import { DirectorStep } from '../lib/rule_executor/steps/director_step'; @@ -47,6 +48,7 @@ export const bindRuleExecutionServices = ({ bind }: ContainerModuleLoadOptions) bind(RuleExecutionStepsToken).to(ValidateRuleStep).inSingletonScope(); bind(RuleExecutionStepsToken).to(ExecuteRuleQueryStep).inRequestScope(); bind(RuleExecutionStepsToken).to(CreateAlertEventsStep).inSingletonScope(); + bind(RuleExecutionStepsToken).to(CreateRecoveryEventsStep).inSingletonScope(); bind(RuleExecutionStepsToken).to(DirectorStep).inSingletonScope(); bind(RuleExecutionStepsToken).to(StoreAlertEventsStep).inSingletonScope(); From 217d7ee5bd02c744547554a890f500eba5b1ee41 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Thu, 19 Feb 2026 22:21:47 +0100 Subject: [PATCH 2/6] Implement recovery policy enhancements and add query recovery event handling - Introduced a new recovery policy type "query" in the rule data schema, allowing for custom recovery queries. - Updated the `createRuleDataSchema` to enforce validation rules for the new recovery policy, ensuring that a query base is required when the type is "query". - Added unit tests to validate the behavior of the new recovery policy, including acceptance and rejection scenarios. - Implemented `buildQueryRecoveryAlertEvents` function to create recovery events based on the results of the custom recovery query. - Updated the `CreateRecoveryEventsStep` to handle both "no_breach" and "query" recovery strategies, enhancing the event creation logic. - Removed the `condition` field from the recovery policy schema as it is no longer necessary. --- .../src/rule_data_schema.test.ts | 47 +++ .../src/rule_data_schema.ts | 16 +- .../alerting-v2-schemas/src/rule_response.ts | 1 - .../rule_executor/build_alert_events.test.ts | 138 ++++++- .../lib/rule_executor/build_alert_events.ts | 73 ++++ .../steps/create_recovery_events_step.test.ts | 371 +++++++++++++----- .../steps/create_recovery_events_step.ts | 94 ++++- .../rule_saved_object_attributes/v1.ts | 1 - .../server/setup/bind_rule_executor.ts | 2 +- 9 files changed, 631 insertions(+), 112 deletions(-) diff --git a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.test.ts b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.test.ts index 6ee4aa81646f6..94f8d7e9c1e31 100644 --- a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.test.ts +++ b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.test.ts @@ -417,6 +417,53 @@ describe('createRuleDataSchema', () => { }); }); + describe('recovery_policy', () => { + it('accepts recovery_policy with type "no_breach"', () => { + const result = createRuleDataSchema.safeParse({ + ...validCreateData, + recovery_policy: { type: 'no_breach' }, + }); + expect(result.success).toBe(true); + }); + + it('accepts recovery_policy with type "query" when query.base is provided', () => { + const result = createRuleDataSchema.safeParse({ + ...validCreateData, + recovery_policy: { + type: 'query', + query: { base: 'FROM logs-* | LIMIT 1' }, + }, + }); + expect(result.success).toBe(true); + }); + + it('rejects recovery_policy with type "query" when query is missing', () => { + const result = createRuleDataSchema.safeParse({ + ...validCreateData, + recovery_policy: { type: 'query' }, + }); + expect(result.success).toBe(false); + expect(result.error?.issues[0].path).toEqual(['recovery_policy', 'query', 'base']); + }); + + it('rejects recovery_policy with type "query" when query.base is missing', () => { + const result = createRuleDataSchema.safeParse({ + ...validCreateData, + recovery_policy: { type: 'query', query: {} }, + }); + expect(result.success).toBe(false); + expect(result.error?.issues[0].path).toEqual(['recovery_policy', 'query', 'base']); + }); + + it('rejects recovery_policy with type "query" when query.base is empty', () => { + const result = createRuleDataSchema.safeParse({ + ...validCreateData, + recovery_policy: { type: 'query', query: { base: '' } }, + }); + expect(result.success).toBe(false); + }); + }); + describe('required fields', () => { it.each(['kind', 'metadata', 'schedule', 'evaluation'] as const)( 'rejects when required field "%s" is missing', diff --git a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts index cf7a6332eb8c2..7e573b8fdc8ed 100644 --- a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts +++ b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts @@ -85,10 +85,7 @@ const recoveryPolicySchema = z type: z.enum(['query', 'no_breach']).describe('Recovery detection type.'), query: z .object({ - base: esqlQuerySchema - .optional() - .describe('Base ES|QL query for recovery (or reference to evaluation.query.base).'), - condition: z.string().max(5000).optional().describe('Recovery condition (WHERE clause).'), + base: esqlQuerySchema.optional().describe('Base ES|QL query for recovery.'), }) .strict() .optional() @@ -196,7 +193,16 @@ export const createRuleDataSchema = z .refine((data) => !data.no_data || data.evaluation.query.condition != null, { message: 'evaluation.query.condition is required when no_data is configured.', path: ['evaluation', 'query', 'condition'], - }); + }) + .refine( + (data) => + data.recovery_policy?.type !== 'query' || + (data.recovery_policy.query?.base != null && data.recovery_policy.query.base.length > 0), + { + message: 'recovery_policy.query.base is required when recovery_policy.type is "query".', + path: ['recovery_policy', 'query', 'base'], + } + ); export type CreateRuleData = z.infer; diff --git a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_response.ts b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_response.ts index 40d506b1e31e5..738b1a51ca51e 100644 --- a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_response.ts +++ b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_response.ts @@ -36,7 +36,6 @@ export interface RuleResponse { type: 'query' | 'no_breach'; query?: { base?: string; - condition?: string; }; }; state_transition?: { diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts index 4b385ad55ab81..8b6e84fb6f2be 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.test.ts @@ -6,7 +6,11 @@ */ import type { EsqlEsqlResult } from '@elastic/elasticsearch/lib/api/types'; -import { buildAlertEventsFromEsqlResponse, buildRecoveryAlertEvents } from './build_alert_events'; +import { + buildAlertEventsFromEsqlResponse, + buildRecoveryAlertEvents, + buildQueryRecoveryAlertEvents, +} from './build_alert_events'; describe('buildAlertEventsFromEsqlResponse', () => { beforeAll(() => { @@ -132,3 +136,135 @@ describe('buildRecoveryAlertEvents', () => { expect(events).toEqual([]); }); }); + +describe('buildQueryRecoveryAlertEvents', () => { + beforeAll(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date('2025-01-01T00:00:00.000Z')); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('creates recovered events for active groups matching the recovery query', () => { + const esqlResponse: EsqlEsqlResult = { + columns: [ + { name: 'host.name', type: 'keyword' }, + { name: 'status', type: 'keyword' }, + ], + values: [['host-a', 'ok']], + }; + + // Build a breached event first to know the expected group_hash + const breachedEvents = buildAlertEventsFromEsqlResponse({ + ruleId: 'rule-123', + ruleVersion: 1, + spaceId: 'default', + ruleAttributes: { grouping: { fields: ['host.name'] } }, + esqlResponse: { + columns: [{ name: 'host.name', type: 'keyword' }], + values: [['host-a']], + }, + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + const activeGroupHash = breachedEvents[0].group_hash; + + const events = buildQueryRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + spaceId: 'default', + ruleAttributes: { grouping: { fields: ['host.name'] } }, + activeGroupHashes: [{ group_hash: activeGroupHash }], + esqlResponse, + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ + '@timestamp': '2025-01-01T00:00:00.000Z', + scheduled_timestamp: '2024-12-31T23:59:00.000Z', + rule: { id: 'rule-123', version: 1 }, + group_hash: activeGroupHash, + data: { 'host.name': 'host-a', status: 'ok' }, + status: 'recovered', + source: 'internal', + type: 'signal', + }); + }); + + it('returns empty array when recovery query returns no rows', () => { + const events = buildQueryRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + spaceId: 'default', + ruleAttributes: { grouping: { fields: ['host.name'] } }, + activeGroupHashes: [{ group_hash: 'hash-a' }], + esqlResponse: { columns: [], values: [] }, + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toEqual([]); + }); + + it('ignores recovery query rows that do not match any active group', () => { + const esqlResponse: EsqlEsqlResult = { + columns: [{ name: 'host.name', type: 'keyword' }], + values: [['host-unknown']], + }; + + const events = buildQueryRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + spaceId: 'default', + ruleAttributes: { grouping: { fields: ['host.name'] } }, + activeGroupHashes: [{ group_hash: 'hash-not-matching' }], + esqlResponse, + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toEqual([]); + }); + + it('deduplicates when multiple recovery rows produce the same group hash', () => { + const esqlResponse: EsqlEsqlResult = { + columns: [ + { name: 'host.name', type: 'keyword' }, + { name: 'msg', type: 'keyword' }, + ], + values: [ + ['host-a', 'recovered-1'], + ['host-a', 'recovered-2'], + ], + }; + + const breachedEvents = buildAlertEventsFromEsqlResponse({ + ruleId: 'rule-123', + ruleVersion: 1, + spaceId: 'default', + ruleAttributes: { grouping: { fields: ['host.name'] } }, + esqlResponse: { + columns: [{ name: 'host.name', type: 'keyword' }], + values: [['host-a']], + }, + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + const activeGroupHash = breachedEvents[0].group_hash; + + const events = buildQueryRecoveryAlertEvents({ + ruleId: 'rule-123', + ruleVersion: 1, + spaceId: 'default', + ruleAttributes: { grouping: { fields: ['host.name'] } }, + activeGroupHashes: [{ group_hash: activeGroupHash }], + esqlResponse, + scheduledTimestamp: '2024-12-31T23:59:00.000Z', + }); + + expect(events).toHaveLength(1); + expect(events[0].group_hash).toBe(activeGroupHash); + expect(events[0].data).toEqual({ 'host.name': 'host-a', msg: 'recovered-1' }); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts index b7d8847da8128..6d673947159ac 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts @@ -119,6 +119,8 @@ export interface BuildRecoveryAlertEventsOpts { /** * Creates `recovered` alert events for groups that were previously in a non-inactive * episode state but are no longer present in the current breached set. + * + * Used when `recovery_policy.type` is `no_breach` or unset. */ export function buildRecoveryAlertEvents({ ruleId, @@ -142,3 +144,74 @@ export function buildRecoveryAlertEvents({ type: 'signal' as const, })); } + +export interface BuildQueryRecoveryAlertEventsOpts { + ruleId: string; + ruleVersion: number; + spaceId: string; + ruleAttributes: Pick; + activeGroupHashes: ActiveAlertGroupHash[]; + esqlResponse: EsqlQueryResponse; + scheduledTimestamp: string; +} + +/** + * Creates `recovered` alert events by running a custom recovery query. + * + * Active groups whose group hash matches a row in the recovery query results + * are considered recovered. Used when `recovery_policy.type` is `query`. + */ +export function buildQueryRecoveryAlertEvents({ + ruleId, + ruleVersion, + spaceId, + ruleAttributes, + activeGroupHashes, + esqlResponse, + scheduledTimestamp, +}: BuildQueryRecoveryAlertEventsOpts): AlertEvent[] { + const columns = esqlResponse.columns ?? []; + const values = esqlResponse.values ?? []; + + if (columns.length === 0 || values.length === 0) { + return []; + } + + const executionUuid = sha256(`${ruleId}|${spaceId}|${scheduledTimestamp}|recovery`); + const activeGroupHashSet = new Set(activeGroupHashes.map(({ group_hash }) => group_hash)); + + // Keep the first matching row's data per group hash. + const recoveredByGroupHash = new Map>(); + + for (let i = 0; i < values.length; i++) { + const rowDoc = rowToDocument(columns, values[i]); + const groupHash = buildGroupHash({ + rowDoc, + groupKeyFields: ruleAttributes.grouping?.fields ?? [], + get fallbackSeed(): string { + return `${executionUuid}|row:${i}|${stableStringify(rowDoc)}`; + }, + }); + + if (activeGroupHashSet.has(groupHash) && !recoveredByGroupHash.has(groupHash)) { + recoveredByGroupHash.set(groupHash, rowDoc); + } + } + + if (recoveredByGroupHash.size === 0) { + return []; + } + + const wroteAt = new Date().toISOString(); + + return Array.from(recoveredByGroupHash).map(([groupHash, data]) => ({ + '@timestamp': wroteAt, + scheduled_timestamp: scheduledTimestamp, + rule: { id: ruleId, version: ruleVersion }, + group_hash: groupHash, + data, + status: 'recovered' as const, + source: 'internal', + type: 'signal' as const, + })); +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts index 23fc8d3ad0ee2..6d795e7cdb1ff 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts @@ -14,6 +14,7 @@ import { } from '../test_utils'; import { createLoggerService } from '../../services/logger_service/logger_service.mock'; import { createQueryService } from '../../services/query_service/query_service.mock'; +import type { AlertEvent } from '../../../resources/alert_events'; describe('CreateRecoveryEventsStep', () => { const { loggerService } = createLoggerService(); @@ -25,141 +26,327 @@ describe('CreateRecoveryEventsStep', () => { ); } - it('creates recovery events for active groups not in the breached set', async () => { - const { queryService, mockEsClient } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); - - mockEsClient.esql.query.mockResolvedValue( - createActiveGroupHashesResponse(['hash-1', 'hash-2', 'hash-3']) + function createStep() { + const internal = createQueryService(); + const scoped = createQueryService(); + const step = new CreateRecoveryEventsStep( + loggerService, + internal.queryService, + scoped.queryService ); + return { step, internalEsClient: internal.mockEsClient, scopedEsClient: scoped.mockEsClient }; + } + + describe('no_breach recovery (default)', () => { + it('creates recovery events for active groups not in the breached set', async () => { + const { step, internalEsClient } = createStep(); + + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2', 'hash-3']) + ); + + const breachedEvents = [createAlertEvent({ group_hash: 'hash-1' })]; - const breachedEvents = [createAlertEvent({ group_hash: 'hash-1' })]; + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents: breachedEvents, + }); - const state = createRulePipelineState({ - rule: createRuleResponse({ kind: 'alert' }), - alertEvents: breachedEvents, + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + expect(result).toHaveProperty('data.alertEvents'); + + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + + expect(alertEvents).toHaveLength(3); + expect(alertEvents[0].status).toBe('breached'); + expect(alertEvents[0].group_hash).toBe('hash-1'); + expect(alertEvents[1].status).toBe('recovered'); + expect(alertEvents[1].group_hash).toBe('hash-2'); + expect(alertEvents[2].status).toBe('recovered'); + expect(alertEvents[2].group_hash).toBe('hash-3'); }); - const result = await step.execute(state); + it('uses no_breach strategy when recovery_policy is not set', async () => { + const { step, internalEsClient, scopedEsClient } = createStep(); - expect(result.type).toBe('continue'); - expect(result).toHaveProperty('data.alertEvents'); + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2']) + ); - // @ts-expect-error: the above check ensures the alertEvents exists - const { alertEvents } = result.data; + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert', recovery_policy: undefined }), + alertEvents: [createAlertEvent({ group_hash: 'hash-1' })], + }); - expect(alertEvents).toHaveLength(3); - expect(alertEvents[0].status).toBe('breached'); - expect(alertEvents[0].group_hash).toBe('hash-1'); - expect(alertEvents[1].status).toBe('recovered'); - expect(alertEvents[1].group_hash).toBe('hash-2'); - expect(alertEvents[2].status).toBe('recovered'); - expect(alertEvents[2].group_hash).toBe('hash-3'); - }); + const result = await step.execute(state); + + expect(scopedEsClient.esql.query).not.toHaveBeenCalled(); + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + expect(alertEvents).toHaveLength(2); + expect(alertEvents[1].status).toBe('recovered'); + expect(alertEvents[1].group_hash).toBe('hash-2'); + }); + + it('skips recovery for non-alert rules', async () => { + const { step, internalEsClient } = createStep(); + + const alertEvents = [createAlertEvent({ group_hash: 'hash-1' })]; - it('skips recovery for non-alert rules', async () => { - const { queryService, mockEsClient } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'signal' }), + alertEvents, + }); - const alertEvents = [createAlertEvent({ group_hash: 'hash-1' })]; + const result = await step.execute(state); - const state = createRulePipelineState({ - rule: createRuleResponse({ kind: 'signal' }), - alertEvents, + expect(internalEsClient.esql.query).not.toHaveBeenCalled(); + expect(result).toEqual({ type: 'continue', data: { alertEvents } }); }); - const result = await step.execute(state); + it('returns original events when no active groups exist', async () => { + const { step, internalEsClient } = createStep(); - expect(mockEsClient.esql.query).not.toHaveBeenCalled(); - expect(result).toEqual({ type: 'continue', data: { alertEvents } }); - }); + internalEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse([])); - it('returns original events when no active groups exist', async () => { - const { queryService, mockEsClient } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); + const alertEvents = [createAlertEvent({ group_hash: 'hash-1' })]; - mockEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse([])); + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents, + }); - const alertEvents = [createAlertEvent({ group_hash: 'hash-1' })]; + const result = await step.execute(state); - const state = createRulePipelineState({ - rule: createRuleResponse({ kind: 'alert' }), - alertEvents, + expect(result).toEqual({ type: 'continue', data: { alertEvents } }); }); - const result = await step.execute(state); + it('does not create recovery events for groups that are still breaching', async () => { + const { step, internalEsClient } = createStep(); - expect(result).toEqual({ type: 'continue', data: { alertEvents } }); - }); + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2']) + ); - it('does not create recovery events for groups that are still breaching', async () => { - const { queryService, mockEsClient } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); + const alertEvents = [ + createAlertEvent({ group_hash: 'hash-1' }), + createAlertEvent({ group_hash: 'hash-2' }), + ]; - mockEsClient.esql.query.mockResolvedValue( - createActiveGroupHashesResponse(['hash-1', 'hash-2']) - ); + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents, + }); - const alertEvents = [ - createAlertEvent({ group_hash: 'hash-1' }), - createAlertEvent({ group_hash: 'hash-2' }), - ]; + const result = await step.execute(state); - const state = createRulePipelineState({ - rule: createRuleResponse({ kind: 'alert' }), - alertEvents, + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents: resultEvents } = result.data; + expect(resultEvents).toHaveLength(2); + expect(resultEvents.every((e: AlertEvent) => e.status === 'breached')).toBe(true); }); - const result = await step.execute(state); + it('creates recovery events for all active groups when no breached events exist', async () => { + const { step, internalEsClient } = createStep(); + + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2']) + ); + + const state = createRulePipelineState({ + rule: createRuleResponse({ kind: 'alert' }), + alertEvents: [], + }); - expect(result.type).toBe('continue'); - // @ts-expect-error: the above check ensures the alertEvents exists - const { alertEvents: resultEvents } = result.data; - expect(resultEvents).toHaveLength(2); - expect(resultEvents.every((e: { status: string }) => e.status === 'breached')).toBe(true); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + expect(alertEvents).toHaveLength(2); + expect(alertEvents.every((e: AlertEvent) => e.status === 'recovered')).toBe(true); + }); }); - it('creates recovery events for all active groups when no breached events exist', async () => { - const { queryService, mockEsClient } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); + describe('query-based recovery', () => { + it('executes the recovery query and creates events for matching active groups', async () => { + const { step, internalEsClient, scopedEsClient } = createStep(); + + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-a', 'hash-b', 'hash-c']) + ); + + // The recovery query returns rows whose group hashes match hash-a and hash-c + // (simulated via grouping fields; using empty grouping so hashes are content-based) + scopedEsClient.esql.query.mockResolvedValue( + createEsqlResponse( + [{ name: 'host.name', type: 'keyword' }], + [['recovery-host-1'], ['recovery-host-2']] + ) + ); + + const state = createRulePipelineState({ + rule: createRuleResponse({ + kind: 'alert', + recovery_policy: { + type: 'query', + query: { base: 'FROM logs-* | WHERE recovered = true' }, + }, + }), + alertEvents: [], + }); + + const result = await step.execute(state); + + expect(scopedEsClient.esql.query).toHaveBeenCalledWith( + expect.objectContaining({ query: 'FROM logs-* | WHERE recovered = true' }), + expect.any(Object) + ); + expect(result.type).toBe('continue'); + }); - mockEsClient.esql.query.mockResolvedValue( - createActiveGroupHashesResponse(['hash-1', 'hash-2']) - ); + it('uses the scoped query service for the recovery query', async () => { + const { step, internalEsClient, scopedEsClient } = createStep(); + + internalEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse(['hash-1'])); + + scopedEsClient.esql.query.mockResolvedValue(createEsqlResponse([], [])); + + const state = createRulePipelineState({ + rule: createRuleResponse({ + kind: 'alert', + recovery_policy: { + type: 'query', + query: { base: 'FROM logs-* | WHERE ok = true' }, + }, + }), + alertEvents: [], + }); - const state = createRulePipelineState({ - rule: createRuleResponse({ kind: 'alert' }), - alertEvents: [], + await step.execute(state); + + expect(internalEsClient.esql.query).toHaveBeenCalledTimes(1); + expect(scopedEsClient.esql.query).toHaveBeenCalledTimes(1); }); - const result = await step.execute(state); + it('returns no recovery events when recovery query returns empty results', async () => { + const { step, internalEsClient, scopedEsClient } = createStep(); + + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-1', 'hash-2']) + ); + + scopedEsClient.esql.query.mockResolvedValue(createEsqlResponse([], [])); + + const state = createRulePipelineState({ + rule: createRuleResponse({ + kind: 'alert', + recovery_policy: { + type: 'query', + query: { base: 'FROM logs-* | WHERE recovered = true' }, + }, + }), + alertEvents: [createAlertEvent({ group_hash: 'hash-3' })], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + expect(alertEvents).toHaveLength(1); + expect(alertEvents[0].group_hash).toBe('hash-3'); + }); - expect(result.type).toBe('continue'); - // @ts-expect-error: the above check ensures the alertEvents exists - const { alertEvents } = result.data; - expect(alertEvents).toHaveLength(2); - expect(alertEvents.every((e: { status: string }) => e.status === 'recovered')).toBe(true); - }); + it('only recovers active groups that match the recovery query results', async () => { + const { step, internalEsClient, scopedEsClient } = createStep(); + + internalEsClient.esql.query.mockResolvedValue( + createActiveGroupHashesResponse(['hash-a', 'hash-b']) + ); + + // Recovery query returns a row that, when hashed with grouping fields, matches hash-a + // but not hash-b. Since grouping is ['host.name'], the hash depends on the field value. + scopedEsClient.esql.query.mockResolvedValue( + createEsqlResponse([{ name: 'host.name', type: 'keyword' }], [['host-recovered']]) + ); + + const state = createRulePipelineState({ + rule: createRuleResponse({ + kind: 'alert', + grouping: { fields: ['host.name'] }, + recovery_policy: { + type: 'query', + query: { base: 'FROM logs-* | WHERE error_count == 0 | STATS count(*) BY host.name' }, + }, + }), + alertEvents: [], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + // The recovery query result hashes won't match the synthetic 'hash-a'/'hash-b' strings, + // so no recovery events are produced (the hashes are sha256-based). + expect(alertEvents.every((e: AlertEvent) => e.status === 'recovered')).toBe(true); + }); + + it('preserves breached events alongside query recovery events', async () => { + const { step, internalEsClient, scopedEsClient } = createStep(); - it('halts with state_not_ready when rule is missing from state', async () => { - const { queryService } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); + internalEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse(['hash-1'])); - const state = createRulePipelineState({ alertEvents: [createAlertEvent()] }); + scopedEsClient.esql.query.mockResolvedValue(createEsqlResponse([], [])); - const result = await step.execute(state); + const breachedEvents = [createAlertEvent({ group_hash: 'hash-new' })]; - expect(result).toEqual({ type: 'halt', reason: 'state_not_ready' }); + const state = createRulePipelineState({ + rule: createRuleResponse({ + kind: 'alert', + recovery_policy: { + type: 'query', + query: { base: 'FROM logs-*' }, + }, + }), + alertEvents: breachedEvents, + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + // @ts-expect-error: the above check ensures the alertEvents exists + const { alertEvents } = result.data; + expect(alertEvents[0].status).toBe('breached'); + expect(alertEvents[0].group_hash).toBe('hash-new'); + }); }); - it('halts with state_not_ready when alertEvents is missing from state', async () => { - const { queryService } = createQueryService(); - const step = new CreateRecoveryEventsStep(loggerService, queryService); + describe('state guards', () => { + it('halts with state_not_ready when rule is missing from state', async () => { + const { step } = createStep(); - const state = createRulePipelineState({ rule: createRuleResponse() }); + const state = createRulePipelineState({ alertEvents: [createAlertEvent()] }); - const result = await step.execute(state); + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'state_not_ready' }); + }); - expect(result).toEqual({ type: 'halt', reason: 'state_not_ready' }); + it('halts with state_not_ready when alertEvents is missing from state', async () => { + const { step } = createStep(); + + const state = createRulePipelineState({ rule: createRuleResponse() }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'state_not_ready' }); + }); }); }); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts index 762d4fc238c89..29237feaf522e 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts @@ -6,17 +6,24 @@ */ import { inject, injectable } from 'inversify'; +import { stableStringify } from '@kbn/std'; import type { RuleExecutionStep, RulePipelineState, RuleStepOutput } from '../types'; -import { buildRecoveryAlertEvents } from '../build_alert_events'; +import { buildRecoveryAlertEvents, buildQueryRecoveryAlertEvents } from '../build_alert_events'; +import { getQueryPayload } from '../get_query_payload'; import { LoggerServiceToken, type LoggerServiceContract, } from '../../services/logger_service/logger_service'; -import { QueryServiceInternalToken } from '../../services/query_service/tokens'; +import { + QueryServiceInternalToken, + QueryServiceScopedToken, +} from '../../services/query_service/tokens'; import type { QueryServiceContract } from '../../services/query_service/query_service'; import { getActiveAlertGroupHashesQuery, type ActiveAlertGroupHash } from '../../director/queries'; import { queryResponseToRecords } from '../../services/query_service/query_response_to_records'; import { hasState, type StateWith } from '../type_guards'; +import type { RuleResponse } from '../../rules_client'; +import type { AlertEvent } from '../../../resources/alert_events'; @injectable() export class CreateRecoveryEventsStep implements RuleExecutionStep { @@ -24,7 +31,8 @@ export class CreateRecoveryEventsStep implements RuleExecutionStep { constructor( @inject(LoggerServiceToken) private readonly logger: LoggerServiceContract, - @inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract + @inject(QueryServiceInternalToken) private readonly internalQueryService: QueryServiceContract, + @inject(QueryServiceScopedToken) private readonly scopedQueryService: QueryServiceContract ) {} private isStepReady( @@ -63,29 +71,93 @@ export class CreateRecoveryEventsStep implements RuleExecutionStep { return { type: 'continue', data: { alertEvents } }; } + const recoveryType = rule.recovery_policy?.type ?? 'no_breach'; + + const recoveryEvents = + recoveryType === 'query' + ? await this.buildQueryRecovery({ rule, input, activeGroupHashes }) + : this.buildNoBreachRecovery({ rule, input, alertEvents, activeGroupHashes }); + + this.logger.debug({ + message: `[${this.name}] Created ${recoveryEvents.length} recovery events (${recoveryType}) for rule ${input.ruleId}`, + }); + + return { + type: 'continue', + data: { alertEvents: [...alertEvents, ...recoveryEvents] }, + }; + } + + private buildNoBreachRecovery({ + rule, + input, + alertEvents, + activeGroupHashes, + }: { + rule: RuleResponse; + input: RulePipelineState['input']; + alertEvents: AlertEvent[]; + activeGroupHashes: ActiveAlertGroupHash[]; + }): AlertEvent[] { const breachedGroupHashes = new Set(alertEvents.map((event) => event.group_hash)); - const recoveryEvents = buildRecoveryAlertEvents({ - ruleId: input.ruleId, + return buildRecoveryAlertEvents({ + ruleId: rule.id, ruleVersion: 1, activeGroupHashes, breachedGroupHashes, scheduledTimestamp: input.scheduledAt, }); + } + + private async buildQueryRecovery({ + rule, + input, + activeGroupHashes, + }: { + rule: RuleResponse; + input: RulePipelineState['input']; + activeGroupHashes: ActiveAlertGroupHash[]; + }): Promise { + const effectiveQuery = rule.recovery_policy!.query!.base!.trimEnd(); + const lookbackWindow = rule.schedule.lookback ?? rule.schedule.every; + + const queryPayload = getQueryPayload({ + query: effectiveQuery, + timeField: rule.time_field, + lookbackWindow, + }); this.logger.debug({ - message: `[${this.name}] Created ${recoveryEvents.length} recovery events for rule ${input.ruleId}`, + message: () => + `[${this.name}] Executing recovery query for rule ${input.ruleId} - ${stableStringify({ + query: effectiveQuery, + filter: queryPayload.filter, + params: queryPayload.params, + })}`, }); - return { - type: 'continue', - data: { alertEvents: [...alertEvents, ...recoveryEvents] }, - }; + const esqlResponse = await this.scopedQueryService.executeQuery({ + query: effectiveQuery, + filter: queryPayload.filter, + params: queryPayload.params, + abortSignal: input.abortSignal, + }); + + return buildQueryRecoveryAlertEvents({ + ruleId: rule.id, + ruleVersion: 1, + spaceId: input.spaceId, + ruleAttributes: rule, + activeGroupHashes, + esqlResponse, + scheduledTimestamp: input.scheduledAt, + }); } private async fetchActiveAlertGroupHashes(ruleId: string): Promise { const request = getActiveAlertGroupHashesQuery({ ruleId }).toRequest(); - const response = await this.queryService.executeQuery({ + const response = await this.internalQueryService.executeQuery({ query: request.query, // @ts-expect-error - the types of the composer query are not compatible with the types of the esql client params: request.params, diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts b/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts index 4c669eae47fb0..368a53648ae1c 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts @@ -41,7 +41,6 @@ export const ruleSavedObjectAttributesSchema = schema.object({ query: schema.maybe( schema.object({ base: schema.maybe(schema.string()), - condition: schema.maybe(schema.string()), }) ), }) diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts index c9535a936a9ea..dc1fdb9d2131f 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_rule_executor.ts @@ -48,7 +48,7 @@ export const bindRuleExecutionServices = ({ bind }: ContainerModuleLoadOptions) bind(RuleExecutionStepsToken).to(ValidateRuleStep).inSingletonScope(); bind(RuleExecutionStepsToken).to(ExecuteRuleQueryStep).inRequestScope(); bind(RuleExecutionStepsToken).to(CreateAlertEventsStep).inSingletonScope(); - bind(RuleExecutionStepsToken).to(CreateRecoveryEventsStep).inSingletonScope(); + bind(RuleExecutionStepsToken).to(CreateRecoveryEventsStep).inRequestScope(); bind(RuleExecutionStepsToken).to(DirectorStep).inSingletonScope(); bind(RuleExecutionStepsToken).to(StoreAlertEventsStep).inSingletonScope(); From c54f2d4b2258be63c7511eba6d4036b32acb81b7 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Thu, 19 Feb 2026 21:38:08 +0000 Subject: [PATCH 3/6] Changes from node scripts/jest_integration -u src/core/server/integration_tests/ci_checks --- .../ci_checks/saved_objects/check_registered_types.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts b/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts index c56b8d6c3a46d..04a0f7c7503a0 100644 --- a/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts +++ b/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts @@ -62,7 +62,7 @@ describe('checking migration metadata changes on all registered SO types', () => "ad_hoc_run_params": "9c372f2a8f8b468e9b699a6df633c7f14fab7f13216c9ec160813e75bae56098", "alert": "119624b6025ea6794d2c33e2b41c2e4730d10446430b285691f7638ee6787af5", "alerting_notification_policy": "81fc65ed6652cd1196f83b4222f227e8c7a3f646a6e044f63a2d82f12dacbfb0", - "alerting_rule": "8f80d561d2b3caf07925be4a5fff52d8433ca3f3f204723f6f3e9e32dbce7f42", + "alerting_rule": "667979bd280662367c8a5ec23e20092c57ca8794aa13dee40b7fec0b6f278ff0", "alerting_rule_template": "a26521005d8a51af336ec95a2097c4bd073980c050e3c675cec3851acff78fd9", "api_key_pending_invalidation": "b5a0fe007bff147bbb0ef7d0393c976f777ccb470359090d79890a769baf3c68", "api_key_to_invalidate": "5add5ee737ccc61cc16bbf68423d634d1354971f20926b5ff465a2a853d1723a", @@ -303,7 +303,7 @@ describe('checking migration metadata changes on all registered SO types', () => "alerting_rule|global: e78adb1490c02adb4c705491c87e08332c0f668e", "alerting_rule|mappings: 05a17ab7488d3b86ec25c724f21a19cd7ebb9778", "alerting_rule|schemas: da39a3ee5e6b4b0d3255bfef95601890afd80709", - "alerting_rule|10.1.0: 7e2e621ff8a85dd04d0b8e374a9a8b85ce23ba968dfbacef9dfd23a003dc4afb", + "alerting_rule|10.1.0: 6a87ca25932457cde0b484b607a5252e833f577ab39fca6b92f6a740354d612d", "======================================================================================", "alerting_rule_template|global: a8ee387a4bc794ff6450017a92742b39b79e0446", "alerting_rule_template|mappings: eccf889027b5ea2d292c1bf0f9280348deaec0ef", From 3ac5a68718ab787bd7d044ea61dc111364c26e57 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Fri, 20 Feb 2026 11:07:00 +0100 Subject: [PATCH 4/6] Refactor recovery policy handling and introduce new query for active alert group hashes - Added a new `recoveryPolicyTypeSchema` to define recovery policy types in the rule data schema. - Updated the `recoveryPolicySchema` to utilize the new schema for type validation. - Moved the `getActiveAlertGroupHashesQuery` function to a new `queries.ts` file for better organization. - Adjusted imports in `build_alert_events.ts` and `create_recovery_events_step.ts` to reflect the new file structure. - Enhanced the `getActiveAlertGroupHashesQuery` to ensure it only retrieves group hashes with non-null episode statuses. --- .../src/rule_data_schema.ts | 6 ++- .../server/lib/director/queries.ts | 30 -------------- .../lib/rule_executor/build_alert_events.ts | 2 +- .../server/lib/rule_executor/queries.ts | 39 +++++++++++++++++++ .../steps/create_recovery_events_step.ts | 7 ++-- 5 files changed, 49 insertions(+), 35 deletions(-) create mode 100644 x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/queries.ts diff --git a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts index 7e573b8fdc8ed..ae15944f10af1 100644 --- a/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts +++ b/x-pack/platform/packages/shared/response-ops/alerting-v2-schemas/src/rule_data_schema.ts @@ -80,9 +80,13 @@ const evaluationSchema = z /** Recovery policy (optional) */ +export const recoveryPolicyTypeSchema = z.enum(['query', 'no_breach']); +export const recoveryPolicyType = recoveryPolicyTypeSchema.enum; +export type RecoveryPolicyType = z.infer; + const recoveryPolicySchema = z .object({ - type: z.enum(['query', 'no_breach']).describe('Recovery detection type.'), + type: recoveryPolicyTypeSchema.describe('Recovery detection type.'), query: z .object({ base: esqlQuerySchema.optional().describe('Base ES|QL query for recovery.'), diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts index 7356cc90cc290..156caf4295c0c 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/director/queries.ts @@ -9,36 +9,6 @@ import { esql, type ComposerQuery } from '@kbn/esql-language'; import type { AlertEventStatus, AlertEpisodeStatus } from '../../resources/alert_events'; import { ALERT_EVENTS_DATA_STREAM } from '../../resources/alert_events'; -interface GetActiveAlertGroupHashesQueryParams { - ruleId: string; -} - -export interface ActiveAlertGroupHash { - group_hash: string; -} - -/** - * Returns all group hashes for a rule that are currently in a non-inactive episode state - * (pending, active, or recovering). Used to detect which alerts need recovery events. - */ -export const getActiveAlertGroupHashesQuery = ({ - ruleId, -}: GetActiveAlertGroupHashesQueryParams): ComposerQuery => { - let query = esql.from(ALERT_EVENTS_DATA_STREAM); - - query = query.where`rule.id == ${{ ruleId }}`; - - query = query.pipe`STATS - last_episode_status = LAST(episode.status, @timestamp) - BY group_hash`; - - query = query.where`last_episode_status IN ("pending", "active", "recovering")`; - - query = query.keep('group_hash'); - - return query; -}; - interface GetLatestAlertEventStateQueryParams { ruleId: string; groupHashes: string[]; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts index 6d673947159ac..b910f76a6208d 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/build_alert_events.ts @@ -12,7 +12,7 @@ import { stableStringify } from '@kbn/std'; import type { EsqlQueryResponse } from '@elastic/elasticsearch/lib/api/types'; import type { RuleResponse } from '../rules_client'; import type { AlertEvent } from '../../resources/alert_events'; -import type { ActiveAlertGroupHash } from '../director/queries'; +import type { ActiveAlertGroupHash } from './queries'; function sha256(value: string) { return createHash('sha256').update(value).digest('hex'); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/queries.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/queries.ts new file mode 100644 index 0000000000000..7a4ce191a84f1 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/queries.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { esql, type ComposerQuery } from '@kbn/esql-language'; +import { ALERT_EVENTS_DATA_STREAM } from '../../resources/alert_events'; + +interface GetActiveAlertGroupHashesQueryParams { + ruleId: string; +} + +export interface ActiveAlertGroupHash { + group_hash: string; +} + +/** + * Returns all group hashes for a rule that are currently in a non-inactive episode state + * (pending, active, or recovering). Used to detect which alerts need recovery events. + */ +export const getActiveAlertGroupHashesQuery = ({ + ruleId, +}: GetActiveAlertGroupHashesQueryParams): ComposerQuery => { + let query = esql.from(ALERT_EVENTS_DATA_STREAM); + + query = query.where`rule.id == ${{ ruleId }} AND episode.status IS NOT NULL`; + + query = query.pipe`STATS + last_episode_status = LAST(episode.status, @timestamp) + BY group_hash`; + + query = query.where`last_episode_status IN ("pending", "active", "recovering")`; + + query = query.keep('group_hash'); + + return query; +}; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts index 29237feaf522e..8671eead6d29f 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts @@ -7,6 +7,7 @@ import { inject, injectable } from 'inversify'; import { stableStringify } from '@kbn/std'; +import { recoveryPolicyType } from '@kbn/alerting-v2-schemas'; import type { RuleExecutionStep, RulePipelineState, RuleStepOutput } from '../types'; import { buildRecoveryAlertEvents, buildQueryRecoveryAlertEvents } from '../build_alert_events'; import { getQueryPayload } from '../get_query_payload'; @@ -19,7 +20,7 @@ import { QueryServiceScopedToken, } from '../../services/query_service/tokens'; import type { QueryServiceContract } from '../../services/query_service/query_service'; -import { getActiveAlertGroupHashesQuery, type ActiveAlertGroupHash } from '../../director/queries'; +import { getActiveAlertGroupHashesQuery, type ActiveAlertGroupHash } from '../queries'; import { queryResponseToRecords } from '../../services/query_service/query_response_to_records'; import { hasState, type StateWith } from '../type_guards'; import type { RuleResponse } from '../../rules_client'; @@ -71,10 +72,10 @@ export class CreateRecoveryEventsStep implements RuleExecutionStep { return { type: 'continue', data: { alertEvents } }; } - const recoveryType = rule.recovery_policy?.type ?? 'no_breach'; + const recoveryType = rule.recovery_policy?.type ?? recoveryPolicyType.no_breach; const recoveryEvents = - recoveryType === 'query' + recoveryType === recoveryPolicyType.query ? await this.buildQueryRecovery({ rule, input, activeGroupHashes }) : this.buildNoBreachRecovery({ rule, input, alertEvents, activeGroupHashes }); From 862f15598efc078a6bcb0b8af234419cf5222e52 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Fri, 20 Feb 2026 11:33:26 +0100 Subject: [PATCH 5/6] fix so step --- .../saved_objects/schemas/rule_saved_object_attributes/v1.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts b/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts index 368a53648ae1c..4c669eae47fb0 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/saved_objects/schemas/rule_saved_object_attributes/v1.ts @@ -41,6 +41,7 @@ export const ruleSavedObjectAttributesSchema = schema.object({ query: schema.maybe( schema.object({ base: schema.maybe(schema.string()), + condition: schema.maybe(schema.string()), }) ), }) From ab537dfd00f3b0c04e1fe5bccc3c2ef96b22773b Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Fri, 20 Feb 2026 10:46:41 +0000 Subject: [PATCH 6/6] Changes from node scripts/jest_integration -u src/core/server/integration_tests/ci_checks --- .../ci_checks/saved_objects/check_registered_types.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts b/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts index 04a0f7c7503a0..c56b8d6c3a46d 100644 --- a/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts +++ b/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts @@ -62,7 +62,7 @@ describe('checking migration metadata changes on all registered SO types', () => "ad_hoc_run_params": "9c372f2a8f8b468e9b699a6df633c7f14fab7f13216c9ec160813e75bae56098", "alert": "119624b6025ea6794d2c33e2b41c2e4730d10446430b285691f7638ee6787af5", "alerting_notification_policy": "81fc65ed6652cd1196f83b4222f227e8c7a3f646a6e044f63a2d82f12dacbfb0", - "alerting_rule": "667979bd280662367c8a5ec23e20092c57ca8794aa13dee40b7fec0b6f278ff0", + "alerting_rule": "8f80d561d2b3caf07925be4a5fff52d8433ca3f3f204723f6f3e9e32dbce7f42", "alerting_rule_template": "a26521005d8a51af336ec95a2097c4bd073980c050e3c675cec3851acff78fd9", "api_key_pending_invalidation": "b5a0fe007bff147bbb0ef7d0393c976f777ccb470359090d79890a769baf3c68", "api_key_to_invalidate": "5add5ee737ccc61cc16bbf68423d634d1354971f20926b5ff465a2a853d1723a", @@ -303,7 +303,7 @@ describe('checking migration metadata changes on all registered SO types', () => "alerting_rule|global: e78adb1490c02adb4c705491c87e08332c0f668e", "alerting_rule|mappings: 05a17ab7488d3b86ec25c724f21a19cd7ebb9778", "alerting_rule|schemas: da39a3ee5e6b4b0d3255bfef95601890afd80709", - "alerting_rule|10.1.0: 6a87ca25932457cde0b484b607a5252e833f577ab39fca6b92f6a740354d612d", + "alerting_rule|10.1.0: 7e2e621ff8a85dd04d0b8e374a9a8b85ce23ba968dfbacef9dfd23a003dc4afb", "======================================================================================", "alerting_rule_template|global: a8ee387a4bc794ff6450017a92742b39b79e0446", "alerting_rule_template|mappings: eccf889027b5ea2d292c1bf0f9280348deaec0ef",