Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
* 2.0.
*/

import type { ElasticsearchClient } from '@kbn/core/server';
import { inject, injectable } from 'inversify';
import moment from 'moment';
import { ALERT_ACTIONS_DATA_STREAM, type AlertAction } from '../../resources/alert_actions';
import { EsServiceScopedToken } from '../services/es_service/tokens';
import {
LoggerServiceToken,
type LoggerServiceContract,
} from '../services/logger_service/logger_service';
import { queryResponseToRecords } from '../services/query_service/query_response_to_records';
import type { QueryServiceContract } from '../services/query_service/query_service';
import { QueryServiceInternalToken } from '../services/query_service/tokens';
import type { StorageServiceContract } from '../services/storage_service/storage_service';
import { StorageServiceInternalToken } from '../services/storage_service/tokens';
import { LOOKBACK_WINDOW_MINUTES } from './constants';
Expand All @@ -28,7 +28,7 @@ export interface DispatcherServiceContract {
@injectable()
export class DispatcherService implements DispatcherServiceContract {
constructor(
@inject(EsServiceScopedToken) private readonly esClient: ElasticsearchClient,
@inject(QueryServiceInternalToken) private readonly queryService: QueryServiceContract,
@inject(LoggerServiceToken) private readonly logger: LoggerServiceContract,
@inject(StorageServiceInternalToken) private readonly storageService: StorageServiceContract
) {}
Expand All @@ -37,8 +37,7 @@ export class DispatcherService implements DispatcherServiceContract {
const startedAt = new Date();
const { query } = getDispatchableAlertEventsQuery();

// TODO: Use QueryService as soon as it uses esClient instead of data plugin client
const result = await this.esClient.esql.query({
const result = await this.queryService.executeQuery({
query,
filter: {
range: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import type { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
import type { ElasticsearchClient } from '@kbn/core/server';
import { ALERT_ACTIONS_DATA_STREAM } from '../../../resources/alert_actions';
import { ALERT_EVENTS_DATA_STREAM } from '../../../resources/alert_events';
import { ALERT_EVENTS_DATA_STREAM, type AlertEvent } from '../../../resources/alert_events';
import type { LoggerServiceContract } from '../../services/logger_service/logger_service';
import { createLoggerService } from '../../services/logger_service/logger_service.mock';
import {
QueryService,
type QueryServiceContract,
} from '../../services/query_service/query_service';
import {
StorageService,
type StorageServiceContract,
Expand All @@ -25,55 +29,70 @@ import { setupTestServers } from './setup_test_servers';
* Episode 2: active -> inactive
* Episode 3: active
*/
const ALERT_EVENTS_TEST_DATA = [
const ALERT_EVENTS_TEST_DATA: AlertEvent[] = [
{
'@timestamp': '2026-01-22T07:50:00.000Z',
type: 'alert',
rule: { id: 'rule-1' },
rule: { id: 'rule-1', version: 1 },
group_hash: 'rule-1-series-1',
episode_id: 'rule-1-series-1-episode-3',
status: 'breach',
episode_status: 'active',
episode: {
id: 'rule-1-series-1-episode-3',
status: 'active',
},
data: {},
status: 'breached',
source: 'internal',
},
{
'@timestamp': '2026-01-22T07:25:00.000Z',
type: 'alert',
rule: { id: 'rule-1' },
rule: { id: 'rule-1', version: 1 },
group_hash: 'rule-1-series-1',
episode_id: 'rule-1-series-1-episode-2',
episode: {
id: 'rule-1-series-1-episode-2',
status: 'inactive',
},
data: {},
status: 'recovered',
episode_status: 'inactive',
source: 'internal',
},
{
'@timestamp': '2026-01-22T07:20:00.000Z',
type: 'alert',
rule: { id: 'rule-1' },
rule: { id: 'rule-1', version: 1 },
group_hash: 'rule-1-series-1',
episode_id: 'rule-1-series-1-episode-2',
episode: {
id: 'rule-1-series-1-episode-2',
status: 'active',
},
data: {},
status: 'breached',
episode_status: 'active',
source: 'internal',
},
{
'@timestamp': '2026-01-22T07:15:00.000Z',
type: 'alert',
rule: { id: 'rule-1' },
rule: { id: 'rule-1', version: 1 },
group_hash: 'rule-1-series-1',
episode_id: 'rule-1-series-1-episode-1',
episode: {
id: 'rule-1-series-1-episode-1',
status: 'inactive',
},
data: {},
status: 'recovered',
episode_status: 'inactive',
source: 'internal',
},
{
'@timestamp': '2026-01-22T07:10:00.000Z',
type: 'alert',
rule: { id: 'rule-1' },
rule: { id: 'rule-1', version: 1 },
group_hash: 'rule-1-series-1',
episode_id: 'rule-1-series-1-episode-1',
episode: {
id: 'rule-1-series-1-episode-1',
status: 'active',
},
data: {},
status: 'breached',
episode_status: 'active',
source: 'internal',
},
];
Expand All @@ -83,6 +102,7 @@ describe('DispatcherService integration tests', () => {
let kibanaServer: TestKibanaUtils;
let esClient: ElasticsearchClient;
let dispatcherService: DispatcherServiceContract;
let queryService: QueryServiceContract;
let storageService: StorageServiceContract;
let mockLoggerService: LoggerServiceContract;

Expand All @@ -109,8 +129,9 @@ describe('DispatcherService integration tests', () => {

mockLoggerService = createLoggerService().loggerService;

queryService = new QueryService(esClient, mockLoggerService);
storageService = new StorageService(esClient, mockLoggerService);
dispatcherService = new DispatcherService(esClient, mockLoggerService, storageService);
dispatcherService = new DispatcherService(queryService, mockLoggerService, storageService);
});

describe('when there are no alert events', () => {
Expand Down Expand Up @@ -184,11 +205,14 @@ describe('DispatcherService integration tests', () => {
{
'@timestamp': '2026-01-22T07:55:00.000Z',
type: 'alert',
rule: { id: 'rule-1' },
rule: { id: 'rule-1', version: 1 },
group_hash: 'rule-1-series-1',
episode_id: 'rule-1-series-1-episode-3',
episode: {
id: 'rule-1-series-1-episode-3',
status: 'inactive',
},
data: {},
status: 'recovered',
episode_status: 'inactive',
source: 'internal',
},
]);
Expand Down Expand Up @@ -245,10 +269,7 @@ async function cleanupDataStreams(esClient: ElasticsearchClient): Promise<void>
}
}

async function seedAlertEvents(
esClient: ElasticsearchClient,
events: Array<Record<string, any>>
): Promise<void> {
async function seedAlertEvents(esClient: ElasticsearchClient, events: AlertEvent[]): Promise<void> {
const operations = events.flatMap((doc) => [
{ create: { _index: ALERT_EVENTS_DATA_STREAM } },
doc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ export const getDispatchableAlertEventsQuery = (): EsqlRequest => {

return esql`FROM ${ALERT_EVENTS_DATA_STREAM},${ALERT_ACTIONS_DATA_STREAM} METADATA _index
| WHERE (_index LIKE ${ALERT_ACTIONS_BACKING_INDEX}) OR (_index LIKE ${ALERT_EVENTS_BACKING_INDEX} and type == ${alertEventType})
| EVAL rule_id = COALESCE(rule.id, rule_id)
| EVAL
rule_id = COALESCE(rule.id, rule_id),
episode_id = COALESCE(episode.id, episode_id),
episode_status = episode.status
| DROP episode.id, rule.id, episode.status
| INLINE STATS last_fired = max(last_series_event_timestamp) WHERE _index LIKE ${ALERT_ACTIONS_BACKING_INDEX} AND action_type == "fire-event" BY rule_id, group_hash
| WHERE (last_fired IS NULL OR last_fired < @timestamp) or (_index LIKE ${ALERT_ACTIONS_BACKING_INDEX})
| STATS
Expand Down