diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/index.ts b/x-pack/platform/plugins/shared/alerting_v2/server/index.ts index 59846e4d58065..163561a00c039 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/index.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/index.ts @@ -14,6 +14,7 @@ import { bindOnStart } from './setup/bind_on_start'; import { bindRoutes } from './setup/bind_routes'; import { bindServices } from './setup/bind_services'; import { bindRuleExecutionServices } from './setup/bind_rule_executor'; +import { bindDispatcherExecutionServices } from './setup/bind_dispatcher_executor'; import { bindTasks } from './setup/bind_tasks'; export const config: PluginConfigDescriptor = { @@ -31,6 +32,7 @@ export const module = new ContainerModule((options) => { bindRoutes(options); bindServices(options); bindRuleExecutionServices(options); + bindDispatcherExecutionServices(options); bindTasks(options); }); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.test.ts index 161cf045e4023..aafaec0322a58 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.test.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.test.ts @@ -7,35 +7,134 @@ import type { BulkResponse } from '@elastic/elasticsearch/lib/api/types'; import type { DeeplyMockedApi } from '@kbn/core-elasticsearch-client-server-mocks'; -import type { ElasticsearchClient } from '@kbn/core/server'; +import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server'; import moment from 'moment'; import { ALERT_ACTIONS_DATA_STREAM, type AlertAction } from '../../resources/alert_actions'; +import { RULE_SAVED_OBJECT_TYPE, NOTIFICATION_POLICY_SAVED_OBJECT_TYPE } from '../../saved_objects'; +import type { RuleSavedObjectAttributes } from '../../saved_objects'; +import { createRuleSoAttributes } from '../test_utils'; import { createLoggerService } from '../services/logger_service/logger_service.mock'; +import type { NotificationPolicySavedObjectServiceContract } from '../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; +import { createNotificationPolicySavedObjectService } from '../services/notification_policy_saved_object_service/notification_policy_saved_object_service.mock'; import type { QueryServiceContract } from '../services/query_service/query_service'; import { createQueryService } from '../services/query_service/query_service.mock'; +import type { RulesSavedObjectServiceContract } from '../services/rules_saved_object_service/rules_saved_object_service'; +import { createRulesSavedObjectService } from '../services/rules_saved_object_service/rules_saved_object_service.mock'; import type { StorageServiceContract } from '../services/storage_service/storage_service'; import { createStorageService } from '../services/storage_service/storage_service.mock'; import { LOOKBACK_WINDOW_MINUTES } from './constants'; import { DispatcherService } from './dispatcher'; +import { DispatcherPipeline } from './execution_pipeline'; import { createAlertEpisodeSuppressionsResponse, createDispatchableAlertEventsResponse, + createLastNotifiedTimestampsResponse, } from './fixtures/dispatcher'; import { getDispatchableAlertEventsQuery } from './queries'; +import { + FetchEpisodesStep, + FetchSuppressionsStep, + ApplySuppressionStep, + FetchRulesStep, + FetchPoliciesStep, + EvaluateMatchersStep, + BuildGroupsStep, + ApplyThrottlingStep, + DispatchStep, + StoreActionsStep, +} from './steps'; import type { AlertEpisode, AlertEpisodeSuppression } from './types'; +function mockRulesBulkGet( + mockSoClient: jest.Mocked, + ruleIds: string[], + overrides?: Partial +) { + mockSoClient.bulkGet.mockResolvedValue({ + saved_objects: ruleIds.map((id) => ({ + id, + type: RULE_SAVED_OBJECT_TYPE, + attributes: createRuleSoAttributes({ + notification_policies: [{ ref: 'policy_456' }], + ...overrides, + }), + references: [], + })), + }); +} + +function mockNpBulkGet(mockSoClient: jest.Mocked, policyIds: string[]) { + mockSoClient.bulkGet.mockResolvedValue({ + saved_objects: policyIds.map((id) => ({ + id, + type: NOTIFICATION_POLICY_SAVED_OBJECT_TYPE, + attributes: { + name: `Policy ${id}`, + description: `Description for ${id}`, + destinations: [{ type: 'workflow', id: 'workflow-test-id' }], + createdBy: null, + updatedBy: null, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + }, + references: [], + })), + }); +} + +function buildDispatcherService(deps: { + queryService: QueryServiceContract; + storageService: StorageServiceContract; + rulesSoService: RulesSavedObjectServiceContract; + npSoService: NotificationPolicySavedObjectServiceContract; +}): DispatcherService { + const { loggerService } = createLoggerService(); + const pipeline = new DispatcherPipeline(loggerService, [ + new FetchEpisodesStep(deps.queryService), + new FetchSuppressionsStep(deps.queryService), + new ApplySuppressionStep(), + new FetchRulesStep(deps.rulesSoService), + new FetchPoliciesStep(deps.npSoService), + new EvaluateMatchersStep(), + new BuildGroupsStep(), + new ApplyThrottlingStep(deps.queryService, loggerService), + new DispatchStep(loggerService), + new StoreActionsStep(deps.storageService), + ]); + return new DispatcherService(pipeline); +} + describe('DispatcherService', () => { let dispatcherService: DispatcherService; let queryService: QueryServiceContract; let storageService: StorageServiceContract; let queryEsClient: DeeplyMockedApi; let storageEsClient: jest.Mocked; + let rulesSoService: RulesSavedObjectServiceContract; + let npSoService: NotificationPolicySavedObjectServiceContract; + let rulesMockSoClient: jest.Mocked; + let npMockSoClient: jest.Mocked; beforeEach(() => { ({ queryService, mockEsClient: queryEsClient } = createQueryService()); ({ storageService, mockEsClient: storageEsClient } = createStorageService()); - const { loggerService } = createLoggerService(); - dispatcherService = new DispatcherService(queryService, loggerService, storageService); + + const rulesMock = createRulesSavedObjectService(); + rulesSoService = rulesMock.rulesSavedObjectService; + rulesMockSoClient = rulesMock.mockSavedObjectsClient; + mockRulesBulkGet(rulesMockSoClient, ['rule-1', 'rule-2']); + + const npMock = createNotificationPolicySavedObjectService(); + npSoService = npMock.notificationPolicySavedObjectService; + npMockSoClient = npMock.mockSavedObjectsClient; + mockNpBulkGet(npMockSoClient, ['policy_456']); + + dispatcherService = buildDispatcherService({ + queryService, + storageService, + rulesSoService, + npSoService, + }); }); afterEach(() => { @@ -78,7 +177,8 @@ describe('DispatcherService', () => { queryEsClient.esql.query .mockResolvedValueOnce(createDispatchableAlertEventsResponse(alertEpisodes)) - .mockResolvedValueOnce(createAlertEpisodeSuppressionsResponse(suppressions)); + .mockResolvedValueOnce(createAlertEpisodeSuppressionsResponse(suppressions)) + .mockResolvedValueOnce(createLastNotifiedTimestampsResponse()); storageEsClient.bulk.mockResolvedValue({ items: [{ create: { _id: '1', status: 201 } }, { create: { _id: '2', status: 201 } }], @@ -97,7 +197,7 @@ describe('DispatcherService', () => { .subtract(LOOKBACK_WINDOW_MINUTES, 'minutes') .toISOString(); - expect(queryEsClient.esql.query).toHaveBeenCalledTimes(2); + expect(queryEsClient.esql.query).toHaveBeenCalledTimes(3); expect(queryEsClient.esql.query).toHaveBeenCalledWith( { query: getDispatchableAlertEventsQuery().query, @@ -126,7 +226,11 @@ describe('DispatcherService', () => { expect(createOperations).toEqual( expect.arrayContaining([{ create: { _index: ALERT_ACTIONS_DATA_STREAM } }]) ); - expect(docs).toHaveLength(alertEpisodes.length); + + const fireActions = docs.filter((d: any) => d.action_type === 'fire'); + const notifiedActions = docs.filter((d: any) => d.action_type === 'notified'); + expect(fireActions).toHaveLength(alertEpisodes.length); + expect(notifiedActions.length).toBeGreaterThan(0); expect(docs).toEqual( expect.arrayContaining([ @@ -185,7 +289,8 @@ describe('DispatcherService', () => { queryEsClient.esql.query .mockResolvedValueOnce(createDispatchableAlertEventsResponse(alertEpisodes)) - .mockResolvedValueOnce(createAlertEpisodeSuppressionsResponse(suppressions)); + .mockResolvedValueOnce(createAlertEpisodeSuppressionsResponse(suppressions)) + .mockResolvedValueOnce(createLastNotifiedTimestampsResponse()); storageEsClient.bulk.mockResolvedValue({ items: [{ create: { _id: '1', status: 201 } }, { create: { _id: '2', status: 201 } }], @@ -201,7 +306,11 @@ describe('DispatcherService', () => { const [{ operations }] = storageEsClient.bulk.mock.calls[0]; const safeOperations = operations ?? []; const docs = safeOperations.filter((_, index) => index % 2 === 1); - expect(docs).toHaveLength(2); + + const suppressDocs = docs.filter((d: any) => d.action_type === 'suppress'); + const fireDocs = docs.filter((d: any) => d.action_type === 'fire'); + expect(suppressDocs).toHaveLength(1); + expect(fireDocs).toHaveLength(1); expect(docs).toEqual( expect.arrayContaining([ @@ -238,6 +347,29 @@ describe('DispatcherService', () => { }); it('dispatches correct fire/suppress actions across 5 rules with ack, unack, snooze, and deactivate suppressions', async () => { + const rulesMock = createRulesSavedObjectService(); + rulesSoService = rulesMock.rulesSavedObjectService; + rulesMockSoClient = rulesMock.mockSavedObjectsClient; + mockRulesBulkGet(rulesMockSoClient, [ + 'rule-001', + 'rule-002', + 'rule-003', + 'rule-004', + 'rule-005', + ]); + + const npMock = createNotificationPolicySavedObjectService(); + npSoService = npMock.notificationPolicySavedObjectService; + npMockSoClient = npMock.mockSavedObjectsClient; + mockNpBulkGet(npMockSoClient, ['policy_456']); + + dispatcherService = buildDispatcherService({ + queryService, + storageService, + rulesSoService, + npSoService, + }); + // Dataset: 5 rules, 9 episodes total // rule-001: single series, ack then unack → fire // rule-002: single series, ack with no unack → suppress @@ -359,7 +491,8 @@ describe('DispatcherService', () => { queryEsClient.esql.query .mockResolvedValueOnce(createDispatchableAlertEventsResponse(alertEpisodes)) - .mockResolvedValueOnce(createAlertEpisodeSuppressionsResponse(suppressions)); + .mockResolvedValueOnce(createAlertEpisodeSuppressionsResponse(suppressions)) + .mockResolvedValueOnce(createLastNotifiedTimestampsResponse()); storageEsClient.bulk.mockResolvedValue({ items: Array.from({ length: 10 }, (_, i) => ({ @@ -373,17 +506,18 @@ describe('DispatcherService', () => { }); expect(result.startedAt).toBeInstanceOf(Date); - expect(queryEsClient.esql.query).toHaveBeenCalledTimes(2); + expect(queryEsClient.esql.query).toHaveBeenCalledTimes(3); const [{ operations }] = storageEsClient.bulk.mock.calls[0]; const docs = (operations ?? []).filter((_, index) => index % 2 === 1) as AlertAction[]; - expect(docs).toHaveLength(10); const fireActions = docs.filter((doc) => doc.action_type === 'fire'); const suppressActions = docs.filter((doc) => doc.action_type === 'suppress'); + const notifiedActions = docs.filter((doc) => doc.action_type === 'notified'); expect(fireActions).toHaveLength(6); expect(suppressActions).toHaveLength(4); + expect(notifiedActions.length).toBeGreaterThan(0); // rule-001: fire (ack then unack cancels suppression) expect(docs).toEqual( diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.ts index 995a4a9711217..16801ba4207d8 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/dispatcher.ts @@ -6,26 +6,8 @@ */ import { inject, injectable } from 'inversify'; -import moment from 'moment'; -import { ALERT_ACTIONS_DATA_STREAM, type AlertAction } from '../../resources/alert_actions'; -import { - LoggerServiceToken, - type LoggerServiceContract, -} from '../services/logger_service/logger_service'; -import { queryResponseToRecords } from '../services/query_service/query_response_to_records'; -import type { QueryServiceContract } from '../services/query_service/query_service'; -import { QueryServiceInternalToken } from '../services/query_service/tokens'; -import type { StorageServiceContract } from '../services/storage_service/storage_service'; -import { StorageServiceInternalToken } from '../services/storage_service/tokens'; -import { LOOKBACK_WINDOW_MINUTES } from './constants'; -import { getAlertEpisodeSuppressionsQuery, getDispatchableAlertEventsQuery } from './queries'; -import type { - AlertEpisode, - AlertEpisodeSuppression, - DispatcherExecutionParams, - DispatcherExecutionResult, -} from './types'; -import { withDispatcherSpan } from './with_dispatcher_span'; +import { DispatcherPipeline, type DispatcherPipelineContract } from './execution_pipeline'; +import type { DispatcherExecutionParams, DispatcherExecutionResult } from './types'; export interface DispatcherServiceContract { run(params: DispatcherExecutionParams): Promise; @@ -33,128 +15,15 @@ export interface DispatcherServiceContract { @injectable() export class DispatcherService implements DispatcherServiceContract { - constructor( - @inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract, - @inject(LoggerServiceToken) private readonly logger: LoggerServiceContract, - @inject(StorageServiceInternalToken) private readonly storageService: StorageServiceContract - ) {} + constructor(@inject(DispatcherPipeline) private readonly pipeline: DispatcherPipelineContract) {} public async run({ previousStartedAt = new Date(), }: DispatcherExecutionParams): Promise { const startedAt = new Date(); - const alertEpisodes = await withDispatcherSpan('dispatcher:fetch-alert-episodes', () => - this.fetchAlertEpisodes(previousStartedAt) - ); - const suppressions = await withDispatcherSpan('dispatcher:fetch-suppressions', () => - this.fetchAlertEpisodeSuppressions(alertEpisodes) - ); - - const { suppressed, active } = this.applySuppression(alertEpisodes, suppressions); - - this.logger.debug({ - message: `Dispatcher processed ${alertEpisodes.length} alert episodes: ${suppressed.length} suppressed, ${active.length} not suppressed`, - }); - - const now = new Date(); - await withDispatcherSpan('dispatcher:bulk-index-actions', () => - this.storageService.bulkIndexDocs({ - index: ALERT_ACTIONS_DATA_STREAM, - docs: [ - ...suppressed.map((episode) => this.toAction({ episode, actionType: 'suppress', now })), - ...active.map((episode) => this.toAction({ episode, actionType: 'fire', now })), - ], - }) - ); + await this.pipeline.execute({ startedAt, previousStartedAt }); return { startedAt }; } - - private applySuppression( - episodes: AlertEpisode[], - suppressions: AlertEpisodeSuppression[] - ): { suppressed: AlertEpisode[]; active: AlertEpisode[] } { - const suppressionMap = new Map(); - - for (const s of suppressions) { - if (s.episode_id) { - suppressionMap.set(`${s.rule_id}:${s.group_hash}:${s.episode_id}`, s); - } else { - suppressionMap.set(`${s.rule_id}:${s.group_hash}:*`, s); - } - } - - const suppressed: AlertEpisode[] = []; - const active: AlertEpisode[] = []; - - for (const ep of episodes) { - const episodeKey = `${ep.rule_id}:${ep.group_hash}:${ep.episode_id}`; - const seriesKey = `${ep.rule_id}:${ep.group_hash}:*`; - - const episodeSuppression = suppressionMap.get(episodeKey); - const seriesSuppression = suppressionMap.get(seriesKey); - - if (episodeSuppression?.should_suppress || seriesSuppression?.should_suppress) { - suppressed.push(ep); - } else { - active.push(ep); - } - } - - return { suppressed, active }; - } - - private toAction({ - episode, - actionType, - now, - }: { - episode: AlertEpisode; - actionType: 'suppress' | 'fire'; - now: Date; - }): AlertAction { - return { - '@timestamp': now.toISOString(), - group_hash: episode.group_hash, - last_series_event_timestamp: episode.last_event_timestamp, - actor: 'system', - action_type: actionType, - rule_id: episode.rule_id, - source: 'internal', - }; - } - - private async fetchAlertEpisodeSuppressions( - alertEpisodes: AlertEpisode[] - ): Promise { - if (alertEpisodes.length === 0) { - return []; - } - - const result = await this.queryService.executeQuery({ - query: getAlertEpisodeSuppressionsQuery(alertEpisodes).query, - }); - - return queryResponseToRecords(result); - } - - private async fetchAlertEpisodes(previousStartedAt: Date): Promise { - const lookback = moment(previousStartedAt) - .subtract(LOOKBACK_WINDOW_MINUTES, 'minutes') - .toISOString(); - - const result = await this.queryService.executeQuery({ - query: getDispatchableAlertEventsQuery().query, - filter: { - range: { - '@timestamp': { - gte: lookback, - }, - }, - }, - }); - - return queryResponseToRecords(result); - } } diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/execution_pipeline.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/execution_pipeline.test.ts new file mode 100644 index 0000000000000..b93713a0eb6ac --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/execution_pipeline.test.ts @@ -0,0 +1,146 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { DispatcherPipeline } from './execution_pipeline'; +import type { DispatcherPipelineState } from './types'; +import { createLoggerService } from '../services/logger_service/logger_service.mock'; +import { createDispatcherPipelineInput, createMockDispatcherStep } from './fixtures/test_utils'; + +jest.mock('./with_dispatcher_span', () => ({ + withDispatcherSpan: (_name: string, cb: () => Promise) => cb(), +})); + +describe('DispatcherPipeline', () => { + describe('execute', () => { + it('executes all steps in order when all continue', async () => { + const { loggerService } = createLoggerService(); + const executionOrder: string[] = []; + + const step1 = createMockDispatcherStep('step1', async () => { + executionOrder.push('step1'); + return { type: 'continue' }; + }); + + const step2 = createMockDispatcherStep('step2', async () => { + executionOrder.push('step2'); + return { type: 'continue' }; + }); + + const step3 = createMockDispatcherStep('step3', async () => { + executionOrder.push('step3'); + return { type: 'continue' }; + }); + + const pipeline = new DispatcherPipeline(loggerService, [step1, step2, step3]); + const input = createDispatcherPipelineInput(); + + const result = await pipeline.execute(input); + + expect(result.completed).toBe(true); + expect(result.haltReason).toBeUndefined(); + expect(executionOrder).toEqual(['step1', 'step2', 'step3']); + }); + + it('stops execution when a step returns halt', async () => { + const { loggerService } = createLoggerService(); + const executionOrder: string[] = []; + + const step1 = createMockDispatcherStep('step1', async () => { + executionOrder.push('step1'); + return { type: 'continue' }; + }); + + const step2 = createMockDispatcherStep('step2', async () => { + executionOrder.push('step2'); + return { type: 'halt', reason: 'no_episodes' }; + }); + + const step3 = createMockDispatcherStep('step3', async () => { + executionOrder.push('step3'); + return { type: 'continue' }; + }); + + const pipeline = new DispatcherPipeline(loggerService, [step1, step2, step3]); + const input = createDispatcherPipelineInput(); + + const result = await pipeline.execute(input); + + expect(result.completed).toBe(false); + expect(result.haltReason).toBe('no_episodes'); + expect(executionOrder).toEqual(['step1', 'step2']); + expect(step3.execute).not.toHaveBeenCalled(); + }); + + it('accumulates state across steps correctly', async () => { + const { loggerService } = createLoggerService(); + const statesReceived: DispatcherPipelineState[] = []; + + const step1 = createMockDispatcherStep('step1', async (state) => { + statesReceived.push({ ...state }); + return { type: 'continue', data: { episodes: [] } }; + }); + + const step2 = createMockDispatcherStep('step2', async (state) => { + statesReceived.push({ ...state }); + return { type: 'continue', data: { dispatchable: [], suppressed: [] } }; + }); + + const step3 = createMockDispatcherStep('step3', async (state) => { + statesReceived.push({ ...state }); + return { type: 'continue' }; + }); + + const pipeline = new DispatcherPipeline(loggerService, [step1, step2, step3]); + const input = createDispatcherPipelineInput(); + + const result = await pipeline.execute(input); + + expect(statesReceived[0]).toEqual({ input }); + expect(statesReceived[0].episodes).toBeUndefined(); + + expect(statesReceived[1].input).toEqual(input); + expect(statesReceived[1].episodes).toBeDefined(); + expect(statesReceived[1].dispatchable).toBeUndefined(); + + expect(statesReceived[2].input).toEqual(input); + expect(statesReceived[2].episodes).toBeDefined(); + expect(statesReceived[2].dispatchable).toBeDefined(); + + expect(result.finalState.episodes).toBeDefined(); + expect(result.finalState.dispatchable).toBeDefined(); + }); + + it('propagates errors from steps', async () => { + const { loggerService } = createLoggerService(); + + const step1 = createMockDispatcherStep('step1', async () => { + throw new Error('Step failed'); + }); + + const step2 = createMockDispatcherStep('step2', async () => { + return { type: 'continue' }; + }); + + const pipeline = new DispatcherPipeline(loggerService, [step1, step2]); + const input = createDispatcherPipelineInput(); + + await expect(pipeline.execute(input)).rejects.toThrow('Step failed'); + expect(step2.execute).not.toHaveBeenCalled(); + }); + + it('returns completed result when no steps', async () => { + const { loggerService } = createLoggerService(); + const pipeline = new DispatcherPipeline(loggerService, []); + const input = createDispatcherPipelineInput(); + + const result = await pipeline.execute(input); + + expect(result.completed).toBe(true); + expect(result.finalState).toEqual({ input }); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/execution_pipeline.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/execution_pipeline.ts new file mode 100644 index 0000000000000..f3ccb6e62e017 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/execution_pipeline.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable, multiInject } from 'inversify'; +import { + LoggerServiceToken, + type LoggerServiceContract, +} from '../services/logger_service/logger_service'; +import type { + DispatcherHaltReason, + DispatcherPipelineInput, + DispatcherPipelineState, + DispatcherStep, +} from './types'; +import { DispatcherExecutionStepsToken } from './steps/tokens'; +import { withDispatcherSpan } from './with_dispatcher_span'; + +export interface DispatcherPipelineResult { + readonly completed: boolean; + readonly haltReason?: DispatcherHaltReason; + readonly finalState: DispatcherPipelineState; +} + +export interface DispatcherPipelineContract { + execute(input: DispatcherPipelineInput): Promise; +} + +@injectable() +export class DispatcherPipeline implements DispatcherPipelineContract { + constructor( + @inject(LoggerServiceToken) private readonly logger: LoggerServiceContract, + @multiInject(DispatcherExecutionStepsToken) private readonly steps: DispatcherStep[] + ) {} + + public async execute(input: DispatcherPipelineInput): Promise { + let pipelineState: DispatcherPipelineState = { input }; + + for (const step of this.steps) { + this.logger.debug({ message: `Dispatcher: Executing step: ${step.name}` }); + + const output = await withDispatcherSpan(step.name, () => step.execute(pipelineState)); + + if (output.type === 'halt') { + this.logger.debug({ + message: `Dispatcher: Pipeline halted at step: ${step.name}, reason: ${output.reason}`, + }); + + return { + completed: false, + haltReason: output.reason, + finalState: pipelineState, + }; + } + + if (output.data) { + pipelineState = { ...pipelineState, ...output.data }; + } + } + + return { + completed: true, + finalState: pipelineState, + }; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/dispatcher.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/dispatcher.ts index 23beae55c33d9..fe3f5b9d3f7c4 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/dispatcher.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/dispatcher.ts @@ -6,7 +6,7 @@ */ import type { EsqlQueryResponse } from '@elastic/elasticsearch/lib/api/types'; -import type { AlertEpisode, AlertEpisodeSuppression } from '../types'; +import type { AlertEpisode, AlertEpisodeSuppression, LastNotifiedRecord } from '../types'; export const createDispatchableAlertEventsResponse = ( alertEpisodes: AlertEpisode[] @@ -47,3 +47,15 @@ export const createAlertEpisodeSuppressionsResponse = ( ]), }; }; + +export const createLastNotifiedTimestampsResponse = ( + records: LastNotifiedRecord[] = [] +): EsqlQueryResponse => { + return { + columns: [ + { name: 'notification_group_id', type: 'keyword' }, + { name: 'last_notified', type: 'date' }, + ], + values: records.map((r) => [r.notification_group_id, r.last_notified]), + }; +}; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/test_utils.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/test_utils.ts new file mode 100644 index 0000000000000..f9d40d437a0c8 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/fixtures/test_utils.ts @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { + AlertEpisode, + AlertEpisodeSuppression, + DispatcherPipelineInput, + DispatcherPipelineState, + DispatcherStep, + DispatcherStepOutput, + MatchedPair, + NotificationGroup, + NotificationPolicy, + Rule, +} from '../types'; + +export function createDispatcherPipelineInput( + overrides: Partial = {} +): DispatcherPipelineInput { + return { + startedAt: new Date('2026-01-22T08:00:00.000Z'), + previousStartedAt: new Date('2026-01-22T07:30:00.000Z'), + ...overrides, + }; +} + +export function createDispatcherPipelineState( + state?: Partial +): DispatcherPipelineState { + return { + input: createDispatcherPipelineInput(), + ...state, + }; +} + +export function createAlertEpisode(overrides: Partial = {}): AlertEpisode { + return { + last_event_timestamp: '2026-01-22T07:10:00.000Z', + rule_id: 'rule-1', + group_hash: 'hash-1', + episode_id: 'episode-1', + episode_status: 'active', + ...overrides, + }; +} + +export function createAlertEpisodeSuppression( + overrides: Partial = {} +): AlertEpisodeSuppression { + return { + rule_id: 'rule-1', + group_hash: 'hash-1', + episode_id: 'episode-1', + should_suppress: false, + ...overrides, + }; +} + +export function createRule(overrides: Partial = {}): Rule { + return { + id: 'rule-1', + name: 'Test rule', + description: '', + notificationPolicyIds: ['policy-1'], + enabled: true, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + ...overrides, + }; +} + +export function createNotificationPolicy( + overrides: Partial = {} +): NotificationPolicy { + return { + id: 'policy-1', + name: 'Test policy', + destinations: [{ type: 'workflow' as const, id: 'workflow-1' }], + groupBy: [], + ...overrides, + }; +} + +export function createMatchedPair(overrides: Partial = {}): MatchedPair { + return { + episode: createAlertEpisode(), + policy: createNotificationPolicy(), + ...overrides, + }; +} + +export function createNotificationGroup( + overrides: Partial = {} +): NotificationGroup { + return { + id: 'group-1', + ruleId: 'rule-1', + policyId: 'policy-1', + destinations: [{ type: 'workflow' as const, id: 'workflow-1' }], + groupKey: {}, + episodes: [createAlertEpisode()], + ...overrides, + }; +} + +export function createMockDispatcherStep( + name: string, + executeFn: (state: Readonly) => Promise +): DispatcherStep { + return { + name, + execute: jest.fn(executeFn), + }; +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/integration_tests/dispatcher.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/integration_tests/dispatcher.test.ts index b8171fe763dbd..0e1ad1f757310 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/integration_tests/dispatcher.test.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/integration_tests/dispatcher.test.ts @@ -19,7 +19,29 @@ import { StorageService, type StorageServiceContract, } from '../../services/storage_service/storage_service'; +import type { SpacesPluginStart } from '@kbn/spaces-plugin/server'; +import { NotificationPolicySavedObjectService } from '../../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; +import type { NotificationPolicySavedObjectServiceContract } from '../../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; +import { RulesSavedObjectService } from '../../services/rules_saved_object_service/rules_saved_object_service'; +import type { RulesSavedObjectServiceContract } from '../../services/rules_saved_object_service/rules_saved_object_service'; +import type { + RuleSavedObjectAttributes, + NotificationPolicySavedObjectAttributes, +} from '../../../saved_objects'; import { DispatcherService, type DispatcherServiceContract } from '../dispatcher'; +import { DispatcherPipeline } from '../execution_pipeline'; +import { + FetchEpisodesStep, + FetchSuppressionsStep, + ApplySuppressionStep, + FetchRulesStep, + FetchPoliciesStep, + EvaluateMatchersStep, + BuildGroupsStep, + ApplyThrottlingStep, + DispatchStep, + StoreActionsStep, +} from '../steps'; import { waitForDataStreamsReady } from './helpers/wait'; import { setupTestServers } from './setup_test_servers'; @@ -319,6 +341,8 @@ describe('DispatcherService integration tests', () => { let queryService: QueryServiceContract; let storageService: StorageServiceContract; let mockLoggerService: LoggerServiceContract; + let rulesSoService: RulesSavedObjectServiceContract; + let npSoService: NotificationPolicySavedObjectServiceContract; beforeAll(async () => { const servers = await setupTestServers(); @@ -326,7 +350,18 @@ describe('DispatcherService integration tests', () => { kibanaServer = servers.kibanaServer; esClient = kibanaServer.coreStart.elasticsearch.client.asInternalUser; + rulesSoService = new RulesSavedObjectService( + (opts) => kibanaServer.coreStart.savedObjects.getUnsafeInternalClient(opts), + undefined as unknown as SpacesPluginStart + ); + npSoService = new NotificationPolicySavedObjectService( + (opts) => kibanaServer.coreStart.savedObjects.getUnsafeInternalClient(opts), + undefined as unknown as SpacesPluginStart + ); + await waitForDataStreamsReady(esClient, [ALERT_EVENTS_DATA_STREAM, ALERT_ACTIONS_DATA_STREAM]); + + await seedRulesAndPolicies(rulesSoService, npSoService); }); afterAll(async () => { @@ -345,7 +380,20 @@ describe('DispatcherService integration tests', () => { queryService = new QueryService(esClient, mockLoggerService); storageService = new StorageService(esClient, mockLoggerService); - dispatcherService = new DispatcherService(queryService, mockLoggerService, storageService); + + const pipeline = new DispatcherPipeline(mockLoggerService, [ + new FetchEpisodesStep(queryService), + new FetchSuppressionsStep(queryService), + new ApplySuppressionStep(), + new FetchRulesStep(rulesSoService), + new FetchPoliciesStep(npSoService), + new EvaluateMatchersStep(), + new BuildGroupsStep(), + new ApplyThrottlingStep(queryService, mockLoggerService), + new DispatchStep(mockLoggerService), + new StoreActionsStep(storageService), + ]); + dispatcherService = new DispatcherService(pipeline); }); describe('when there are no alert events', () => { @@ -379,7 +427,7 @@ describe('DispatcherService integration tests', () => { const actionsResponse = await esClient.search({ index: ALERT_ACTIONS_DATA_STREAM, - query: { match_all: {} }, + query: { term: { action_type: 'fire' } }, size: 100, }); @@ -604,10 +652,48 @@ async function seedAlertEvents(esClient: ElasticsearchClient, events: AlertEvent await esClient.bulk({ operations, - refresh: 'wait_for', + refresh: true, }); } +const NOTIFICATION_POLICY_ID = 'np-1'; + +const TEST_RULE_IDS = ['rule-1', 'rule-001', 'rule-002', 'rule-003', 'rule-004', 'rule-005']; + +async function seedRulesAndPolicies( + rulesSoService: RulesSavedObjectServiceContract, + npSoService: NotificationPolicySavedObjectServiceContract +): Promise { + const policyAttrs: NotificationPolicySavedObjectAttributes = { + name: 'Test Policy', + description: 'Test notification policy', + destinations: [{ type: 'workflow' as const, id: 'test-workflow' }], + createdBy: null, + updatedBy: null, + createdAt: '2026-01-20T00:00:00.000Z', + updatedAt: '2026-01-20T00:00:00.000Z', + }; + await npSoService.create({ attrs: policyAttrs, id: NOTIFICATION_POLICY_ID }); + + const ruleAttrs: RuleSavedObjectAttributes = { + kind: 'alert', + metadata: { name: 'Test Rule' }, + time_field: '@timestamp', + schedule: { every: '5m' }, + evaluation: { query: { base: 'FROM test' } }, + notification_policies: [{ ref: NOTIFICATION_POLICY_ID }], + enabled: true, + createdBy: null, + updatedBy: null, + updatedAt: '2026-01-20T00:00:00.000Z', + createdAt: '2026-01-20T00:00:00.000Z', + }; + + await Promise.all( + TEST_RULE_IDS.map((ruleId) => rulesSoService.create({ attrs: ruleAttrs, id: ruleId })) + ); +} + async function seedAlertActions( esClient: ElasticsearchClient, actions: AlertAction[] @@ -619,6 +705,6 @@ async function seedAlertActions( await esClient.bulk({ operations, - refresh: 'wait_for', + refresh: true, }); } diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.test.ts new file mode 100644 index 0000000000000..2a00d29c4e704 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.test.ts @@ -0,0 +1,187 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + getDispatchableAlertEventsQuery, + getAlertEpisodeSuppressionsQuery, + getLastNotifiedTimestampsQuery, +} from './queries'; +import { createAlertEpisode } from './fixtures/test_utils'; + +describe('getDispatchableAlertEventsQuery', () => { + it('returns a valid ES|QL request', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req).toHaveProperty('query'); + expect(typeof req.query).toBe('string'); + }); + + it('queries both alert events and alert actions data streams', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain('.alerts-events'); + expect(req.query).toContain('.alerts-actions'); + }); + + it('filters for alert event type', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain('type == "alert"'); + }); + + it('coalesces rule_id and episode_id from both schemas', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain('COALESCE(rule.id, rule_id)'); + expect(req.query).toContain('COALESCE(episode.id, episode_id)'); + }); + + it('computes last_fired via INLINE STATS for fire/suppress actions', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain('last_fired = MAX(last_series_event_timestamp)'); + expect(req.query).toContain('action_type == "fire" OR action_type == "suppress"'); + }); + + it('aggregates by rule_id, group_hash, episode_id, episode_status', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain('BY rule_id, group_hash, episode_id, episode_status'); + }); + + it('keeps the expected output columns', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain( + 'KEEP last_event_timestamp, rule_id, group_hash, episode_id, episode_status' + ); + }); + + it('sorts by timestamp ascending with a limit', () => { + const req = getDispatchableAlertEventsQuery(); + + expect(req.query).toContain('SORT last_event_timestamp ASC'); + expect(req.query).toContain('LIMIT 10000'); + }); +}); + +describe('getAlertEpisodeSuppressionsQuery', () => { + it('builds a WHERE clause matching each episode rule_id and group_hash', () => { + const episodes = [ + createAlertEpisode({ rule_id: 'rule-1', group_hash: 'hash-1' }), + createAlertEpisode({ rule_id: 'rule-2', group_hash: 'hash-2' }), + ]; + + const req = getAlertEpisodeSuppressionsQuery(episodes); + + expect(req.query).toContain('rule_id == "rule-1" AND group_hash == "hash-1"'); + expect(req.query).toContain('rule_id == "rule-2" AND group_hash == "hash-2"'); + }); + + it('queries the alert actions data stream', () => { + const req = getAlertEpisodeSuppressionsQuery([createAlertEpisode()]); + + expect(req.query).toContain('.alerts-actions'); + }); + + it('filters for suppression action types', () => { + const req = getAlertEpisodeSuppressionsQuery([createAlertEpisode()]); + + expect(req.query).toContain( + 'action_type IN ("ack", "unack", "deactivate", "activate", "snooze", "unsnooze")' + ); + }); + + it('uses the minimum last_event_timestamp for snooze expiry filtering', () => { + const episodes = [ + createAlertEpisode({ last_event_timestamp: '2026-01-22T10:00:00.000Z' }), + createAlertEpisode({ last_event_timestamp: '2026-01-22T08:00:00.000Z' }), + ]; + + const req = getAlertEpisodeSuppressionsQuery(episodes); + + expect(req.query).toContain('expiry > "2026-01-22T08:00:00.000Z"::DATETIME'); + }); + + it('falls back to epoch when all timestamps are invalid', () => { + const episodes = [createAlertEpisode({ last_event_timestamp: 'not-a-date' })]; + + const req = getAlertEpisodeSuppressionsQuery(episodes); + + expect(req.query).toContain('expiry > "1970-01-01T00:00:00.000Z"::DATETIME'); + }); + + it('skips invalid timestamps when computing minimum', () => { + const episodes = [ + createAlertEpisode({ last_event_timestamp: 'not-a-date' }), + createAlertEpisode({ last_event_timestamp: '2026-01-22T09:00:00.000Z' }), + ]; + + const req = getAlertEpisodeSuppressionsQuery(episodes); + + expect(req.query).toContain('expiry > "2026-01-22T09:00:00.000Z"::DATETIME'); + }); + + it('computes should_suppress with snooze, ack, and deactivate precedence', () => { + const req = getAlertEpisodeSuppressionsQuery([createAlertEpisode()]); + + expect(req.query).toContain('EVAL should_suppress = CASE('); + expect(req.query).toContain('last_snooze_action == "snooze", TRUE'); + expect(req.query).toContain('last_ack_action == "ack", TRUE'); + expect(req.query).toContain('last_deactivate_action == "deactivate", TRUE'); + }); + + it('keeps the expected output columns', () => { + const req = getAlertEpisodeSuppressionsQuery([createAlertEpisode()]); + + expect(req.query).toContain( + 'KEEP rule_id, group_hash, episode_id, should_suppress, last_ack_action, last_deactivate_action, last_snooze_action' + ); + }); + + it('handles a single episode', () => { + const req = getAlertEpisodeSuppressionsQuery([ + createAlertEpisode({ rule_id: 'only-rule', group_hash: 'only-hash' }), + ]); + + expect(req.query).toContain('rule_id == "only-rule" AND group_hash == "only-hash"'); + }); +}); + +describe('getLastNotifiedTimestampsQuery', () => { + it('builds a query for a single notification group', () => { + const req = getLastNotifiedTimestampsQuery(['group-1']); + + expect(req.query).toContain('notification_group_id IN ("group-1")'); + expect(req.query).toContain('.alerts-actions'); + expect(req.query).toContain('last_notified = MAX(@timestamp)'); + }); + + it('builds a query for multiple notification groups', () => { + const req = getLastNotifiedTimestampsQuery(['group-1', 'group-2']); + + expect(req.query).toContain('notification_group_id IN ("group-1", "group-2")'); + }); + + it('filters for notified action type', () => { + const req = getLastNotifiedTimestampsQuery(['group-1']); + + expect(req.query).toContain('action_type == "notified"'); + }); + + it('keeps the expected output columns', () => { + const req = getLastNotifiedTimestampsQuery(['group-1']); + + expect(req.query).toContain('KEEP notification_group_id, last_notified'); + }); + + it('groups by notification_group_id', () => { + const req = getLastNotifiedTimestampsQuery(['group-1']); + + expect(req.query).toContain('BY notification_group_id'); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.ts index 512ad460aa588..78f5102c3c5b2 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/queries.ts @@ -15,7 +15,7 @@ import { ALERT_EVENTS_DATA_STREAM, type AlertEventType, } from '../../resources/alert_events'; -import type { AlertEpisode } from './types'; +import type { AlertEpisode, NotificationGroupId } from './types'; export const getDispatchableAlertEventsQuery = (): EsqlRequest => { const alertEventType: AlertEventType = 'alert'; @@ -73,5 +73,18 @@ export const getAlertEpisodeSuppressionsQuery = (alertEpisodes: AlertEpisode[]): last_deactivate_action == "deactivate", true, false ) - | KEEP rule_id, group_hash, episode_id, should_suppress`.toRequest(); + | KEEP rule_id, group_hash, episode_id, should_suppress, last_ack_action, last_deactivate_action, last_snooze_action`.toRequest(); +}; + +export const getLastNotifiedTimestampsQuery = ( + notificationGroupIds: NotificationGroupId[] +): EsqlRequest => { + const values = notificationGroupIds.map((id) => esql.str(id)); + const whereClause = esql.exp`action_type == "notified" AND notification_group_id IN (${values})`; + + return esql`FROM ${ALERT_ACTIONS_DATA_STREAM} + | WHERE ${whereClause} + | STATS last_notified = MAX(@timestamp) BY notification_group_id + | KEEP notification_group_id, last_notified + `.toRequest(); }; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_suppression_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_suppression_step.test.ts new file mode 100644 index 0000000000000..98562db5a0dce --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_suppression_step.test.ts @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ApplySuppressionStep, applySuppression } from './apply_suppression_step'; +import { + createAlertEpisode, + createAlertEpisodeSuppression, + createDispatcherPipelineState, +} from '../fixtures/test_utils'; + +describe('ApplySuppressionStep', () => { + const step = new ApplySuppressionStep(); + + it('separates suppressed and active episodes', async () => { + const ep1 = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const ep2 = createAlertEpisode({ rule_id: 'r2', group_hash: 'h2', episode_id: 'e2' }); + + const state = createDispatcherPipelineState({ + episodes: [ep1, ep2], + suppressions: [ + createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: 'e1', + should_suppress: true, + last_ack_action: 'ack', + }), + createAlertEpisodeSuppression({ + rule_id: 'r2', + group_hash: 'h2', + episode_id: 'e2', + should_suppress: false, + }), + ], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + + expect(result.data?.suppressed).toHaveLength(1); + expect(result.data?.suppressed?.[0]).toEqual( + expect.objectContaining({ rule_id: 'r1', reason: 'ack' }) + ); + expect(result.data?.dispatchable).toHaveLength(1); + expect(result.data?.dispatchable?.[0]).toEqual(expect.objectContaining({ rule_id: 'r2' })); + }); + + it('treats all episodes as active when there are no suppressions', async () => { + const state = createDispatcherPipelineState({ + episodes: [createAlertEpisode(), createAlertEpisode({ episode_id: 'e2' })], + suppressions: [], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.dispatchable).toHaveLength(2); + expect(result.data?.suppressed).toHaveLength(0); + }); + + it('handles empty episodes', async () => { + const state = createDispatcherPipelineState({ episodes: [], suppressions: [] }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.dispatchable).toHaveLength(0); + expect(result.data?.suppressed).toHaveLength(0); + }); +}); + +describe('applySuppression', () => { + it('suppresses by episode-level match', () => { + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const suppression = createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: 'e1', + should_suppress: true, + last_ack_action: 'ack', + }); + + const { suppressed, dispatchable } = applySuppression([episode], [suppression]); + + expect(suppressed).toHaveLength(1); + expect(suppressed[0].reason).toBe('ack'); + expect(dispatchable).toHaveLength(0); + }); + + it('suppresses by series-level match (null episode_id)', () => { + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const suppression = createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: null, + should_suppress: true, + last_snooze_action: 'snooze', + }); + + const { suppressed, dispatchable } = applySuppression([episode], [suppression]); + + expect(suppressed).toHaveLength(1); + expect(suppressed[0].reason).toBe('snooze'); + expect(dispatchable).toHaveLength(0); + }); + + it('uses deactivate reason when deactivated', () => { + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const suppression = createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: 'e1', + should_suppress: true, + last_deactivate_action: 'deactivate', + }); + + const { suppressed } = applySuppression([episode], [suppression]); + + expect(suppressed[0].reason).toBe('deactivate'); + }); + + it('prefers episode-level suppression over series-level', () => { + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const episodeSuppression = createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: 'e1', + should_suppress: true, + last_ack_action: 'ack', + }); + const seriesSuppression = createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: null, + should_suppress: true, + last_snooze_action: 'snooze', + }); + + const { suppressed } = applySuppression([episode], [episodeSuppression, seriesSuppression]); + + expect(suppressed).toHaveLength(1); + expect(suppressed[0].reason).toBe('ack'); + }); + + it('does not suppress when should_suppress is false', () => { + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const suppression = createAlertEpisodeSuppression({ + rule_id: 'r1', + group_hash: 'h1', + episode_id: 'e1', + should_suppress: false, + }); + + const { suppressed, dispatchable } = applySuppression([episode], [suppression]); + + expect(suppressed).toHaveLength(0); + expect(dispatchable).toHaveLength(1); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_suppression_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_suppression_step.ts new file mode 100644 index 0000000000000..3852e119c2dc8 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_suppression_step.ts @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { injectable } from 'inversify'; +import type { + AlertEpisode, + AlertEpisodeSuppression, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; + +@injectable() +export class ApplySuppressionStep implements DispatcherStep { + public readonly name = 'apply_suppression'; + + public async execute(state: Readonly): Promise { + const { episodes = [], suppressions = [] } = state; + + const { suppressed, dispatchable } = applySuppression(episodes, suppressions); + + return { type: 'continue', data: { suppressed, dispatchable } }; + } +} + +export function applySuppression( + episodes: readonly AlertEpisode[], + suppressions: readonly AlertEpisodeSuppression[] +): { suppressed: Array; dispatchable: AlertEpisode[] } { + const suppressionMap = new Map(); + + for (const s of suppressions) { + if (s.episode_id) { + suppressionMap.set(`${s.rule_id}:${s.group_hash}:${s.episode_id}`, s); + } else { + suppressionMap.set(`${s.rule_id}:${s.group_hash}:*`, s); + } + } + + const suppressed: Array = []; + const dispatchable: AlertEpisode[] = []; + + for (const ep of episodes) { + const episodeKey = `${ep.rule_id}:${ep.group_hash}:${ep.episode_id}`; + const seriesKey = `${ep.rule_id}:${ep.group_hash}:*`; + + const episodeSuppression = suppressionMap.get(episodeKey); + const seriesSuppression = suppressionMap.get(seriesKey); + + if (episodeSuppression?.should_suppress || seriesSuppression?.should_suppress) { + const matchingSuppression = episodeSuppression?.should_suppress + ? episodeSuppression + : seriesSuppression!; + suppressed.push({ ...ep, reason: getSuppressionReason(matchingSuppression) }); + } else { + dispatchable.push(ep); + } + } + + return { suppressed, dispatchable }; +} + +function getSuppressionReason(suppression: AlertEpisodeSuppression): string { + if (suppression.last_snooze_action === 'snooze') return 'snooze'; + if (suppression.last_ack_action === 'ack') return 'ack'; + if (suppression.last_deactivate_action === 'deactivate') return 'deactivate'; + return 'unknown suppression reason'; +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_throttling_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_throttling_step.test.ts new file mode 100644 index 0000000000000..6b281f58ffe5c --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_throttling_step.test.ts @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { applyThrottling } from './apply_throttling_step'; +import { createNotificationGroup, createNotificationPolicy } from '../fixtures/test_utils'; + +describe('applyThrottling', () => { + it('dispatches group when no previous notification exists', () => { + const group = createNotificationGroup({ id: 'g1', policyId: 'p1' }); + const policy = createNotificationPolicy({ id: 'p1', throttle: { interval: '1h' } }); + + const { dispatch, throttled } = applyThrottling( + [group], + new Map([['p1', policy]]), + new Map(), + new Date('2026-01-22T10:00:00.000Z') + ); + + expect(dispatch).toHaveLength(1); + expect(throttled).toHaveLength(0); + }); + + it('throttles group when last notified within interval', () => { + const group = createNotificationGroup({ id: 'g1', policyId: 'p1' }); + const policy = createNotificationPolicy({ id: 'p1', throttle: { interval: '1h' } }); + + const { dispatch, throttled } = applyThrottling( + [group], + new Map([['p1', policy]]), + new Map([['g1', new Date('2026-01-22T09:30:00.000Z')]]), + new Date('2026-01-22T10:00:00.000Z') + ); + + expect(dispatch).toHaveLength(0); + expect(throttled).toHaveLength(1); + }); + + it('dispatches group when last notified outside interval', () => { + const group = createNotificationGroup({ id: 'g1', policyId: 'p1' }); + const policy = createNotificationPolicy({ id: 'p1', throttle: { interval: '1h' } }); + + const { dispatch, throttled } = applyThrottling( + [group], + new Map([['p1', policy]]), + new Map([['g1', new Date('2026-01-22T08:00:00.000Z')]]), + new Date('2026-01-22T10:00:00.000Z') + ); + + expect(dispatch).toHaveLength(1); + expect(throttled).toHaveLength(0); + }); + + it('dispatches group when policy has no throttle', () => { + const group = createNotificationGroup({ id: 'g1', policyId: 'p1' }); + const policy = createNotificationPolicy({ id: 'p1' }); + + const { dispatch, throttled } = applyThrottling( + [group], + new Map([['p1', policy]]), + new Map([['g1', new Date('2026-01-22T09:59:00.000Z')]]), + new Date('2026-01-22T10:00:00.000Z') + ); + + expect(dispatch).toHaveLength(1); + expect(throttled).toHaveLength(0); + }); + + it('handles mixed dispatch and throttle across groups', () => { + const g1 = createNotificationGroup({ id: 'g1', policyId: 'p1' }); + const g2 = createNotificationGroup({ id: 'g2', policyId: 'p1' }); + const policy = createNotificationPolicy({ id: 'p1', throttle: { interval: '1h' } }); + + const { dispatch, throttled } = applyThrottling( + [g1, g2], + new Map([['p1', policy]]), + new Map([['g1', new Date('2026-01-22T09:30:00.000Z')]]), + new Date('2026-01-22T10:00:00.000Z') + ); + + expect(dispatch).toHaveLength(1); + expect(dispatch[0].id).toBe('g2'); + expect(throttled).toHaveLength(1); + expect(throttled[0].id).toBe('g1'); + }); + + it('returns empty arrays when no groups', () => { + const { dispatch, throttled } = applyThrottling([], new Map(), new Map(), new Date()); + + expect(dispatch).toHaveLength(0); + expect(throttled).toHaveLength(0); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_throttling_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_throttling_step.ts new file mode 100644 index 0000000000000..bf221835e3169 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/apply_throttling_step.ts @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import type { + LastNotifiedRecord, + NotificationGroup, + NotificationGroupId, + NotificationPolicy, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; +import { + LoggerServiceToken, + type LoggerServiceContract, +} from '../../services/logger_service/logger_service'; +import type { QueryServiceContract } from '../../services/query_service/query_service'; +import { QueryServiceInternalToken } from '../../services/query_service/tokens'; +import { queryResponseToRecords } from '../../services/query_service/query_response_to_records'; +import { getLastNotifiedTimestampsQuery } from '../queries'; +import { parseDurationToMs } from '../../duration'; + +@injectable() +export class ApplyThrottlingStep implements DispatcherStep { + public readonly name = 'apply_throttling'; + + constructor( + @inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract, + @inject(LoggerServiceToken) private readonly logger: LoggerServiceContract + ) {} + + public async execute(state: Readonly): Promise { + const { groups = [], policies = new Map(), input } = state; + + if (groups.length === 0) { + return { type: 'continue', data: { dispatch: [], throttled: [] } }; + } + + const lastNotifiedMap = await this.fetchLastNotifiedTimestamps(groups.map((g) => g.id)); + + const { dispatch, throttled } = applyThrottling( + groups, + policies, + lastNotifiedMap, + input.startedAt + ); + + this.logger.debug({ + message: () => + `Applied throttling to ${throttled.length} groups and dispatched ${dispatch.length} groups`, + }); + + return { type: 'continue', data: { dispatch, throttled } }; + } + + private async fetchLastNotifiedTimestamps( + notificationGroupIds: NotificationGroupId[] + ): Promise> { + const result = await this.queryService.executeQuery({ + query: getLastNotifiedTimestampsQuery(notificationGroupIds).query, + }); + + const records = queryResponseToRecords(result); + return new Map( + records.map((record) => [record.notification_group_id, new Date(record.last_notified)]) + ); + } +} + +export function applyThrottling( + groups: readonly NotificationGroup[], + policies: ReadonlyMap, + lastNotifiedMap: ReadonlyMap, + now: Date +): { dispatch: NotificationGroup[]; throttled: NotificationGroup[] } { + const dispatch: NotificationGroup[] = []; + const throttled: NotificationGroup[] = []; + + for (const group of groups) { + const policy = policies.get(group.policyId)!; + const lastNotified = lastNotifiedMap.get(group.id); + + if ( + lastNotified && + policy.throttle && + policy.throttle.interval && + isWithinInterval(lastNotified, policy.throttle.interval, now) + ) { + throttled.push(group); + } else { + dispatch.push(group); + } + } + + return { dispatch, throttled }; +} + +function isWithinInterval(lastNotifiedAt: Date, interval: string, now: Date): boolean { + try { + const intervalMillis = parseDurationToMs(interval); + return lastNotifiedAt.getTime() + intervalMillis > now.getTime(); + } catch { + return false; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/build_groups_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/build_groups_step.test.ts new file mode 100644 index 0000000000000..06b1c3c73402f --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/build_groups_step.test.ts @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { BuildGroupsStep, buildNotificationGroups } from './build_groups_step'; +import { + createAlertEpisode, + createDispatcherPipelineState, + createMatchedPair, + createNotificationPolicy, +} from '../fixtures/test_utils'; + +describe('BuildGroupsStep', () => { + const step = new BuildGroupsStep(); + + it('returns notification groups from matched pairs', async () => { + const state = createDispatcherPipelineState({ + matched: [ + createMatchedPair({ + episode: createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }), + policy: createNotificationPolicy({ + id: 'p1', + destinations: [{ type: 'workflow', id: 'w1' }], + }), + }), + ], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.groups).toHaveLength(1); + expect(result.data?.groups?.[0].ruleId).toBe('r1'); + expect(result.data?.groups?.[0].policyId).toBe('p1'); + expect(result.data?.groups?.[0].episodes).toHaveLength(1); + }); + + it('returns empty groups when no matched pairs', async () => { + const state = createDispatcherPipelineState({ matched: [] }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.groups).toHaveLength(0); + }); +}); + +describe('buildNotificationGroups', () => { + it('creates separate groups for different episodes with no groupBy', () => { + const policy = createNotificationPolicy({ + id: 'p1', + destinations: [{ type: 'workflow', id: 'w1' }], + }); + const matched = [ + createMatchedPair({ + episode: createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }), + policy, + }), + createMatchedPair({ + episode: createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e2' }), + policy, + }), + ]; + + const groups = buildNotificationGroups(matched); + + expect(groups).toHaveLength(2); + }); + + it('groups episodes from same rule+policy+groupKey into same group', () => { + const policy = createNotificationPolicy({ + id: 'p1', + destinations: [{ type: 'workflow', id: 'w1' }], + }); + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + const matched = [ + createMatchedPair({ episode, policy }), + createMatchedPair({ episode, policy }), + ]; + + const groups = buildNotificationGroups(matched); + + expect(groups).toHaveLength(1); + expect(groups[0].episodes).toHaveLength(2); + }); + + it('assigns deterministic group IDs', () => { + const policy = createNotificationPolicy({ + id: 'p1', + destinations: [{ type: 'workflow', id: 'w1' }], + }); + const episode = createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }); + + const groups1 = buildNotificationGroups([createMatchedPair({ episode, policy })]); + const groups2 = buildNotificationGroups([createMatchedPair({ episode, policy })]); + + expect(groups1[0].id).toBe(groups2[0].id); + }); + + it('throws when groupBy fields are provided', () => { + const policy = createNotificationPolicy({ id: 'p1', groupBy: ['field1'] }); + const episode = createAlertEpisode(); + + expect(() => buildNotificationGroups([createMatchedPair({ episode, policy })])).toThrow( + 'Grouping by fields is not supported yet' + ); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/build_groups_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/build_groups_step.ts new file mode 100644 index 0000000000000..7d0111f558d9a --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/build_groups_step.ts @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { injectable } from 'inversify'; +import objectHash from 'object-hash'; +import type { + MatchedPair, + NotificationGroup, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; + +@injectable() +export class BuildGroupsStep implements DispatcherStep { + public readonly name = 'build_groups'; + + public async execute(state: Readonly): Promise { + const { matched = [] } = state; + + const groups = buildNotificationGroups(matched); + + return { type: 'continue', data: { groups } }; + } +} + +export function buildNotificationGroups(matched: readonly MatchedPair[]): NotificationGroup[] { + const groupMap = new Map(); + + for (const { episode, policy } of matched) { + let groupKey: Record = {}; + if (policy.groupBy.length === 0) { + groupKey = { + groupHash: episode.group_hash, + episodeId: episode.episode_id, + }; + } else { + throw new Error('Grouping by fields is not supported yet'); + } + + const notificationGroupId = objectHash({ + ruleId: episode.rule_id, + policyId: policy.id, + groupKey, + }); + + if (!groupMap.has(notificationGroupId)) { + groupMap.set(notificationGroupId, { + id: notificationGroupId, + ruleId: episode.rule_id, + policyId: policy.id, + destinations: policy.destinations, + groupKey, + episodes: [], + }); + } + + groupMap.get(notificationGroupId)!.episodes.push(episode); + } + + return [...groupMap.values()]; +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/dispatch_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/dispatch_step.test.ts new file mode 100644 index 0000000000000..de5d8baab322c --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/dispatch_step.test.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { createLoggerService } from '../../services/logger_service/logger_service.mock'; +import { + createDispatcherPipelineState, + createNotificationGroup, + createNotificationPolicy, +} from '../fixtures/test_utils'; +import { DispatchStep } from './dispatch_step'; + +describe('DispatchStep', () => { + afterEach(() => jest.clearAllMocks()); + + it('logs debug message for each dispatch group', async () => { + const { loggerService, mockLogger } = createLoggerService(); + const step = new DispatchStep(loggerService); + + const group1 = createNotificationGroup({ id: 'g1', policyId: 'p1' }); + const group2 = createNotificationGroup({ id: 'g2', policyId: 'p2' }); + const policy1 = createNotificationPolicy({ + id: 'p1', + destinations: [{ type: 'workflow', id: 'workflow-1' }], + }); + const policy2 = createNotificationPolicy({ + id: 'p2', + destinations: [{ type: 'workflow', id: 'workflow-2' }], + }); + + const state = createDispatcherPipelineState({ + dispatch: [group1, group2], + policies: new Map([ + ['p1', policy1], + ['p2', policy2], + ]), + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + expect(mockLogger.debug).toHaveBeenCalledTimes(2); + }); + + it('continues with no-op when dispatch is empty', async () => { + const { loggerService, mockLogger } = createLoggerService(); + const step = new DispatchStep(loggerService); + + const state = createDispatcherPipelineState({ dispatch: [] }); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + expect(mockLogger.debug).not.toHaveBeenCalled(); + }); + + it('continues when dispatch is undefined', async () => { + const { loggerService, mockLogger } = createLoggerService(); + const step = new DispatchStep(loggerService); + + const state = createDispatcherPipelineState({}); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + expect(mockLogger.debug).not.toHaveBeenCalled(); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/dispatch_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/dispatch_step.ts new file mode 100644 index 0000000000000..0ac8dd438932a --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/dispatch_step.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import { + LoggerServiceToken, + type LoggerServiceContract, +} from '../../services/logger_service/logger_service'; +import type { DispatcherPipelineState, DispatcherStep, DispatcherStepOutput } from '../types'; + +@injectable() +export class DispatchStep implements DispatcherStep { + public readonly name = 'dispatch'; + + constructor(@inject(LoggerServiceToken) private readonly logger: LoggerServiceContract) {} + + public async execute(state: Readonly): Promise { + const { dispatch = [] } = state; + + for (const group of dispatch) { + this.logger.debug({ + message: () => + `Dispatching notification group ${group.id} for policy ${group.policyId} with ${group.destinations.length} destination(s)`, + }); + } + + return { type: 'continue' }; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/evaluate_matchers_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/evaluate_matchers_step.test.ts new file mode 100644 index 0000000000000..3cb065bbfb13d --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/evaluate_matchers_step.test.ts @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { EvaluateMatchersStep, evaluateMatchers } from './evaluate_matchers_step'; +import { + createAlertEpisode, + createDispatcherPipelineState, + createNotificationPolicy, + createRule, +} from '../fixtures/test_utils'; + +describe('EvaluateMatchersStep', () => { + const step = new EvaluateMatchersStep(); + + it('returns matched pairs for episodes with catch-all policies', async () => { + const episode = createAlertEpisode({ rule_id: 'r1' }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1'] }); + const policy = createNotificationPolicy({ id: 'p1' }); + + const state = createDispatcherPipelineState({ + dispatchable: [episode], + rules: new Map([['r1', rule]]), + policies: new Map([['p1', policy]]), + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.matched).toHaveLength(1); + expect(result.data?.matched?.[0].episode).toBe(episode); + expect(result.data?.matched?.[0].policy).toBe(policy); + }); + + it('returns empty when no episodes', async () => { + const state = createDispatcherPipelineState({ + dispatchable: [], + rules: new Map(), + policies: new Map(), + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.matched).toHaveLength(0); + }); +}); + +describe('evaluateMatchers', () => { + it('matches episode to all catch-all policies on its rule', () => { + const episode = createAlertEpisode({ rule_id: 'r1' }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1', 'p2'] }); + const p1 = createNotificationPolicy({ id: 'p1' }); + const p2 = createNotificationPolicy({ id: 'p2' }); + + const matched = evaluateMatchers( + [episode], + new Map([['r1', rule]]), + new Map([ + ['p1', p1], + ['p2', p2], + ]) + ); + + expect(matched).toHaveLength(2); + }); + + it('skips episodes whose rule is not found', () => { + const episode = createAlertEpisode({ rule_id: 'unknown-rule' }); + + const matched = evaluateMatchers([episode], new Map(), new Map()); + + expect(matched).toHaveLength(0); + }); + + it('skips policies that are not found', () => { + const episode = createAlertEpisode({ rule_id: 'r1' }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['missing-policy'] }); + + const matched = evaluateMatchers([episode], new Map([['r1', rule]]), new Map()); + + expect(matched).toHaveLength(0); + }); + + it('does not match when KQL matcher evaluates to false', () => { + const episode = createAlertEpisode({ rule_id: 'r1', episode_status: 'inactive' }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1'] }); + const policy = createNotificationPolicy({ id: 'p1', matcher: 'episode_status: active' }); + + const matched = evaluateMatchers([episode], new Map([['r1', rule]]), new Map([['p1', policy]])); + + expect(matched).toHaveLength(0); + }); + + it('matches when KQL matcher evaluates to true', () => { + const episode = createAlertEpisode({ rule_id: 'r1', episode_status: 'active' }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1'] }); + const policy = createNotificationPolicy({ id: 'p1', matcher: 'episode_status: active' }); + + const matched = evaluateMatchers([episode], new Map([['r1', rule]]), new Map([['p1', policy]])); + + expect(matched).toHaveLength(1); + expect(matched[0].episode).toBe(episode); + expect(matched[0].policy).toBe(policy); + }); + + it('matches with complex KQL using AND operator', () => { + const episode = createAlertEpisode({ + rule_id: 'r1', + episode_status: 'active', + group_hash: 'critical-group', + }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1'] }); + const policy = createNotificationPolicy({ + id: 'p1', + matcher: 'episode_status: active and group_hash: critical-group', + }); + + const matched = evaluateMatchers([episode], new Map([['r1', rule]]), new Map([['p1', policy]])); + + expect(matched).toHaveLength(1); + }); + + it('matches with complex KQL using OR operator', () => { + const episode = createAlertEpisode({ rule_id: 'r1', episode_status: 'recovering' }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1'] }); + const policy = createNotificationPolicy({ + id: 'p1', + matcher: 'episode_status: active or episode_status: recovering', + }); + + const matched = evaluateMatchers([episode], new Map([['r1', rule]]), new Map([['p1', policy]])); + + expect(matched).toHaveLength(1); + }); + + it('does not match when AND condition is partially met', () => { + const episode = createAlertEpisode({ + rule_id: 'r1', + episode_status: 'active', + group_hash: 'normal-group', + }); + const rule = createRule({ id: 'r1', notificationPolicyIds: ['p1'] }); + const policy = createNotificationPolicy({ + id: 'p1', + matcher: 'episode_status: active and group_hash: critical-group', + }); + + const matched = evaluateMatchers([episode], new Map([['r1', rule]]), new Map([['p1', policy]])); + + expect(matched).toHaveLength(0); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/evaluate_matchers_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/evaluate_matchers_step.ts new file mode 100644 index 0000000000000..5b03e234dd27c --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/evaluate_matchers_step.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { injectable } from 'inversify'; +import { evaluateKql } from '@kbn/eval-kql'; +import type { + AlertEpisode, + MatchedPair, + NotificationPolicy, + NotificationPolicyId, + Rule, + RuleId, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; + +@injectable() +export class EvaluateMatchersStep implements DispatcherStep { + public readonly name = 'evaluate_matchers'; + + public async execute(state: Readonly): Promise { + const { dispatchable = [], rules = new Map(), policies = new Map() } = state; + + const matched = evaluateMatchers(dispatchable, rules, policies); + + return { type: 'continue', data: { matched } }; + } +} + +export function evaluateMatchers( + dispatchable: readonly AlertEpisode[], + rules: ReadonlyMap, + policies: ReadonlyMap +): MatchedPair[] { + const matched: MatchedPair[] = []; + + for (const episode of dispatchable) { + const rule = rules.get(episode.rule_id); + if (!rule) continue; + + for (const policyId of rule.notificationPolicyIds) { + const policy = policies.get(policyId); + if (!policy) continue; + + if (!policy.matcher) { + matched.push({ episode, policy }); + continue; + } + + const isMatch = evaluateKql(policy.matcher, episode); + if (isMatch) { + matched.push({ episode, policy }); + } + } + } + + return matched; +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_episodes_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_episodes_step.test.ts new file mode 100644 index 0000000000000..49767d0dc63af --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_episodes_step.test.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FetchEpisodesStep } from './fetch_episodes_step'; +import { createQueryService } from '../../services/query_service/query_service.mock'; +import { createDispatchableAlertEventsResponse } from '../fixtures/dispatcher'; +import { createAlertEpisode, createDispatcherPipelineState } from '../fixtures/test_utils'; + +describe('FetchEpisodesStep', () => { + it('returns episodes and continues when episodes are found', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new FetchEpisodesStep(queryService); + + const episodes = [ + createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' }), + createAlertEpisode({ rule_id: 'r2', group_hash: 'h2', episode_id: 'e2' }), + ]; + + mockEsClient.esql.query.mockResolvedValueOnce(createDispatchableAlertEventsResponse(episodes)); + + const state = createDispatcherPipelineState(); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.episodes).toHaveLength(2); + expect(result.data?.episodes?.[0].rule_id).toBe('r1'); + }); + + it('halts with no_episodes when none are found', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new FetchEpisodesStep(queryService); + + mockEsClient.esql.query.mockResolvedValueOnce(createDispatchableAlertEventsResponse([])); + + const state = createDispatcherPipelineState(); + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'no_episodes' }); + }); + + it('propagates query errors', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new FetchEpisodesStep(queryService); + + mockEsClient.esql.query.mockRejectedValueOnce(new Error('ES error')); + + const state = createDispatcherPipelineState(); + await expect(step.execute(state)).rejects.toThrow('ES error'); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_episodes_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_episodes_step.ts new file mode 100644 index 0000000000000..26e7249d00bab --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_episodes_step.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import moment from 'moment'; +import { inject, injectable } from 'inversify'; +import type { + AlertEpisode, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; +import type { QueryServiceContract } from '../../services/query_service/query_service'; +import { QueryServiceInternalToken } from '../../services/query_service/tokens'; +import { queryResponseToRecords } from '../../services/query_service/query_response_to_records'; +import { LOOKBACK_WINDOW_MINUTES } from '../constants'; +import { getDispatchableAlertEventsQuery } from '../queries'; + +@injectable() +export class FetchEpisodesStep implements DispatcherStep { + public readonly name = 'fetch_episodes'; + + constructor( + @inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract + ) {} + + public async execute(state: Readonly): Promise { + const { previousStartedAt } = state.input; + + const lookback = moment(previousStartedAt) + .subtract(LOOKBACK_WINDOW_MINUTES, 'minutes') + .toISOString(); + + const result = await this.queryService.executeQuery({ + query: getDispatchableAlertEventsQuery().query, + filter: { + range: { + '@timestamp': { + gte: lookback, + }, + }, + }, + }); + + const episodes = queryResponseToRecords(result); + + if (episodes.length === 0) { + return { type: 'halt', reason: 'no_episodes' }; + } + + return { type: 'continue', data: { episodes } }; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_policies_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_policies_step.test.ts new file mode 100644 index 0000000000000..81168add89d5d --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_policies_step.test.ts @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObjectsClientContract } from '@kbn/core/server'; +import { FetchPoliciesStep } from './fetch_policies_step'; +import type { NotificationPolicySavedObjectService } from '../../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; +import { createNotificationPolicySavedObjectService } from '../../services/notification_policy_saved_object_service/notification_policy_saved_object_service.mock'; +import { NOTIFICATION_POLICY_SAVED_OBJECT_TYPE } from '../../../saved_objects'; +import { createDispatcherPipelineState, createRule } from '../fixtures/test_utils'; + +describe('FetchPoliciesStep', () => { + let npSoService: NotificationPolicySavedObjectService; + let mockSavedObjectsClient: jest.Mocked; + + beforeEach(() => { + ({ notificationPolicySavedObjectService: npSoService, mockSavedObjectsClient } = + createNotificationPolicySavedObjectService()); + }); + + it('fetches unique policies from rules', async () => { + mockSavedObjectsClient.bulkGet.mockResolvedValue({ + saved_objects: [ + { + id: 'p1', + type: NOTIFICATION_POLICY_SAVED_OBJECT_TYPE, + attributes: { + name: 'Policy 1', + description: 'Test', + destinations: [{ type: 'workflow' as const, id: 'w1' }], + createdBy: null, + updatedBy: null, + createdAt: '2026-01-01T00:00:00.000Z', + updatedAt: '2026-01-01T00:00:00.000Z', + }, + references: [], + }, + ], + }); + + const step = new FetchPoliciesStep(npSoService); + const state = createDispatcherPipelineState({ + rules: new Map([ + ['r1', createRule({ id: 'r1', notificationPolicyIds: ['p1'] })], + ['r2', createRule({ id: 'r2', notificationPolicyIds: ['p1'] })], + ]), + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.policies?.size).toBe(1); + expect(result.data?.policies?.get('p1')?.name).toBe('Policy 1'); + expect(mockSavedObjectsClient.bulkGet).toHaveBeenCalledWith( + [{ type: NOTIFICATION_POLICY_SAVED_OBJECT_TYPE, id: 'p1' }], + undefined + ); + }); + + it('returns empty map when rules is empty', async () => { + const step = new FetchPoliciesStep(npSoService); + + const state = createDispatcherPipelineState({ rules: new Map() }); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.policies?.size).toBe(0); + expect(mockSavedObjectsClient.bulkGet).not.toHaveBeenCalled(); + }); + + it('returns empty map when rules have no policy IDs', async () => { + const step = new FetchPoliciesStep(npSoService); + + const state = createDispatcherPipelineState({ + rules: new Map([['r1', createRule({ id: 'r1', notificationPolicyIds: [] })]]), + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.policies?.size).toBe(0); + expect(mockSavedObjectsClient.bulkGet).not.toHaveBeenCalled(); + }); + + it('skips documents with errors', async () => { + mockSavedObjectsClient.bulkGet.mockResolvedValue({ + saved_objects: [ + { + id: 'p1', + type: NOTIFICATION_POLICY_SAVED_OBJECT_TYPE, + attributes: {}, + references: [], + error: { statusCode: 404, message: 'Not found', error: 'Not Found' }, + }, + ], + } as any); + + const step = new FetchPoliciesStep(npSoService); + const state = createDispatcherPipelineState({ + rules: new Map([['r1', createRule({ id: 'r1', notificationPolicyIds: ['p1'] })]]), + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.policies?.size).toBe(0); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_policies_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_policies_step.ts new file mode 100644 index 0000000000000..1e800148fcba0 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_policies_step.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import type { + NotificationPolicy, + NotificationPolicyId, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; +import type { NotificationPolicySavedObjectServiceContract } from '../../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; +import { NotificationPolicySavedObjectServiceInternalToken } from '../../services/notification_policy_saved_object_service/tokens'; + +@injectable() +export class FetchPoliciesStep implements DispatcherStep { + public readonly name = 'fetch_policies'; + + constructor( + @inject(NotificationPolicySavedObjectServiceInternalToken) + private readonly notificationPolicySavedObjectService: NotificationPolicySavedObjectServiceContract + ) {} + + public async execute(state: Readonly): Promise { + const { rules } = state; + if (!rules || rules.size === 0) { + return { type: 'continue', data: { policies: new Map() } }; + } + + const uniquePolicyIds = Array.from( + new Set(rules.values().flatMap((r) => r.notificationPolicyIds)) + ); + if (uniquePolicyIds.length === 0) { + return { type: 'continue', data: { policies: new Map() } }; + } + + const result = await this.notificationPolicySavedObjectService.bulkGetByIds(uniquePolicyIds); + const policies = new Map(); + + for (const doc of result) { + if ('error' in doc) continue; + + policies.set(doc.id, { + id: doc.id, + name: doc.attributes.name, + destinations: doc.attributes.destinations ?? [], + matcher: doc.attributes.matcher, + groupBy: doc.attributes.group_by ?? [], + throttle: doc.attributes.throttle, + }); + } + + return { type: 'continue', data: { policies } }; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_rules_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_rules_step.test.ts new file mode 100644 index 0000000000000..24045525ccf3d --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_rules_step.test.ts @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObjectsClientContract } from '@kbn/core/server'; +import { FetchRulesStep } from './fetch_rules_step'; +import type { RulesSavedObjectService } from '../../services/rules_saved_object_service/rules_saved_object_service'; +import { createRulesSavedObjectService } from '../../services/rules_saved_object_service/rules_saved_object_service.mock'; +import { createRuleSoAttributes } from '../../test_utils'; +import { RULE_SAVED_OBJECT_TYPE } from '../../../saved_objects'; +import { createAlertEpisode, createDispatcherPipelineState } from '../fixtures/test_utils'; + +describe('FetchRulesStep', () => { + let rulesSoService: RulesSavedObjectService; + let mockSavedObjectsClient: jest.Mocked; + + beforeEach(() => { + ({ rulesSavedObjectService: rulesSoService, mockSavedObjectsClient } = + createRulesSavedObjectService()); + }); + + it('fetches rules for unique rule IDs from active episodes', async () => { + mockSavedObjectsClient.bulkGet.mockResolvedValue({ + saved_objects: [ + { + id: 'r1', + type: RULE_SAVED_OBJECT_TYPE, + attributes: createRuleSoAttributes({ + metadata: { name: 'Rule 1' }, + notification_policies: [{ ref: 'p1' }], + }), + references: [], + }, + ], + }); + + const step = new FetchRulesStep(rulesSoService); + const state = createDispatcherPipelineState({ + dispatchable: [ + createAlertEpisode({ rule_id: 'r1' }), + createAlertEpisode({ rule_id: 'r1', episode_id: 'e2' }), + ], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.rules?.size).toBe(1); + expect(result.data?.rules?.get('r1')?.name).toBe('Rule 1'); + expect(mockSavedObjectsClient.bulkGet).toHaveBeenCalledWith([ + { type: RULE_SAVED_OBJECT_TYPE, id: 'r1' }, + ]); + }); + + it('returns empty map when no active episodes', async () => { + const step = new FetchRulesStep(rulesSoService); + + const state = createDispatcherPipelineState({ dispatchable: [] }); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.rules?.size).toBe(0); + expect(mockSavedObjectsClient.bulkGet).not.toHaveBeenCalled(); + }); + + it('skips documents with errors', async () => { + mockSavedObjectsClient.bulkGet.mockResolvedValue({ + saved_objects: [ + { + id: 'r1', + type: RULE_SAVED_OBJECT_TYPE, + attributes: {}, + references: [], + error: { statusCode: 404, message: 'Not found', error: 'Not Found' }, + }, + ], + } as any); + + const step = new FetchRulesStep(rulesSoService); + const state = createDispatcherPipelineState({ + dispatchable: [createAlertEpisode({ rule_id: 'r1' })], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.rules?.size).toBe(0); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_rules_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_rules_step.ts new file mode 100644 index 0000000000000..afcaf0fcfbd2d --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_rules_step.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import type { RulesSavedObjectServiceContract } from '../../services/rules_saved_object_service/rules_saved_object_service'; +import { RulesSavedObjectServiceInternalToken } from '../../services/rules_saved_object_service/tokens'; +import type { + DispatcherPipelineState, + DispatcherStep, + DispatcherStepOutput, + Rule, + RuleId, +} from '../types'; + +@injectable() +export class FetchRulesStep implements DispatcherStep { + public readonly name = 'fetch_rules'; + + constructor( + @inject(RulesSavedObjectServiceInternalToken) + private readonly rulesSavedObjectService: RulesSavedObjectServiceContract + ) {} + + public async execute(state: Readonly): Promise { + const { dispatchable = [] } = state; + + const uniqueRuleIds = Array.from(new Set(dispatchable.map((ep) => ep.rule_id))); + if (uniqueRuleIds.length === 0) { + return { type: 'continue', data: { rules: new Map() } }; + } + + const result = await this.rulesSavedObjectService.bulkGetByIds(uniqueRuleIds); + const rules = new Map(); + + for (const doc of result) { + if ('error' in doc) continue; + + rules.set(doc.id, { + id: doc.id, + name: doc.attributes.metadata.name, + description: doc.attributes.metadata.owner ?? '', + notificationPolicyIds: doc.attributes.notification_policies?.map((p) => p.ref) ?? [], + enabled: doc.attributes.enabled, + createdAt: doc.attributes.createdAt, + updatedAt: doc.attributes.updatedAt, + }); + } + + return { type: 'continue', data: { rules } }; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_suppressions_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_suppressions_step.test.ts new file mode 100644 index 0000000000000..f35dd6a1e20fb --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_suppressions_step.test.ts @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { FetchSuppressionsStep } from './fetch_suppressions_step'; +import { createQueryService } from '../../services/query_service/query_service.mock'; +import { createAlertEpisodeSuppressionsResponse } from '../fixtures/dispatcher'; +import { createAlertEpisode, createDispatcherPipelineState } from '../fixtures/test_utils'; + +describe('FetchSuppressionsStep', () => { + it('fetches suppressions for provided episodes', async () => { + const { queryService, mockEsClient } = createQueryService(); + const step = new FetchSuppressionsStep(queryService); + + mockEsClient.esql.query.mockResolvedValueOnce( + createAlertEpisodeSuppressionsResponse([ + { + rule_id: 'r1', + group_hash: 'h1', + episode_id: 'e1', + should_suppress: true, + }, + ]) + ); + + const state = createDispatcherPipelineState({ + episodes: [createAlertEpisode({ rule_id: 'r1', group_hash: 'h1', episode_id: 'e1' })], + }); + + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.suppressions).toHaveLength(1); + expect(result.data?.suppressions?.[0].should_suppress).toBe(true); + }); + + it('returns empty suppressions when no episodes exist', async () => { + const { queryService } = createQueryService(); + const step = new FetchSuppressionsStep(queryService); + + const state = createDispatcherPipelineState({ episodes: [] }); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.suppressions).toHaveLength(0); + }); + + it('returns empty suppressions when episodes is undefined', async () => { + const { queryService } = createQueryService(); + const step = new FetchSuppressionsStep(queryService); + + const state = createDispatcherPipelineState(); + const result = await step.execute(state); + + expect(result.type).toBe('continue'); + if (result.type !== 'continue') return; + expect(result.data?.suppressions).toHaveLength(0); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_suppressions_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_suppressions_step.ts new file mode 100644 index 0000000000000..6414f5a5ec973 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/fetch_suppressions_step.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import type { + AlertEpisodeSuppression, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; +import type { QueryServiceContract } from '../../services/query_service/query_service'; +import { QueryServiceInternalToken } from '../../services/query_service/tokens'; +import { queryResponseToRecords } from '../../services/query_service/query_response_to_records'; +import { getAlertEpisodeSuppressionsQuery } from '../queries'; + +@injectable() +export class FetchSuppressionsStep implements DispatcherStep { + public readonly name = 'fetch_suppressions'; + + constructor( + @inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract + ) {} + + public async execute(state: Readonly): Promise { + const { episodes } = state; + if (!episodes || episodes.length === 0) { + return { type: 'continue', data: { suppressions: [] } }; + } + + const result = await this.queryService.executeQuery({ + query: getAlertEpisodeSuppressionsQuery(episodes).query, + }); + + const suppressions = queryResponseToRecords(result); + return { type: 'continue', data: { suppressions } }; + } +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/index.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/index.ts new file mode 100644 index 0000000000000..8cdc17f92531d --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/index.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { FetchEpisodesStep } from './fetch_episodes_step'; +export { FetchSuppressionsStep } from './fetch_suppressions_step'; +export { ApplySuppressionStep } from './apply_suppression_step'; +export { FetchRulesStep } from './fetch_rules_step'; +export { FetchPoliciesStep } from './fetch_policies_step'; +export { EvaluateMatchersStep } from './evaluate_matchers_step'; +export { BuildGroupsStep } from './build_groups_step'; +export { ApplyThrottlingStep } from './apply_throttling_step'; +export { DispatchStep } from './dispatch_step'; +export { StoreActionsStep } from './record_actions_step'; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/record_actions_step.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/record_actions_step.test.ts new file mode 100644 index 0000000000000..9b50c8680b613 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/record_actions_step.test.ts @@ -0,0 +1,339 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StoreActionsStep } from './record_actions_step'; +import type { StorageServiceContract } from '../../services/storage_service/storage_service'; +import { ALERT_ACTIONS_DATA_STREAM } from '../../../resources/alert_actions'; +import { + createDispatcherPipelineState, + createAlertEpisode, + createNotificationGroup, +} from '../fixtures/test_utils'; + +const createMockStorageService = (): jest.Mocked => ({ + bulkIndexDocs: jest.fn().mockResolvedValue(undefined), +}); + +describe('StoreActionsStep', () => { + const mockDate = new Date('2026-01-22T08:00:00.000Z'); + + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(mockDate); + }); + + afterEach(() => { + jest.useRealTimers(); + jest.clearAllMocks(); + }); + + it('halts when suppressed, throttled, and dispatch are all empty', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const state = createDispatcherPipelineState({ + suppressed: [], + throttled: [], + dispatch: [], + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'no_actions' }); + expect(mockService.bulkIndexDocs).not.toHaveBeenCalled(); + }); + + it('halts when suppressed, throttled, and dispatch are undefined', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const state = createDispatcherPipelineState({}); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'halt', reason: 'no_actions' }); + expect(mockService.bulkIndexDocs).not.toHaveBeenCalled(); + }); + + it('records suppressed episodes with action_type suppress', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const episode = createAlertEpisode({ + rule_id: 'rule-1', + group_hash: 'hash-1', + last_event_timestamp: '2026-01-22T07:00:00.000Z', + }); + + const state = createDispatcherPipelineState({ + suppressed: [{ ...episode, reason: 'user acknowledged' }], + throttled: [], + dispatch: [], + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'continue' }); + expect(mockService.bulkIndexDocs).toHaveBeenCalledTimes(1); + expect(mockService.bulkIndexDocs).toHaveBeenCalledWith({ + index: ALERT_ACTIONS_DATA_STREAM, + docs: [ + { + '@timestamp': mockDate.toISOString(), + group_hash: 'hash-1', + last_series_event_timestamp: '2026-01-22T07:00:00.000Z', + actor: 'system', + action_type: 'suppress', + rule_id: 'rule-1', + source: 'internal', + reason: 'user acknowledged', + }, + ], + }); + }); + + it('records throttled notification groups with throttle-specific reason', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const episode = createAlertEpisode({ + rule_id: 'rule-1', + group_hash: 'hash-1', + last_event_timestamp: '2026-01-22T07:00:00.000Z', + }); + + const group = createNotificationGroup({ + id: 'group-1', + policyId: 'policy-1', + episodes: [episode], + }); + + const state = createDispatcherPipelineState({ + suppressed: [], + throttled: [group], + dispatch: [], + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'continue' }); + expect(mockService.bulkIndexDocs).toHaveBeenCalledTimes(1); + expect(mockService.bulkIndexDocs).toHaveBeenCalledWith({ + index: ALERT_ACTIONS_DATA_STREAM, + docs: [ + { + '@timestamp': mockDate.toISOString(), + group_hash: 'hash-1', + last_series_event_timestamp: '2026-01-22T07:00:00.000Z', + actor: 'system', + action_type: 'suppress', + rule_id: 'rule-1', + source: 'internal', + reason: 'suppressed by throttled policy policy-1', + }, + ], + }); + }); + + it('records dispatched episodes with fire and notified action types', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const episode = createAlertEpisode({ + rule_id: 'rule-1', + group_hash: 'hash-1', + last_event_timestamp: '2026-01-22T07:00:00.000Z', + }); + + const group = createNotificationGroup({ + id: 'group-1', + ruleId: 'rule-1', + policyId: 'policy-1', + episodes: [episode], + }); + + const state = createDispatcherPipelineState({ + suppressed: [], + throttled: [], + dispatch: [group], + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'continue' }); + expect(mockService.bulkIndexDocs).toHaveBeenCalledTimes(1); + expect(mockService.bulkIndexDocs).toHaveBeenCalledWith({ + index: ALERT_ACTIONS_DATA_STREAM, + docs: [ + { + '@timestamp': mockDate.toISOString(), + group_hash: 'hash-1', + last_series_event_timestamp: '2026-01-22T07:00:00.000Z', + actor: 'system', + action_type: 'fire', + rule_id: 'rule-1', + source: 'internal', + reason: 'dispatched by policy policy-1', + }, + { + '@timestamp': mockDate.toISOString(), + actor: 'system', + action_type: 'notified', + rule_id: 'rule-1', + group_hash: 'irrelevant', + last_series_event_timestamp: mockDate.toISOString(), + notification_group_id: 'group-1', + source: 'internal', + reason: 'notified by policy policy-1 with throttle interval', + }, + ], + }); + }); + + it('handles combined suppressed, throttled, and dispatch arrays', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const suppressedEpisode = createAlertEpisode({ + rule_id: 'rule-suppressed', + group_hash: 'hash-suppressed', + episode_id: 'ep-suppressed', + last_event_timestamp: '2026-01-22T07:00:00.000Z', + }); + + const throttledEpisode = createAlertEpisode({ + rule_id: 'rule-throttled', + group_hash: 'hash-throttled', + episode_id: 'ep-throttled', + last_event_timestamp: '2026-01-22T07:10:00.000Z', + }); + + const dispatchEpisode = createAlertEpisode({ + rule_id: 'rule-dispatch', + group_hash: 'hash-dispatch', + episode_id: 'ep-dispatch', + last_event_timestamp: '2026-01-22T07:20:00.000Z', + }); + + const throttledGroup = createNotificationGroup({ + id: 'throttled-group', + policyId: 'throttle-policy', + episodes: [throttledEpisode], + }); + + const dispatchGroup = createNotificationGroup({ + id: 'dispatch-group', + ruleId: 'rule-dispatch', + policyId: 'dispatch-policy', + episodes: [dispatchEpisode], + }); + + const state = createDispatcherPipelineState({ + suppressed: [{ ...suppressedEpisode, reason: 'manually suppressed' }], + throttled: [throttledGroup], + dispatch: [dispatchGroup], + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'continue' }); + expect(mockService.bulkIndexDocs).toHaveBeenCalledTimes(1); + + const callArgs = mockService.bulkIndexDocs.mock.calls[0][0]; + expect(callArgs.index).toBe(ALERT_ACTIONS_DATA_STREAM); + expect(callArgs.docs).toHaveLength(4); + + expect(callArgs.docs[0]).toEqual({ + '@timestamp': mockDate.toISOString(), + group_hash: 'hash-suppressed', + last_series_event_timestamp: '2026-01-22T07:00:00.000Z', + actor: 'system', + action_type: 'suppress', + rule_id: 'rule-suppressed', + source: 'internal', + reason: 'manually suppressed', + }); + + expect(callArgs.docs[1]).toEqual({ + '@timestamp': mockDate.toISOString(), + group_hash: 'hash-throttled', + last_series_event_timestamp: '2026-01-22T07:10:00.000Z', + actor: 'system', + action_type: 'suppress', + rule_id: 'rule-throttled', + source: 'internal', + reason: 'suppressed by throttled policy throttle-policy', + }); + + expect(callArgs.docs[2]).toEqual({ + '@timestamp': mockDate.toISOString(), + group_hash: 'hash-dispatch', + last_series_event_timestamp: '2026-01-22T07:20:00.000Z', + actor: 'system', + action_type: 'fire', + rule_id: 'rule-dispatch', + source: 'internal', + reason: 'dispatched by policy dispatch-policy', + }); + + expect(callArgs.docs[3]).toEqual({ + '@timestamp': mockDate.toISOString(), + actor: 'system', + action_type: 'notified', + rule_id: 'rule-dispatch', + group_hash: 'irrelevant', + last_series_event_timestamp: mockDate.toISOString(), + notification_group_id: 'dispatch-group', + source: 'internal', + reason: 'notified by policy dispatch-policy with throttle interval', + }); + }); + + it('records multiple episodes within a single dispatch group', async () => { + const mockService = createMockStorageService(); + const step = new StoreActionsStep(mockService); + + const episode1 = createAlertEpisode({ + rule_id: 'rule-1', + group_hash: 'hash-1', + episode_id: 'ep-1', + last_event_timestamp: '2026-01-22T07:00:00.000Z', + }); + + const episode2 = createAlertEpisode({ + rule_id: 'rule-1', + group_hash: 'hash-2', + episode_id: 'ep-2', + last_event_timestamp: '2026-01-22T07:05:00.000Z', + }); + + const group = createNotificationGroup({ + id: 'group-1', + ruleId: 'rule-1', + policyId: 'policy-1', + episodes: [episode1, episode2], + }); + + const state = createDispatcherPipelineState({ + dispatch: [group], + }); + + const result = await step.execute(state); + + expect(result).toEqual({ type: 'continue' }); + expect(mockService.bulkIndexDocs).toHaveBeenCalledTimes(1); + + const callArgs = mockService.bulkIndexDocs.mock.calls[0][0]; + expect(callArgs.docs).toHaveLength(3); + expect(callArgs.docs[0].action_type).toBe('fire'); + expect(callArgs.docs[0].group_hash).toBe('hash-1'); + expect(callArgs.docs[1].action_type).toBe('fire'); + expect(callArgs.docs[1].group_hash).toBe('hash-2'); + expect(callArgs.docs[2].action_type).toBe('notified'); + expect(callArgs.docs[2].notification_group_id).toBe('group-1'); + }); +}); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/record_actions_step.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/record_actions_step.ts new file mode 100644 index 0000000000000..0c6d7d9198faa --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/record_actions_step.ts @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { inject, injectable } from 'inversify'; +import { ALERT_ACTIONS_DATA_STREAM, type AlertAction } from '../../../resources/alert_actions'; +import type { + AlertEpisode, + DispatcherStep, + DispatcherPipelineState, + DispatcherStepOutput, +} from '../types'; +import type { StorageServiceContract } from '../../services/storage_service/storage_service'; +import { StorageServiceInternalToken } from '../../services/storage_service/tokens'; + +@injectable() +export class StoreActionsStep implements DispatcherStep { + public readonly name = 'record_actions'; + + constructor( + @inject(StorageServiceInternalToken) private readonly storageService: StorageServiceContract + ) {} + + public async execute(state: Readonly): Promise { + const { suppressed = [], throttled = [], dispatch = [] } = state; + if (suppressed.length === 0 && throttled.length === 0 && dispatch.length === 0) { + return { type: 'halt', reason: 'no_actions' }; + } + + const now = new Date(); + + await this.storageService.bulkIndexDocs({ + index: ALERT_ACTIONS_DATA_STREAM, + docs: [ + ...suppressed.map((episode) => + toAction({ episode, actionType: 'suppress', now, reason: episode.reason }) + ), + ...throttled.flatMap((group) => + group.episodes.map((episode) => + toAction({ + episode, + actionType: 'suppress', + now, + reason: `suppressed by throttled policy ${group.policyId}`, + }) + ) + ), + ...dispatch.flatMap((group) => + group.episodes.map((episode) => + toAction({ + episode, + actionType: 'fire', + now, + reason: `dispatched by policy ${group.policyId}`, + }) + ) + ), + ...dispatch.map((group) => ({ + '@timestamp': now.toISOString(), + actor: 'system', + action_type: 'notified', + rule_id: group.ruleId, + group_hash: 'irrelevant', + last_series_event_timestamp: now.toISOString(), + notification_group_id: group.id, + source: 'internal', + reason: `notified by policy ${group.policyId} with throttle interval`, + })), + ], + }); + + return { type: 'continue' }; + } +} + +function toAction({ + episode, + actionType, + now, + reason, +}: { + episode: AlertEpisode; + actionType: 'suppress' | 'fire' | 'notified'; + now: Date; + reason?: string; +}): AlertAction { + return { + '@timestamp': now.toISOString(), + group_hash: episode.group_hash, + last_series_event_timestamp: episode.last_event_timestamp, + actor: 'system', + action_type: actionType, + rule_id: episode.rule_id, + source: 'internal', + reason, + }; +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/tokens.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/tokens.ts new file mode 100644 index 0000000000000..325390d37ed7b --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/steps/tokens.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ServiceIdentifier } from 'inversify'; +import type { DispatcherStep } from '../types'; + +/** + * Token for multi-injecting the ordered dispatcher execution steps. + * Binding order defines execution order. + */ +export const DispatcherExecutionStepsToken = Symbol.for( + 'alerting_v2.DispatcherExecutionSteps' +) as ServiceIdentifier; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/task_runner.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/task_runner.ts index 75d4082a53aee..ab55405a72dd7 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/task_runner.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/task_runner.ts @@ -7,7 +7,8 @@ import type { RunContext, RunResult } from '@kbn/task-manager-plugin/server/task'; import { inject, injectable } from 'inversify'; -import { DispatcherService, type DispatcherServiceContract } from './dispatcher'; +import { type DispatcherServiceContract } from './dispatcher'; +import { DispatcherServiceInternalToken } from './tokens'; import type { DispatcherExecutionParams, DispatcherExecutionResult, @@ -19,7 +20,8 @@ type TaskRunParams = Pick; @injectable() export class DispatcherTaskRunner { constructor( - @inject(DispatcherService) private readonly dispatcherService: DispatcherServiceContract + @inject(DispatcherServiceInternalToken) + private readonly dispatcherService: DispatcherServiceContract ) {} public async run({ taskInstance, abortController }: TaskRunParams): Promise { diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/tokens.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/tokens.ts new file mode 100644 index 0000000000000..612d8eb9e4a88 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/tokens.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ServiceIdentifier } from 'inversify'; +import type { DispatcherService } from './dispatcher'; + +/** + * DispatcherService singleton + */ +export const DispatcherServiceInternalToken = Symbol.for( + 'alerting_v2.DispatcherServiceInternal' +) as ServiceIdentifier; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/types.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/types.ts index 3efb1ab7a658b..ac04f7903f54b 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/types.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/dispatcher/types.ts @@ -6,11 +6,12 @@ */ export type RuleId = string; +export type NotificationPolicyId = string; +export type NotificationGroupId = string; -export interface Policy { +export interface NotificationPolicyDestination { + type: 'workflow'; id: string; - name: string; - // other policy fields as needed } export interface AlertEpisode { @@ -26,6 +27,9 @@ export interface AlertEpisodeSuppression { group_hash: string; episode_id: string | null; should_suppress: boolean; + last_ack_action?: string | null; + last_deactivate_action?: string | null; + last_snooze_action?: string | null; } export interface DispatcherExecutionParams { @@ -40,3 +44,78 @@ export interface DispatcherExecutionResult { export interface DispatcherTaskState { previousStartedAt?: string; } + +export interface Rule { + id: RuleId; + name: string; + description: string; + notificationPolicyIds: NotificationPolicyId[]; + enabled: boolean; + createdAt: string; + updatedAt: string; +} + +export interface NotificationPolicy { + id: NotificationPolicyId; + name: string; + /** KQL expression evaluated against the alert episode context. + * An empty matcher matches all episodes (catch-all). */ + matcher?: string; // e.g. 'data.severity == "critical" AND data.env != "dev"' + /** data.* fields used to group episodes into a single notification */ + groupBy: string[]; + /** Minimum interval between notifications for the same group */ + throttle?: { + interval?: string; // e.g. '1h', '30m', '5m' + }; + /** Target destinations to dispatch matched episodes to */ + destinations: NotificationPolicyDestination[]; +} + +export interface MatchedPair { + episode: AlertEpisode; + policy: NotificationPolicy; +} + +export interface NotificationGroup { + id: NotificationGroupId; + ruleId: RuleId; + policyId: NotificationPolicyId; + destinations: NotificationPolicyDestination[]; + groupKey: Record; + episodes: AlertEpisode[]; +} + +export interface LastNotifiedRecord { + notification_group_id: NotificationGroupId; + last_notified: string; +} + +export interface DispatcherPipelineInput { + readonly startedAt: Date; + readonly previousStartedAt: Date; +} + +export interface DispatcherPipelineState { + readonly input: DispatcherPipelineInput; + readonly episodes?: AlertEpisode[]; + readonly suppressions?: AlertEpisodeSuppression[]; + readonly dispatchable?: AlertEpisode[]; + readonly suppressed?: Array; + readonly rules?: Map; + readonly policies?: Map; + readonly matched?: MatchedPair[]; + readonly groups?: NotificationGroup[]; + readonly dispatch?: NotificationGroup[]; + readonly throttled?: NotificationGroup[]; +} + +export type DispatcherHaltReason = 'no_episodes' | 'no_actions'; + +export type DispatcherStepOutput = + | { type: 'continue'; data?: Partial> } + | { type: 'halt'; reason: DispatcherHaltReason }; + +export interface DispatcherStep { + readonly name: string; + execute(state: Readonly): Promise; +} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.test.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.test.ts index 11255d18161cb..a81e673e35895 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.test.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.test.ts @@ -455,7 +455,6 @@ describe('NotificationPolicyClient', () => { destinations: [{ type: 'workflow', id: 'updated-workflow' }], updatedBy: 'elastic_profile_uid', updatedAt: '2025-01-01T00:00:00.000Z', - // Preserves original createdBy and createdAt createdBy: 'creator_profile_uid', createdAt: '2024-12-01T00:00:00.000Z', }), diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.ts index c3f6b77be99e7..c4b6a6d7e29e4 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/notification_policy_client/notification_policy_client.ts @@ -12,12 +12,12 @@ import { updateNotificationPolicyDataSchema, } from '@kbn/alerting-v2-schemas'; import { SavedObjectsErrorHelpers } from '@kbn/core-saved-objects-server'; +import { stringifyZodError } from '@kbn/zod-helpers'; import { inject, injectable } from 'inversify'; import { omit } from 'lodash'; -import { stringifyZodError } from '@kbn/zod-helpers'; import { type NotificationPolicySavedObjectAttributes } from '../../saved_objects'; import type { NotificationPolicySavedObjectServiceContract } from '../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; -import { NotificationPolicySavedObjectService } from '../services/notification_policy_saved_object_service/notification_policy_saved_object_service'; +import { NotificationPolicySavedObjectServiceScopedToken } from '../services/notification_policy_saved_object_service/tokens'; import type { UserServiceContract } from '../services/user_service/user_service'; import { UserService } from '../services/user_service/user_service'; import type { CreateNotificationPolicyParams, UpdateNotificationPolicyParams } from './types'; @@ -25,7 +25,7 @@ import type { CreateNotificationPolicyParams, UpdateNotificationPolicyParams } f @injectable() export class NotificationPolicyClient { constructor( - @inject(NotificationPolicySavedObjectService) + @inject(NotificationPolicySavedObjectServiceScopedToken) private readonly notificationPolicySavedObjectService: NotificationPolicySavedObjectServiceContract, @inject(UserService) private readonly userService: UserServiceContract ) {} @@ -112,14 +112,12 @@ export class NotificationPolicyClient { const userProfileUid = await this.getUserProfileUid(); const now = new Date().toISOString(); - const existingNotificationPolicy = await this.getNotificationPolicy({ id: params.options.id }); - const existingAttrs: NotificationPolicySavedObjectAttributes = omit( - existingNotificationPolicy, - ['id', 'version'] - ); + const existingPolicy = await this.getNotificationPolicy({ + id: params.options.id, + }); const nextAttrs: NotificationPolicySavedObjectAttributes = { - ...existingAttrs, + ...omit(existingPolicy, ['id', 'version']), ...parsed.data, updatedBy: userProfileUid, updatedAt: now, diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rules_client/rules_client.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rules_client/rules_client.ts index 79f63fcb15580..aa25663fd14ed 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/rules_client/rules_client.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/rules_client/rules_client.ts @@ -5,22 +5,22 @@ * 2.0. */ -import { getSpaceIdFromPath } from '@kbn/spaces-utils'; import Boom from '@hapi/boom'; +import { createRuleDataSchema, updateRuleDataSchema } from '@kbn/alerting-v2-schemas'; +import { PluginStart } from '@kbn/core-di'; +import { CoreStart, Request } from '@kbn/core-di-server'; +import type { HttpServiceStart, KibanaRequest } from '@kbn/core-http-server'; import { SavedObjectsErrorHelpers } from '@kbn/core-saved-objects-server'; import type { KibanaRequest as CoreKibanaRequest } from '@kbn/core/server'; -import type { HttpServiceStart, KibanaRequest } from '@kbn/core-http-server'; +import { getSpaceIdFromPath } from '@kbn/spaces-utils'; import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; -import { inject, injectable } from 'inversify'; -import { PluginStart } from '@kbn/core-di'; -import { CoreStart, Request } from '@kbn/core-di-server'; import { stringifyZodError } from '@kbn/zod-helpers'; -import { createRuleDataSchema, updateRuleDataSchema } from '@kbn/alerting-v2-schemas'; +import { inject, injectable } from 'inversify'; import { type RuleSavedObjectAttributes } from '../../saved_objects'; import { ensureRuleExecutorTaskScheduled, getRuleExecutorTaskId } from '../rule_executor/schedule'; import type { RulesSavedObjectServiceContract } from '../services/rules_saved_object_service/rules_saved_object_service'; -import { RulesSavedObjectService } from '../services/rules_saved_object_service/rules_saved_object_service'; +import { RulesSavedObjectServiceScopedToken } from '../services/rules_saved_object_service/tokens'; import type { UserServiceContract } from '../services/user_service/user_service'; import { UserService } from '../services/user_service/user_service'; import type { @@ -44,7 +44,7 @@ export class RulesClient { constructor( @inject(Request) private readonly request: KibanaRequest, @inject(CoreStart('http')) private readonly http: HttpServiceStart, - @inject(RulesSavedObjectService) + @inject(RulesSavedObjectServiceScopedToken) private readonly rulesSavedObjectService: RulesSavedObjectServiceContract, @inject(PluginStart('taskManager')) private readonly taskManager: TaskManagerStartContract, @inject(UserService) private readonly userService: UserServiceContract diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/notification_policy_saved_object_service.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/notification_policy_saved_object_service.ts index 5bf9e8f106153..6b36fe1d38d0c 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/notification_policy_saved_object_service.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/notification_policy_saved_object_service.ts @@ -38,16 +38,16 @@ export interface NotificationPolicySavedObjectServiceContract { id: string, spaceId?: string ): Promise<{ id: string; attributes: NotificationPolicySavedObjectAttributes; version?: string }>; + bulkGetByIds( + ids: string[], + spaceId?: string + ): Promise; update(params: { id: string; attrs: NotificationPolicySavedObjectAttributes; version: string; }): Promise<{ id: string; version?: string }>; delete(params: { id: string }): Promise; - bulkGetByIds( - ids: string[], - spaceId?: string - ): Promise; } @injectable() @@ -139,10 +139,7 @@ export class NotificationPolicySavedObjectService return result.saved_objects.map((savedObject) => { if ('error' in savedObject && savedObject.error) { - return { - id: savedObject.id, - error: savedObject.error, - }; + return { id: savedObject.id, error: savedObject.error }; } return { diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/tokens.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/tokens.ts new file mode 100644 index 0000000000000..834c20b521404 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/notification_policy_saved_object_service/tokens.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ServiceIdentifier } from 'inversify'; +import type { NotificationPolicySavedObjectServiceContract } from './notification_policy_saved_object_service'; + +/** + * NotificationPolicySavedObjectService scoped to the current request + */ +export const NotificationPolicySavedObjectServiceScopedToken = Symbol.for( + 'alerting_v2.NotificationPolicySavedObjectServiceScoped' +) as ServiceIdentifier; + +/** + * NotificationPolicySavedObjectService singleton (internal user, no request scope) + */ +export const NotificationPolicySavedObjectServiceInternalToken = Symbol.for( + 'alerting_v2.NotificationPolicySavedObjectServiceInternal' +) as ServiceIdentifier; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/rules_saved_object_service/tokens.ts b/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/rules_saved_object_service/tokens.ts new file mode 100644 index 0000000000000..4c984cc3c0458 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/lib/services/rules_saved_object_service/tokens.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ServiceIdentifier } from 'inversify'; +import type { RulesSavedObjectServiceContract } from './rules_saved_object_service'; + +/** + * RulesSavedObjectService scoped to the current request + */ +export const RulesSavedObjectServiceScopedToken = Symbol.for( + 'alerting_v2.RulesSavedObjectServiceScoped' +) as ServiceIdentifier; + +/** + * RulesSavedObjectService singleton (internal user, no request scope) + */ +export const RulesSavedObjectServiceInternalToken = Symbol.for( + 'alerting_v2.RulesSavedObjectServiceInternal' +) as ServiceIdentifier; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/resources/alert_actions.ts b/x-pack/platform/plugins/shared/alerting_v2/server/resources/alert_actions.ts index a9db44424bdba..e89a8d7246c73 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/resources/alert_actions.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/resources/alert_actions.ts @@ -40,7 +40,9 @@ const mappings: MappingsDefinition = { group_hash: { type: 'keyword' }, episode_id: { type: 'keyword' }, rule_id: { type: 'keyword' }, + notification_group_id: { type: 'keyword' }, source: { type: 'keyword' }, + reason: { type: 'text' }, }, }; @@ -50,9 +52,10 @@ export const alertActionSchema = z.object({ last_series_event_timestamp: z.string(), expiry: z.string().optional(), actor: z.string().nullable(), - action_type: z.string(), // "fire" | "suppress" + action_type: z.string(), episode_id: z.string().optional(), rule_id: z.string(), + notification_group_id: z.string().optional(), source: z.string().optional(), tags: z.array(z.string()).optional(), reason: z.string().optional(), diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/routes/run_dispatch_route.ts b/x-pack/platform/plugins/shared/alerting_v2/server/routes/run_dispatch_route.ts deleted file mode 100644 index ca841b44e7a5d..0000000000000 --- a/x-pack/platform/plugins/shared/alerting_v2/server/routes/run_dispatch_route.ts +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import Boom from '@hapi/boom'; -import { schema, type TypeOf } from '@kbn/config-schema'; -import type { KibanaRequest, KibanaResponseFactory, RouteSecurity } from '@kbn/core-http-server'; -import type { RouteHandler } from '@kbn/core-di-server'; -import { Request, Response } from '@kbn/core-di-server'; -import { inject, injectable } from 'inversify'; -import { DispatcherService, type DispatcherServiceContract } from '../lib/dispatcher/dispatcher'; - -const runDispatchBodySchema = schema.object({ - previousStartedAt: schema.maybe(schema.string({ minLength: 1 })), -}); - -type RunDispatchBody = TypeOf; - -@injectable() -export class RunDispatchRoute implements RouteHandler { - static method = 'post' as const; - static path = '/internal/alerting/v2/dispatcher/_run'; - static security: RouteSecurity = { - authz: { - enabled: false, - reason: 'This is an internal testing route', - }, - }; - static options = { access: 'internal' } as const; - static validate = { - request: { - body: runDispatchBodySchema, - }, - } as const; - - constructor( - @inject(Request) - private readonly request: KibanaRequest, - @inject(Response) private readonly response: KibanaResponseFactory, - @inject(DispatcherService) private readonly dispatcherService: DispatcherServiceContract - ) {} - - async handle() { - try { - const previousStartedAt = this.request.body.previousStartedAt - ? new Date(this.request.body.previousStartedAt) - : undefined; - - const result = await this.dispatcherService.run({ previousStartedAt }); - - return this.response.ok({ body: { startedAt: result.startedAt.toISOString() } }); - } catch (e) { - const boom = Boom.isBoom(e) ? e : Boom.boomify(e); - return this.response.customError({ - statusCode: boom.output.statusCode, - body: boom.output.payload, - }); - } - } -} diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_dispatcher_executor.ts b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_dispatcher_executor.ts new file mode 100644 index 0000000000000..e0752cbdd50a9 --- /dev/null +++ b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_dispatcher_executor.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ContainerModuleLoadOptions } from 'inversify'; +import { DispatcherPipeline } from '../lib/dispatcher/execution_pipeline'; +import { DispatcherExecutionStepsToken } from '../lib/dispatcher/steps/tokens'; +import { + FetchEpisodesStep, + FetchSuppressionsStep, + ApplySuppressionStep, + FetchRulesStep, + FetchPoliciesStep, + EvaluateMatchersStep, + BuildGroupsStep, + ApplyThrottlingStep, + DispatchStep, + StoreActionsStep, +} from '../lib/dispatcher/steps'; + +export const bindDispatcherExecutionServices = ({ bind }: ContainerModuleLoadOptions) => { + /** + * Dispatcher execution steps via multi-injection. + * Binding order defines execution order. + */ + bind(DispatcherExecutionStepsToken).to(FetchEpisodesStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(FetchSuppressionsStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(ApplySuppressionStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(FetchRulesStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(FetchPoliciesStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(EvaluateMatchersStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(BuildGroupsStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(ApplyThrottlingStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(DispatchStep).inSingletonScope(); + bind(DispatcherExecutionStepsToken).to(StoreActionsStep).inSingletonScope(); + + bind(DispatcherPipeline).toSelf().inSingletonScope(); +}; diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_on_setup.ts b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_on_setup.ts index c470a793acffe..2eb090d988573 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_on_setup.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_on_setup.ts @@ -5,13 +5,13 @@ * 2.0. */ -import type { ContainerModuleLoadOptions } from 'inversify'; import { Logger, OnSetup, PluginSetup } from '@kbn/core-di'; import { CoreSetup, PluginInitializer } from '@kbn/core-di-server'; +import type { ContainerModuleLoadOptions } from 'inversify'; +import type { PluginConfig } from '../config'; import { registerFeaturePrivileges } from '../lib/security/privileges'; -import { registerSavedObjects } from '../saved_objects'; import { TaskDefinition } from '../lib/services/task_run_scope_service/create_task_runner'; -import type { PluginConfig } from '../config'; +import { registerSavedObjects } from '../saved_objects'; export function bindOnSetup({ bind }: ContainerModuleLoadOptions) { bind(OnSetup).toConstantValue((container) => { diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_routes.ts b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_routes.ts index 2f5aac72cfe83..b7fa405258e9f 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_routes.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_routes.ts @@ -12,7 +12,6 @@ import { UpdateRuleRoute } from '../routes/update_rule_route'; import { GetRulesRoute } from '../routes/get_rules_route'; import { GetRuleRoute } from '../routes/get_rule_route'; import { DeleteRuleRoute } from '../routes/delete_rule_route'; -import { RunDispatchRoute } from '../routes/run_dispatch_route'; import { CreateAlertActionRoute } from '../routes/create_alert_action_route'; import { BulkCreateAlertActionRoute } from '../routes/bulk_create_alert_action_route'; import { CreateNotificationPolicyRoute } from '../routes/notification_policies/create_notification_policy_route'; @@ -26,7 +25,6 @@ export function bindRoutes({ bind }: ContainerModuleLoadOptions) { bind(Route).toConstantValue(GetRulesRoute); bind(Route).toConstantValue(GetRuleRoute); bind(Route).toConstantValue(DeleteRuleRoute); - bind(Route).toConstantValue(RunDispatchRoute); bind(Route).toConstantValue(CreateAlertActionRoute); bind(Route).toConstantValue(BulkCreateAlertActionRoute); bind(Route).toConstantValue(CreateNotificationPolicyRoute); diff --git a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_services.ts b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_services.ts index 236fc387a62b6..0add2806323db 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_services.ts +++ b/x-pack/platform/plugins/shared/alerting_v2/server/setup/bind_services.ts @@ -5,39 +5,51 @@ * 2.0. */ +import { PluginStart } from '@kbn/core-di'; import { CoreStart, Request } from '@kbn/core-di-server'; import type { ContainerModuleLoadOptions } from 'inversify'; import { AlertActionsClient } from '../lib/alert_actions_client'; +import { DirectorService } from '../lib/director/director'; +import { BasicTransitionStrategy } from '../lib/director/strategies/basic_strategy'; +import { CountTimeframeStrategy } from '../lib/director/strategies/count_timeframe_strategy'; +import { TransitionStrategyFactory } from '../lib/director/strategies/strategy_resolver'; +import { TransitionStrategyToken } from '../lib/director/strategies/types'; import { DispatcherService } from '../lib/dispatcher/dispatcher'; -import { RulesClient } from '../lib/rules_client'; +import { DispatcherServiceInternalToken } from '../lib/dispatcher/tokens'; +import { + NotificationPolicySavedObjectServiceInternalToken, + NotificationPolicySavedObjectServiceScopedToken, +} from '../lib/services/notification_policy_saved_object_service/tokens'; +import { + RulesSavedObjectServiceInternalToken, + RulesSavedObjectServiceScopedToken, +} from '../lib/services/rules_saved_object_service/tokens'; import { NotificationPolicyClient } from '../lib/notification_policy_client'; +import { RulesClient } from '../lib/rules_client'; +import { EsServiceInternalToken, EsServiceScopedToken } from '../lib/services/es_service/tokens'; import { LoggerService, LoggerServiceToken } from '../lib/services/logger_service/logger_service'; +import { NotificationPolicySavedObjectService } from '../lib/services/notification_policy_saved_object_service/notification_policy_saved_object_service'; import { QueryService } from '../lib/services/query_service/query_service'; import { QueryServiceInternalToken, QueryServiceScopedToken, } from '../lib/services/query_service/tokens'; +import { ResourceManager } from '../lib/services/resource_service/resource_manager'; import { AlertingRetryService } from '../lib/services/retry_service'; +import { RetryServiceToken } from '../lib/services/retry_service/tokens'; import { RulesSavedObjectService } from '../lib/services/rules_saved_object_service/rules_saved_object_service'; -import { NotificationPolicySavedObjectService } from '../lib/services/notification_policy_saved_object_service/notification_policy_saved_object_service'; import { StorageService } from '../lib/services/storage_service/storage_service'; import { StorageServiceInternalToken, StorageServiceScopedToken, } from '../lib/services/storage_service/tokens'; -import { RetryServiceToken } from '../lib/services/retry_service/tokens'; -import { EsServiceInternalToken, EsServiceScopedToken } from '../lib/services/es_service/tokens'; -import { DirectorService } from '../lib/director/director'; -import { BasicTransitionStrategy } from '../lib/director/strategies/basic_strategy'; -import { CountTimeframeStrategy } from '../lib/director/strategies/count_timeframe_strategy'; -import { ResourceManager } from '../lib/services/resource_service/resource_manager'; -import { UserService } from '../lib/services/user_service/user_service'; -import { TransitionStrategyToken } from '../lib/director/strategies/types'; -import { TransitionStrategyFactory } from '../lib/director/strategies/strategy_resolver'; import { createTaskRunnerFactory, TaskRunnerFactoryToken, } from '../lib/services/task_run_scope_service/create_task_runner'; +import { UserService } from '../lib/services/user_service/user_service'; +import { NOTIFICATION_POLICY_SAVED_OBJECT_TYPE, RULE_SAVED_OBJECT_TYPE } from '../saved_objects'; +import type { AlertingServerStartDependencies } from '../types'; export function bindServices({ bind }: ContainerModuleLoadOptions) { bind(AlertActionsClient).toSelf().inRequestScope(); @@ -73,7 +85,30 @@ export function bindServices({ bind }: ContainerModuleLoadOptions) { ); bind(RulesSavedObjectService).toSelf().inRequestScope(); + bind(RulesSavedObjectServiceScopedToken).toService(RulesSavedObjectService); + bind(RulesSavedObjectServiceInternalToken) + .toDynamicValue(({ get }) => { + const savedObjects = get(CoreStart('savedObjects')); + const spaces = get(PluginStart('spaces')); + const internalClient = savedObjects.createInternalRepository([RULE_SAVED_OBJECT_TYPE]); + return new RulesSavedObjectService(() => internalClient, spaces); + }) + .inSingletonScope(); + bind(NotificationPolicySavedObjectService).toSelf().inRequestScope(); + bind(NotificationPolicySavedObjectServiceScopedToken).toService( + NotificationPolicySavedObjectService + ); + bind(NotificationPolicySavedObjectServiceInternalToken) + .toDynamicValue(({ get }) => { + const savedObjects = get(CoreStart('savedObjects')); + const spaces = get(PluginStart('spaces')); + const internalClient = savedObjects.createInternalRepository([ + NOTIFICATION_POLICY_SAVED_OBJECT_TYPE, + ]); + return new NotificationPolicySavedObjectService(() => internalClient, spaces); + }) + .inSingletonScope(); bind(QueryServiceScopedToken) .toDynamicValue(({ get }) => { @@ -108,6 +143,7 @@ export function bindServices({ bind }: ContainerModuleLoadOptions) { .inSingletonScope(); bind(DispatcherService).toSelf().inSingletonScope(); + bind(DispatcherServiceInternalToken).toService(DispatcherService); bind(DirectorService).toSelf().inSingletonScope(); bind(TransitionStrategyFactory).toSelf().inSingletonScope(); diff --git a/x-pack/platform/plugins/shared/alerting_v2/tsconfig.json b/x-pack/platform/plugins/shared/alerting_v2/tsconfig.json index d845d851f2af0..49ae1c8252229 100644 --- a/x-pack/platform/plugins/shared/alerting_v2/tsconfig.json +++ b/x-pack/platform/plugins/shared/alerting_v2/tsconfig.json @@ -55,7 +55,8 @@ "@kbn/core-user-profile-server-mocks", "@kbn/core-user-profile-server", "@kbn/es-mappings", - "@kbn/std" + "@kbn/std", + "@kbn/eval-kql" ], "exclude": ["target/**/*"] }