Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/platform/packages/shared/kbn-es-mappings/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type SupportedMappingPropertyType = AllMappingPropertyType &
| 'date_nanos'
| 'double'
| 'long'
| 'flattened'
| 'object'
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import { elasticsearchServiceMock } from '@kbn/core-elasticsearch-server-mocks';
import type { ResourceDefinition } from '../../../resources/types';
import { ResourceInitializer } from './resource_initializer';
import type { DeeplyMockedApi } from '@kbn/core-elasticsearch-client-server-mocks';
import { LoggerService } from '../logger_service/logger_service';
import { loggerMock } from '@kbn/logging-mocks';

describe('ResourceInitializer', () => {
let esClient: DeeplyMockedApi<ElasticsearchClient>;
let mockLogger: jest.Mocked<Logger>;
let mockLoggerService: LoggerService;

const resourceDefinition: ResourceDefinition = {
key: 'data_stream:.alerts-test',
dataStreamName: '.alerts-test',
version: 1,
mappings: {
dynamic: false,
properties: {
Expand Down Expand Up @@ -51,50 +50,52 @@ describe('ResourceInitializer', () => {
beforeEach(() => {
jest.clearAllMocks();
mockLogger = loggerMock.create();
mockLoggerService = new LoggerService(mockLogger);
// data streams uses the esClient internally
esClient = elasticsearchServiceMock.createElasticsearchClient();

esClient.ilm.putLifecycle.mockResolvedValue({ acknowledged: true });
esClient.cluster.putComponentTemplate.mockResolvedValue({ acknowledged: true });
esClient.indices.getDataStream.mockResolvedValue({ data_streams: [] });
esClient.indices.getIndexTemplate.mockResolvedValue({ index_templates: [] });
esClient.indices.putIndexTemplate.mockResolvedValue({ acknowledged: true });
esClient.indices.createDataStream.mockResolvedValue({ acknowledged: true });
});

it('installs ILM policy, component template, index template, then creates the data stream', async () => {
const initializer = new ResourceInitializer(mockLoggerService, esClient, resourceDefinition);
it('installs ILM policy, index template, then creates the data stream', async () => {
const initializer = new ResourceInitializer(mockLogger, esClient, resourceDefinition);

await initializer.initialize();

expect(esClient.ilm.putLifecycle).toHaveBeenCalled();
expect(esClient.cluster.putComponentTemplate).toHaveBeenCalled();
expect(esClient.indices.putIndexTemplate).toHaveBeenCalled();
expect(esClient.indices.createDataStream).toHaveBeenCalled();

const componentOrder = esClient.cluster.putComponentTemplate.mock.invocationCallOrder[0];
const indexOrder = esClient.indices.putIndexTemplate.mock.invocationCallOrder[0];

// Order matters: the index template references the component template.
expect(componentOrder).toBeLessThan(indexOrder);
expect(esClient.ilm.putLifecycle).toHaveBeenCalledWith({
name: resourceDefinition.ilmPolicy.name,
policy: resourceDefinition.ilmPolicy.policy,
});
expect(esClient.indices.putIndexTemplate).toHaveBeenCalledWith(
expect.objectContaining({
name: resourceDefinition.dataStreamName,
})
);
expect(esClient.indices.createDataStream).toHaveBeenCalledWith({
name: resourceDefinition.dataStreamName,
});
});

it('ignores 409 errors when creating the data stream', async () => {
esClient.indices.createDataStream.mockRejectedValueOnce(
new errors.ResponseError({ statusCode: 409 } as DiagnosticResult)
);

const initializer = new ResourceInitializer(mockLoggerService, esClient, resourceDefinition);
const initializer = new ResourceInitializer(mockLogger, esClient, resourceDefinition);
await expect(initializer.initialize()).resolves.toBeUndefined();
});

it('ignores 400 errors when creating the data stream', async () => {
it('ignores 400 errors of type resource_already_exists_exception when creating the data stream', async () => {
esClient.indices.createDataStream.mockRejectedValueOnce(
new errors.ResponseError({
statusCode: 400,
body: { error: { type: 'resource_already_exists_exception' } },
} as DiagnosticResult)
);

const initializer = new ResourceInitializer(mockLoggerService, esClient, resourceDefinition);
const initializer = new ResourceInitializer(mockLogger, esClient, resourceDefinition);
await expect(initializer.initialize()).resolves.toBeUndefined();
});

Expand All @@ -105,7 +106,7 @@ describe('ResourceInitializer', () => {
} as DiagnosticResult)
);

const initializer = new ResourceInitializer(mockLoggerService, esClient, resourceDefinition);
const initializer = new ResourceInitializer(mockLogger, esClient, resourceDefinition);
await expect(initializer.initialize()).rejects.toThrow();
});

Expand All @@ -116,7 +117,7 @@ describe('ResourceInitializer', () => {
} as DiagnosticResult)
);

const initializer = new ResourceInitializer(mockLoggerService, esClient, resourceDefinition);
const initializer = new ResourceInitializer(mockLogger, esClient, resourceDefinition);
await expect(initializer.initialize()).rejects.toThrow();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
*/

import type { ElasticsearchClient } from '@kbn/core/server';
import type {
ClusterPutComponentTemplateRequest,
IndicesPutIndexTemplateRequest,
} from '@elastic/elasticsearch/lib/api/types';
import { isResponseError } from '@kbn/es-errors';
import { DataStreamClient, type DataStreamDefinition } from '@kbn/data-streams';
import { Logger as LoggerToken } from '@kbn/core-di';
import type { Logger } from '@kbn/logging';
import { inject, injectable } from 'inversify';
import { isResponseError } from '@kbn/es-errors';
import type { ResourceDefinition } from '../../../resources/types';
import { EsServiceInternalToken } from '../es_service/tokens';
import type { LoggerServiceContract } from '../logger_service/logger_service';
import { LoggerServiceToken } from '../logger_service/logger_service';

export interface IResourceInitializer {
initialize(): Promise<void>;
Expand All @@ -31,7 +28,7 @@ const TOTAL_FIELDS_LIMIT = 2500;
@injectable()
export class ResourceInitializer implements IResourceInitializer {
constructor(
@inject(LoggerServiceToken) private readonly logger: LoggerServiceContract,
@inject(LoggerToken) private readonly logger: Logger,
@inject(EsServiceInternalToken) private readonly esClient: ElasticsearchClient,
private readonly resourceDefinition: ResourceDefinition
) {}
Expand All @@ -42,68 +39,43 @@ export class ResourceInitializer implements IResourceInitializer {
policy: this.resourceDefinition.ilmPolicy.policy,
});

const componentTemplateName = `${this.resourceDefinition.dataStreamName}-schema@component`;
const indexTemplateName = `${this.resourceDefinition.dataStreamName}-schema@index-template`;

const componentTemplate: ClusterPutComponentTemplateRequest = {
name: componentTemplateName,
const dataStreamDefinition: DataStreamDefinition<typeof this.resourceDefinition.mappings> = {
name: this.resourceDefinition.dataStreamName,
hidden: true,
version: this.resourceDefinition.version,
template: {
aliases: {},
priority: 500,
mappings: this.resourceDefinition.mappings,
},
_meta: {
managed: true,
description: `${this.resourceDefinition.dataStreamName} schema component template`,
},
};

const indexTemplate: IndicesPutIndexTemplateRequest = {
name: indexTemplateName,
index_patterns: [this.resourceDefinition.dataStreamName],
data_stream: { hidden: true },
composed_of: [componentTemplateName],
priority: 500,
template: {
settings: {
'index.lifecycle.name': this.resourceDefinition.ilmPolicy.name,
'index.mapping.total_fields.limit': TOTAL_FIELDS_LIMIT,
'index.mapping.total_fields.ignore_dynamic_beyond_limit': true,
},
},
_meta: {
managed: true,
description: `${this.resourceDefinition.dataStreamName} index template`,
_meta: {
managed: true,
description: `${this.resourceDefinition.dataStreamName} index template`,
},
},
};

await this.esClient.cluster.putComponentTemplate(componentTemplate);
await this.esClient.indices.putIndexTemplate(indexTemplate);

try {
await this.esClient.indices.createDataStream({
name: this.resourceDefinition.dataStreamName,
await DataStreamClient.initialize({
logger: this.logger,
dataStream: dataStreamDefinition,
elasticsearchClient: this.esClient,
});
} catch (error) {
if (!isResponseError(error)) {
throw error;
}

if (isResourceAlreadyExistsException(error)) {
this.logger.debug({
message: `Data stream already exists: ${this.resourceDefinition.dataStreamName}`,
});

if (error.statusCode === 409) {
this.logger.debug(`Data stream already exists: ${this.resourceDefinition.dataStreamName}.`);
return;
}

throw error;
}
}
}

const isResourceAlreadyExistsException = (error: unknown): boolean => {
return (
isResponseError(error) &&
((error.statusCode === 400 && error.body?.error.type === 'resource_already_exists_exception') ||
error.statusCode === 409)
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
* 2.0.
*/

import type { estypes } from '@elastic/elasticsearch';
import type { IlmPolicy } from '@elastic/elasticsearch/lib/api/types';
import type { MappingsDefinition } from '@kbn/es-mappings';
import { z } from '@kbn/zod';
import type { ResourceDefinition } from './types';

export const ALERT_ACTIONS_DATA_STREAM = '.alerts-actions';
export const ALERT_ACTIONS_DATA_STREAM_VERSION = 1;
export const ALERT_ACTIONS_BACKING_INDEX = '.ds-.alerts-actions-*';
export const ALERT_ACTIONS_ILM_POLICY_NAME = '.alerts-actions-ilm-policy';

Expand All @@ -28,7 +29,7 @@ export const ALERT_ACTIONS_ILM_POLICY: IlmPolicy = {
},
};

const mappings: estypes.MappingTypeMapping = {
const mappings: MappingsDefinition = {
dynamic: false,
properties: {
'@timestamp': { type: 'date' },
Expand Down Expand Up @@ -62,6 +63,7 @@ export type AlertAction = z.infer<typeof alertActionSchema>;
export const getAlertActionsResourceDefinition = (): ResourceDefinition => ({
key: `data_stream:${ALERT_ACTIONS_DATA_STREAM}`,
dataStreamName: ALERT_ACTIONS_DATA_STREAM,
version: ALERT_ACTIONS_DATA_STREAM_VERSION,
mappings,
ilmPolicy: { name: ALERT_ACTIONS_ILM_POLICY_NAME, policy: ALERT_ACTIONS_ILM_POLICY },
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
* 2.0.
*/

import type { estypes } from '@elastic/elasticsearch';
import type { IlmPolicy } from '@elastic/elasticsearch/lib/api/types';
import type { MappingsDefinition } from '@kbn/es-mappings';
import { z } from '@kbn/zod';
import type { ResourceDefinition } from './types';

export const ALERT_EVENTS_DATA_STREAM = '.alerts-events';
export const ALERT_EVENTS_DATA_STREAM_VERSION = 1;
export const ALERT_EVENTS_BACKING_INDEX = '.ds-.alerts-events-*';
export const ALERT_EVENTS_ILM_POLICY_NAME = '.alerts-events-ilm-policy';

Expand All @@ -28,13 +29,14 @@ export const ALERT_EVENTS_ILM_POLICY: IlmPolicy = {
},
};

const mappings: estypes.MappingTypeMapping = {
const mappings: MappingsDefinition = {
dynamic: false,
properties: {
// Document '_id' is used as the unique alert event identifier
'@timestamp': { type: 'date' },
scheduled_timestamp: { type: 'date' },
rule: {
type: 'object',
properties: {
id: { type: 'keyword' },
version: { type: 'long' },
Expand All @@ -46,6 +48,7 @@ const mappings: estypes.MappingTypeMapping = {
source: { type: 'keyword' },
type: { type: 'keyword' }, // signal | alert
episode: {
type: 'object',
properties: {
id: { type: 'keyword' },
status: { type: 'keyword' }, // inactive | pending | active | recovering
Expand Down Expand Up @@ -90,6 +93,7 @@ export type AlertEpisodeStatus = z.infer<typeof alertEpisodeStatusSchema>;
export const getAlertEventsResourceDefinition = (): ResourceDefinition => ({
key: `data_stream:${ALERT_EVENTS_DATA_STREAM}`,
dataStreamName: ALERT_EVENTS_DATA_STREAM,
version: ALERT_EVENTS_DATA_STREAM_VERSION,
mappings,
ilmPolicy: { name: ALERT_EVENTS_ILM_POLICY_NAME, policy: ALERT_EVENTS_ILM_POLICY },
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import type { ElasticsearchClient } from '@kbn/core/server';
import type { LoggerServiceContract } from '../lib/services/logger_service/logger_service';
import type { Logger } from '@kbn/logging';
import { ResourceInitializer } from '../lib/services/resource_service/resource_initializer';
import type { ResourceManagerContract } from '../lib/services/resource_service/resource_manager';
import { getAlertActionsResourceDefinition } from './alert_actions';
Expand All @@ -16,7 +16,7 @@ import type { ResourceDefinition } from './types';
export interface RegisterResourcesOptions {
resourceManager: ResourceManagerContract;
esClient: ElasticsearchClient;
logger: LoggerServiceContract;
logger: Logger;
}

export function initializeResources({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
* 2.0.
*/

import type { IlmPolicy, MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import type { IlmPolicy } from '@elastic/elasticsearch/lib/api/types';
import type { MappingsDefinition } from '@kbn/es-mappings';

export interface ResourceDefinition {
key: string;
dataStreamName: string;
mappings: MappingTypeMapping;
version: number;
mappings: MappingsDefinition;
ilmPolicy: {
name: string;
policy: IlmPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* 2.0.
*/

import { OnStart, PluginStart } from '@kbn/core-di';
import { Logger, OnStart, PluginStart } from '@kbn/core-di';
import type { ContainerModuleLoadOptions } from 'inversify';
import { EsServiceInternalToken } from '../lib/services/es_service/tokens';
import { LoggerServiceToken } from '../lib/services/logger_service/logger_service';
import { ResourceManager } from '../lib/services/resource_service/resource_manager';
import { initializeResources } from '../resources/register_resources';
import type { AlertingServerStartDependencies } from '../types';
Expand All @@ -17,7 +16,7 @@ import { scheduleDispatcherTask } from '../lib/dispatcher/schedule_task';
export function bindOnStart({ bind }: ContainerModuleLoadOptions) {
bind(OnStart).toConstantValue(async (container) => {
const resourceManager = container.get(ResourceManager);
const logger = container.get(LoggerServiceToken);
const logger = container.get(Logger);
Copy link
Copy Markdown
Member

@cnasikas cnasikas Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another approach would be to keep assing our logging service (LoggerService) but expose a method from called getBaseLogger and pass it to the data stream client. This way we always have one logger service. It seems more robust. Wdyt?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk, it feels like extra steps to achieve the same. If I am thinking about adding the BaseLogger to my class i'd rather go directly there 🤔

const esClient = container.get(EsServiceInternalToken);
const taskManager = container.get(
PluginStart<AlertingServerStartDependencies['taskManager']>('taskManager')
Expand All @@ -30,10 +29,11 @@ export function bindOnStart({ bind }: ContainerModuleLoadOptions) {
});

scheduleDispatcherTask({ taskManager }).catch((error) => {
logger.error({
error,
code: 'DISPATCHER_TASK_SCHEDULE_FAILURE',
type: 'DispatcherTask',
logger.error(error as Error, {
error: {
code: 'DISPATCHER_TASK_SCHEDULE_FAILURE',
type: 'DispatcherTask',
},
});
});
});
Expand Down
Loading