diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/constants.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/constants.ts new file mode 100644 index 0000000000000..943c3bd5bb88c --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/constants.ts @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; + +import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server'; + +export const CAI_ACTIVITY_INDEX_NAME = '.internal.cases-activity'; + +export const CAI_ACTIVITY_INDEX_ALIAS = '.cases-activity'; + +export const CAI_ACTIVITY_INDEX_VERSION = 1; + +export const CAI_ACTIVITY_SOURCE_QUERY: QueryDslQueryContainer = { + bool: { + must: [ + { + term: { + type: 'cases-user-actions', + }, + }, + { + bool: { + should: [ + { + term: { + 'cases-user-actions.type': 'severity', + }, + }, + { + term: { + 'cases-user-actions.type': 'delete_case', + }, + }, + { + term: { + 'cases-user-actions.type': 'category', + }, + }, + { + term: { + 'cases-user-actions.type': 'status', + }, + }, + { + term: { + 'cases-user-actions.type': 'tags', + }, + }, + ], + minimum_should_match: 1, + }, + }, + ], + }, +}; + +export const CAI_ACTIVITY_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX; + +export const CAI_ACTIVITY_BACKFILL_TASK_ID = 'cai_activity_backfill_task'; + +export const CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID = 'cai_cases_activity_synchronization_task'; + +export const getActivitySynchronizationSourceQuery = ( + lastSyncAt: Date +): QueryDslQueryContainer => ({ + bool: { + must: [ + { + term: { + type: 'cases-user-actions', + }, + }, + { + range: { + 'cases-user-actions.created_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + { + bool: { + should: [ + { + term: { + 'cases-user-actions.type': 'severity', + }, + }, + { + term: { + 'cases-user-actions.type': 'delete_case', + }, + }, + { + term: { + 'cases-user-actions.type': 'category', + }, + }, + { + term: { + 'cases-user-actions.type': 'status', + }, + }, + { + term: { + 'cases-user-actions.type': 'tags', + }, + }, + ], + minimum_should_match: 1, + }, + }, + ], + }, +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/index.ts new file mode 100644 index 0000000000000..dd4256d41841b --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/index.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; +import { AnalyticsIndex } from '../analytics_index'; +import { + CAI_ACTIVITY_INDEX_NAME, + CAI_ACTIVITY_INDEX_ALIAS, + CAI_ACTIVITY_INDEX_VERSION, + CAI_ACTIVITY_SOURCE_INDEX, + CAI_ACTIVITY_SOURCE_QUERY, + CAI_ACTIVITY_BACKFILL_TASK_ID, + CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID, +} from './constants'; +import { CAI_ACTIVITY_INDEX_MAPPINGS } from './mappings'; +import { CAI_ACTIVITY_INDEX_SCRIPT, CAI_ACTIVITY_INDEX_SCRIPT_ID } from './painless_scripts'; +import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task'; + +export const createActivityAnalyticsIndex = ({ + esClient, + logger, + isServerless, + taskManager, +}: { + esClient: ElasticsearchClient; + logger: Logger; + isServerless: boolean; + taskManager: TaskManagerStartContract; +}): AnalyticsIndex => + new AnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + indexName: CAI_ACTIVITY_INDEX_NAME, + indexAlias: CAI_ACTIVITY_INDEX_ALIAS, + indexVersion: CAI_ACTIVITY_INDEX_VERSION, + mappings: CAI_ACTIVITY_INDEX_MAPPINGS, + painlessScriptId: CAI_ACTIVITY_INDEX_SCRIPT_ID, + painlessScript: CAI_ACTIVITY_INDEX_SCRIPT, + taskId: CAI_ACTIVITY_BACKFILL_TASK_ID, + sourceIndex: CAI_ACTIVITY_SOURCE_INDEX, + sourceQuery: CAI_ACTIVITY_SOURCE_QUERY, + }); + +export const scheduleActivityAnalyticsSyncTask = ({ + taskManager, + logger, +}: { + taskManager: TaskManagerStartContract; + logger: Logger; +}) => { + scheduleCAISynchronizationTask({ + taskId: CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID, + sourceIndex: CAI_ACTIVITY_SOURCE_INDEX, + destIndex: CAI_ACTIVITY_INDEX_NAME, + taskManager, + logger, + }).catch((e) => { + logger.error( + `Error scheduling ${CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID} task, received ${e.message}` + ); + }); +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/mappings.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/mappings.ts new file mode 100644 index 0000000000000..a5d19f1a615c2 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/mappings.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; + +export const CAI_ACTIVITY_INDEX_MAPPINGS: MappingTypeMapping = { + dynamic: false, + properties: { + '@timestamp': { + type: 'date', + }, + case_id: { + type: 'keyword', + }, + action: { + type: 'keyword', + }, + type: { + type: 'keyword', + }, + payload: { + properties: { + status: { + type: 'keyword', + }, + tags: { + type: 'keyword', + }, + category: { + type: 'keyword', + }, + severity: { + type: 'keyword', + }, + }, + }, + created_at: { + type: 'date', + }, + created_at_ms: { + type: 'long', + }, + created_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + owner: { + type: 'keyword', + }, + space_ids: { + type: 'keyword', + }, + }, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/painless_scripts.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/painless_scripts.ts new file mode 100644 index 0000000000000..1244cdfac901c --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/activity_index/painless_scripts.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StoredScript } from '@elastic/elasticsearch/lib/api/types'; +import { CAI_ACTIVITY_INDEX_VERSION } from './constants'; + +export const CAI_ACTIVITY_INDEX_SCRIPT_ID = `cai_activity_script_${CAI_ACTIVITY_INDEX_VERSION}`; +export const CAI_ACTIVITY_INDEX_SCRIPT: StoredScript = { + lang: 'painless', + source: ` + def source = [:]; + source.putAll(ctx._source); + ctx._source.clear(); + + ctx._source.action = source["cases-user-actions"].action; + ctx._source.type = source["cases-user-actions"].type; + + long milliSinceEpoch = new Date().getTime(); + Instant instant = Instant.ofEpochMilli(milliSinceEpoch); + ctx._source['@timestamp'] = ZonedDateTime.ofInstant(instant, ZoneId.of('Z')); + + ZonedDateTime zdt_created = + ZonedDateTime.parse(source["cases-user-actions"].created_at); + ctx._source.created_at_ms = zdt_created.toInstant().toEpochMilli(); + ctx._source.created_at = source["cases-user-actions"].created_at; + + if (source["cases-user-actions"].created_by != null) { + ctx._source.created_by = new HashMap(); + ctx._source.created_by.full_name = source["cases-user-actions"].created_by.full_name; + ctx._source.created_by.username = source["cases-user-actions"].created_by.username; + ctx._source.created_by.profile_uid = source["cases-user-actions"].created_by.profile_uid; + ctx._source.created_by.email = source["cases-user-actions"].created_by.email; + } + + if (source["cases-user-actions"].payload != null) { + ctx._source.payload = new HashMap(); + + if (source["cases-user-actions"].type == "severity" && source["cases-user-actions"].payload.severity != null) { + ctx._source.payload.severity = source["cases-user-actions"].payload.severity; + } + + if (source["cases-user-actions"].type == "category" && source["cases-user-actions"].payload.category != null) { + ctx._source.payload.category = source["cases-user-actions"].payload.category; + } + + if (source["cases-user-actions"].type == "status" && source["cases-user-actions"].payload.status != null) { + ctx._source.payload.status = source["cases-user-actions"].payload.status; + } + + if (source["cases-user-actions"].type == "tags" && source["cases-user-actions"].payload.tags != null) { + ctx._source.payload.tags = source["cases-user-actions"].payload.tags; + } + } + + if (source.references != null) { + for (item in source.references) { + if (item.type == "cases") { + ctx._source.case_id = item.id; + } + } + } + + ctx._source.owner = source["cases-user-actions"].owner; + ctx._source.space_ids = source.namespaces; + `, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/analytics_index.test.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/analytics_index.test.ts new file mode 100644 index 0000000000000..6117fbf6b4157 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/analytics_index.test.ts @@ -0,0 +1,308 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; +import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; +import { errors as esErrors } from '@elastic/elasticsearch'; + +import { AnalyticsIndex } from './analytics_index'; +import type { + IndicesCreateResponse, + IndicesPutMappingResponse, + MappingTypeMapping, + QueryDslQueryContainer, + StoredScript, +} from '@elastic/elasticsearch/lib/api/types'; +import { fullJitterBackoffFactory } from '../common/retry_service/full_jitter_backoff'; +import { scheduleCAIBackfillTask } from './tasks/backfill_task'; + +jest.mock('../common/retry_service/full_jitter_backoff'); +jest.mock('./tasks/backfill_task'); + +const fullJitterBackoffFactoryMock = fullJitterBackoffFactory as jest.Mock; +const scheduleCAIBackfillTaskMock = scheduleCAIBackfillTask as jest.Mock; + +describe('AnalyticsIndex', () => { + const logger = loggingSystemMock.createLogger(); + const esClient = elasticsearchServiceMock.createElasticsearchClient(); + const taskManager = taskManagerMock.createStart(); + const isServerless = false; + const indexName = '.test-index-name'; + const indexAlias = '.index-name'; + const indexVersion = 1; + const painlessScriptId = 'painless_script_id'; + const taskId = 'foobar_task_id'; + const sourceIndex = '.source-index'; + + const painlessScript: StoredScript = { + lang: 'painless', + source: 'ctx._source.remove("foobar");', + }; + const mappings: MappingTypeMapping = { + dynamic: false, + properties: { + title: { + type: 'keyword', + }, + }, + }; + const mappingsMeta = { + mapping_version: indexVersion, + painless_script_id: painlessScriptId, + }; + const sourceQuery: QueryDslQueryContainer = { + term: { + type: 'cases', + }, + }; + + let index: AnalyticsIndex; + + // 1ms delay before retrying + const nextBackOff = jest.fn().mockReturnValue(1); + + const backOffFactory = { + create: () => ({ nextBackOff }), + }; + + beforeEach(() => { + jest.clearAllMocks(); + + fullJitterBackoffFactoryMock.mockReturnValue(backOffFactory); + + index = new AnalyticsIndex({ + esClient, + logger, + indexName, + indexAlias, + indexVersion, + isServerless, + mappings, + painlessScript, + painlessScriptId, + sourceIndex, + sourceQuery, + taskId, + taskManager, + }); + }); + + it('checks if the index exists', async () => { + await index.upsertIndex(); + + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + }); + + it('creates index if it does not exist', async () => { + esClient.indices.exists.mockResolvedValueOnce(false); + + await index.upsertIndex(); + + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript }); + expect(esClient.indices.create).toBeCalledWith({ + index: indexName, + timeout: '300s', + mappings: { + ...mappings, + _meta: mappingsMeta, + }, + aliases: { + [indexAlias]: { + is_write_index: true, + }, + }, + settings: { + index: { + auto_expand_replicas: '0-1', + mode: 'lookup', + number_of_shards: 1, + refresh_interval: '15s', + }, + }, + }); + expect(scheduleCAIBackfillTaskMock).toHaveBeenCalledWith({ + taskId, + sourceIndex, + sourceQuery, + destIndex: indexName, + taskManager, + logger, + }); + }); + + it('updates index if it exists and the mapping has a lower version number', async () => { + esClient.indices.exists.mockResolvedValueOnce(true); + esClient.indices.getMapping.mockResolvedValueOnce({ + [indexName]: { + mappings: { + _meta: { + mapping_version: 0, // lower version number + painless_script_id: painlessScriptId, + }, + dynamic: mappings.dynamic, + properties: mappings.properties, + }, + }, + }); + + await index.upsertIndex(); + + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName }); + expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript }); + expect(esClient.indices.putMapping).toBeCalledWith({ + index: indexName, + ...mappings, + _meta: mappingsMeta, + }); + expect(scheduleCAIBackfillTaskMock).toBeCalledWith({ + taskId, + sourceIndex, + sourceQuery, + destIndex: indexName, + taskManager, + logger, + }); + }); + + it('does not update index if it exists and the mapping has a higher version number', async () => { + esClient.indices.exists.mockResolvedValueOnce(true); + esClient.indices.getMapping.mockResolvedValueOnce({ + [indexName]: { + mappings: { + _meta: { + mapping_version: 10, // higher version number + painless_script_id: painlessScriptId, + }, + dynamic: mappings.dynamic, + properties: mappings.properties, + }, + }, + }); + + await index.upsertIndex(); + + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName }); + expect(esClient.putScript).toBeCalledTimes(0); + expect(esClient.indices.putMapping).toBeCalledTimes(0); + expect(scheduleCAIBackfillTaskMock).toBeCalledTimes(0); + + expect(logger.debug).toBeCalledWith( + `[${indexName}] Mapping version is up to date. Skipping update.`, + { tags: ['cai-index-creation', `${indexName}`] } + ); + }); + + it('does not update index if it exists and the mapping has the same version number', async () => { + esClient.indices.exists.mockResolvedValueOnce(true); + esClient.indices.getMapping.mockResolvedValueOnce({ [indexName]: { mappings } }); + + await index.upsertIndex(); + + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName }); + expect(esClient.putScript).toBeCalledTimes(0); + expect(esClient.indices.putMapping).toBeCalledTimes(0); + expect(scheduleCAIBackfillTaskMock).toBeCalledTimes(0); + + expect(logger.debug).toBeCalledWith( + `[${indexName}] Mapping version is up to date. Skipping update.`, + { tags: ['cai-index-creation', `${indexName}`] } + ); + }); + + describe('Error handling', () => { + it('retries if the esClient throws a retryable error', async () => { + esClient.indices.exists + .mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error A')) + .mockRejectedValueOnce(new esErrors.TimeoutError('My retryable error B')) + .mockResolvedValue(true); + await index.upsertIndex(); + + expect(nextBackOff).toBeCalledTimes(2); + expect(esClient.indices.exists).toBeCalledTimes(3); + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + }); + + it('retries if the esClient throws a retryable error when creating an index', async () => { + esClient.indices.exists.mockResolvedValue(false); + esClient.indices.create + .mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error A')) + .mockResolvedValue({} as IndicesCreateResponse); + + await index.upsertIndex(); + + expect(nextBackOff).toBeCalledTimes(1); + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript }); + expect(esClient.indices.create).toBeCalledTimes(2); + expect(scheduleCAIBackfillTaskMock).toHaveBeenCalledWith({ + taskId, + sourceIndex, + sourceQuery, + destIndex: indexName, + taskManager, + logger, + }); + }); + + it('retries if the esClient throws a retryable error when updating an index', async () => { + esClient.indices.exists.mockResolvedValue(true); + esClient.indices.getMapping.mockResolvedValue({ + [indexName]: { + mappings: { + _meta: { + mapping_version: 0, // lower version number + painless_script_id: painlessScriptId, + }, + dynamic: mappings.dynamic, + properties: mappings.properties, + }, + }, + }); + + esClient.indices.putMapping + .mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error A')) + .mockResolvedValue({} as IndicesPutMappingResponse); + + await index.upsertIndex(); + + expect(nextBackOff).toBeCalledTimes(1); + expect(esClient.indices.exists).toBeCalledWith({ index: indexName }); + expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName }); + expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript }); + + expect(esClient.indices.putMapping).toBeCalledTimes(2); + expect(esClient.indices.putMapping).toBeCalledWith({ + index: indexName, + ...mappings, + _meta: mappingsMeta, + }); + + expect(scheduleCAIBackfillTaskMock).toBeCalledWith({ + taskId, + sourceIndex, + sourceQuery, + destIndex: indexName, + taskManager, + logger, + }); + }); + + it('does not retry if the eexecution throws a non-retryable error', async () => { + esClient.indices.exists.mockRejectedValue(new Error('My terrible error')); + + await expect(index.upsertIndex()).resolves.not.toThrow(); + + expect(nextBackOff).toBeCalledTimes(0); + // Paths in the algorithm after the error are not called. + expect(esClient.indices.getMapping).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/analytics_index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/analytics_index.ts new file mode 100644 index 0000000000000..325b2a4ce5420 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/analytics_index.ts @@ -0,0 +1,254 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { errors as EsErrors } from '@elastic/elasticsearch'; +import type { + IndicesIndexSettings, + MappingTypeMapping, + QueryDslQueryContainer, + StoredScript, +} from '@elastic/elasticsearch/lib/api/types'; +import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; + +import { + CAI_NUMBER_OF_SHARDS, + CAI_AUTO_EXPAND_REPLICAS, + CAI_REFRESH_INTERVAL, + CAI_INDEX_MODE, + CAI_DEFAULT_TIMEOUT, +} from './constants'; +import { fullJitterBackoffFactory } from '../common/retry_service/full_jitter_backoff'; +import { scheduleCAIBackfillTask } from './tasks/backfill_task'; +import { CasesAnalyticsRetryService } from './retry_service'; + +interface AnalyticsIndexParams { + esClient: ElasticsearchClient; + logger: Logger; + indexName: string; + indexAlias: string; + indexVersion: number; + isServerless: boolean; + mappings: MappingTypeMapping; + painlessScript: StoredScript; + painlessScriptId: string; + sourceIndex: string; + sourceQuery: QueryDslQueryContainer; + taskId: string; + taskManager: TaskManagerStartContract; +} + +interface MappingMeta { + mapping_version: number; + painless_script_id: string; +} + +export class AnalyticsIndex { + private readonly logger: Logger; + private readonly indexName: string; + private readonly indexAlias: string; + private readonly indexVersion: number; + private readonly esClient: ElasticsearchClient; + private readonly mappings: MappingTypeMapping; + private readonly indexSettings?: IndicesIndexSettings; + private readonly painlessScriptId: string; + private readonly painlessScript: StoredScript; + private readonly retryService: CasesAnalyticsRetryService; + private readonly taskManager: TaskManagerStartContract; + private readonly taskId: string; + private readonly sourceIndex: string; + private readonly sourceQuery: QueryDslQueryContainer; + + constructor({ + logger, + esClient, + isServerless, + indexName, + indexAlias, + indexVersion, + mappings, + painlessScriptId, + painlessScript, + taskManager, + taskId, + sourceIndex, + sourceQuery, + }: AnalyticsIndexParams) { + this.logger = logger; + this.esClient = esClient; + this.indexName = indexName; + this.indexAlias = indexAlias; + this.indexVersion = indexVersion; + + this.mappings = mappings; + this.mappings._meta = this.getMappingMeta({ indexVersion, painlessScriptId }); + + this.painlessScriptId = painlessScriptId; + this.painlessScript = painlessScript; + this.taskManager = taskManager; + this.taskId = taskId; + this.sourceIndex = sourceIndex; + this.sourceQuery = sourceQuery; + this.indexSettings = { + // settings are not supported on serverless ES + ...(isServerless + ? {} + : { + number_of_shards: CAI_NUMBER_OF_SHARDS, + auto_expand_replicas: CAI_AUTO_EXPAND_REPLICAS, + refresh_interval: CAI_REFRESH_INTERVAL, + mode: CAI_INDEX_MODE, + }), + }; + /** + * We should wait at least 5ms before retrying and no more that 2sec + */ + const backOffFactory = fullJitterBackoffFactory({ baseDelay: 5, maxBackoffTime: 2000 }); + this.retryService = new CasesAnalyticsRetryService(this.logger, backOffFactory); + } + + public async upsertIndex() { + try { + await this.retryService.retryWithBackoff(() => this._upsertIndex()); + } catch (error) { + // We do not throw because errors should not break execution + this.logger.error( + `[${this.indexName}] Failed to create index. Error message: ${error.message}` + ); + } + } + + private async _upsertIndex() { + try { + const indexExists = await this.indexExists(); + + if (!indexExists) { + this.logDebug('Index does not exist. Creating.'); + await this.createIndexMapping(); + } else { + this.logDebug('Index exists. Checking mapping.'); + await this.updateIndexMapping(); + } + } catch (error) { + this.handleError('Failed to create the index.', error); + } + } + + private async updateIndexMapping() { + try { + const shouldUpdateMapping = await this.shouldUpdateMapping(); + + if (shouldUpdateMapping) { + await this.updateMapping(); + } else { + this.logDebug('Mapping version is up to date. Skipping update.'); + } + } catch (error) { + this.handleError('Failed to update the index mapping.', error); + } + } + + private async getCurrentMapping() { + return this.esClient.indices.getMapping({ + index: this.indexName, + }); + } + + private async updateMapping() { + this.logDebug(`Updating the painless script.`); + await this.putScript(); + + this.logDebug(`Updating index mapping.`); + await this.esClient.indices.putMapping({ + index: this.indexName, + ...this.mappings, + }); + + this.logDebug(`Scheduling the backfill task.`); + await this.scheduleBackfillTask(); + } + + private async createIndexMapping() { + this.logDebug(`Creating painless script.`); + await this.putScript(); + + this.logDebug(`Creating index.`); + await this.esClient.indices.create({ + index: this.indexName, + timeout: CAI_DEFAULT_TIMEOUT, + mappings: this.mappings, + settings: { + index: this.indexSettings, + }, + aliases: { + [this.indexAlias]: { + is_write_index: true, + }, + }, + }); + + this.logDebug(`Scheduling the backfill task.`); + await this.scheduleBackfillTask(); + } + + private async indexExists(): Promise { + this.logDebug(`Checking if index exists.`); + return this.esClient.indices.exists({ + index: this.indexName, + }); + } + + private async shouldUpdateMapping(): Promise { + const currentMapping = await this.getCurrentMapping(); + return currentMapping[this.indexName].mappings._meta?.mapping_version < this.indexVersion; + } + + private async putScript() { + await this.esClient.putScript({ + id: this.painlessScriptId, + script: this.painlessScript, + }); + } + + private getMappingMeta({ + indexVersion, + painlessScriptId, + }: { + indexVersion: number; + painlessScriptId: string; + }): MappingMeta { + this.logDebug( + `Construction mapping._meta. Index version: ${indexVersion}. Painless script: ${painlessScriptId}.` + ); + + return { + mapping_version: indexVersion, + painless_script_id: painlessScriptId, + }; + } + + public logDebug(message: string) { + this.logger.debug(`[${this.indexName}] ${message}`, { + tags: ['cai-index-creation', this.indexName], + }); + } + + private handleError(message: string, error: EsErrors.ElasticsearchClientError) { + this.logger.error(`[${this.indexName}] ${message} Error message: ${error.message}`); + + throw error; + } + private async scheduleBackfillTask() { + await scheduleCAIBackfillTask({ + taskId: this.taskId, + sourceIndex: this.sourceIndex, + sourceQuery: this.sourceQuery, + destIndex: this.indexName, + taskManager: this.taskManager, + logger: this.logger, + }); + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/constants.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/constants.ts new file mode 100644 index 0000000000000..7a2f991aa439f --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/constants.ts @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; + +import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server'; + +export const CAI_ATTACHMENTS_INDEX_NAME = '.internal.cases-attachments'; + +export const CAI_ATTACHMENTS_INDEX_ALIAS = '.cases-attachments'; + +export const CAI_ATTACHMENTS_INDEX_VERSION = 1; + +export const CAI_ATTACHMENTS_SOURCE_QUERY: QueryDslQueryContainer = { + bool: { + must: [ + { + term: { + type: 'cases-comments', + }, + }, + { + bool: { + should: [ + { + term: { + 'cases-comments.type': 'externalReference', + }, + }, + { + term: { + 'cases-comments.type': 'alert', + }, + }, + ], + minimum_should_match: 1, + }, + }, + ], + }, +}; + +export const CAI_ATTACHMENTS_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX; + +export const CAI_ATTACHMENTS_BACKFILL_TASK_ID = 'cai_attachments_backfill_task'; + +export const CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID = 'cai_cases_attachments_synchronization_task'; + +export const getAttachmentsSynchronizationSourceQuery = ( + lastSyncAt: Date +): QueryDslQueryContainer => ({ + bool: { + must: [ + { + term: { + type: 'cases-comments', + }, + }, + { + bool: { + should: [ + { + term: { + 'cases-comments.type': 'externalReference', + }, + }, + { + term: { + 'cases-comments.type': 'alert', + }, + }, + ], + minimum_should_match: 1, + }, + }, + { + bool: { + should: [ + { + range: { + 'cases-comments.created_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + { + range: { + 'cases-comments.updated_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + ], + }, + }, + ], + }, +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/index.ts new file mode 100644 index 0000000000000..b95b9f579ce76 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/index.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; +import { AnalyticsIndex } from '../analytics_index'; +import { + CAI_ATTACHMENTS_INDEX_NAME, + CAI_ATTACHMENTS_INDEX_ALIAS, + CAI_ATTACHMENTS_INDEX_VERSION, + CAI_ATTACHMENTS_SOURCE_INDEX, + CAI_ATTACHMENTS_SOURCE_QUERY, + CAI_ATTACHMENTS_BACKFILL_TASK_ID, + CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID, +} from './constants'; +import { CAI_ATTACHMENTS_INDEX_MAPPINGS } from './mappings'; +import { CAI_ATTACHMENTS_INDEX_SCRIPT, CAI_ATTACHMENTS_INDEX_SCRIPT_ID } from './painless_scripts'; +import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task'; + +export const createAttachmentsAnalyticsIndex = ({ + esClient, + logger, + isServerless, + taskManager, +}: { + esClient: ElasticsearchClient; + logger: Logger; + isServerless: boolean; + taskManager: TaskManagerStartContract; +}): AnalyticsIndex => + new AnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + indexName: CAI_ATTACHMENTS_INDEX_NAME, + indexAlias: CAI_ATTACHMENTS_INDEX_ALIAS, + indexVersion: CAI_ATTACHMENTS_INDEX_VERSION, + mappings: CAI_ATTACHMENTS_INDEX_MAPPINGS, + painlessScriptId: CAI_ATTACHMENTS_INDEX_SCRIPT_ID, + painlessScript: CAI_ATTACHMENTS_INDEX_SCRIPT, + taskId: CAI_ATTACHMENTS_BACKFILL_TASK_ID, + sourceIndex: CAI_ATTACHMENTS_SOURCE_INDEX, + sourceQuery: CAI_ATTACHMENTS_SOURCE_QUERY, + }); + +export const scheduleAttachmentsAnalyticsSyncTask = ({ + taskManager, + logger, +}: { + taskManager: TaskManagerStartContract; + logger: Logger; +}) => { + scheduleCAISynchronizationTask({ + taskId: CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID, + sourceIndex: CAI_ATTACHMENTS_SOURCE_INDEX, + destIndex: CAI_ATTACHMENTS_INDEX_NAME, + taskManager, + logger, + }).catch((e) => { + logger.error( + `Error scheduling ${CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID} task, received ${e.message}` + ); + }); +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/mappings.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/mappings.ts new file mode 100644 index 0000000000000..cb389d54e4ca4 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/mappings.ts @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; + +export const CAI_ATTACHMENTS_INDEX_MAPPINGS: MappingTypeMapping = { + dynamic: false, + properties: { + '@timestamp': { + type: 'date', + }, + case_id: { + type: 'keyword', + }, + type: { + type: 'keyword', + }, + created_at: { + type: 'date', + }, + created_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + payload: { + properties: { + alerts: { + properties: { + id: { + type: 'keyword', + }, + index: { + type: 'keyword', + }, + }, + }, + file: { + properties: { + id: { + type: 'keyword', + }, + extension: { + type: 'keyword', + }, + mimeType: { + type: 'keyword', + }, + name: { + type: 'keyword', + }, + }, + }, + }, + }, + owner: { + type: 'keyword', + }, + space_ids: { + type: 'keyword', + }, + }, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/painless_scripts.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/painless_scripts.ts new file mode 100644 index 0000000000000..78425fcad1dc4 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/attachments_index/painless_scripts.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StoredScript } from '@elastic/elasticsearch/lib/api/types'; +import { CAI_ATTACHMENTS_INDEX_VERSION } from './constants'; + +export const CAI_ATTACHMENTS_INDEX_SCRIPT_ID = `cai_attachments_script_${CAI_ATTACHMENTS_INDEX_VERSION}`; +export const CAI_ATTACHMENTS_INDEX_SCRIPT: StoredScript = { + lang: 'painless', + source: ` + def source = [:]; + source.putAll(ctx._source); + ctx._source.clear(); + + if ( + ( + source["cases-comments"].type == "externalReference" && + source["cases-comments"].externalReferenceAttachmentTypeId != ".files" + ) && + source["cases-comments"].type != "alert" + ) { + ctx.op = "noop"; + return; + } + + long timestampInMillis = new Date().getTime(); + Instant timestampInstance = Instant.ofEpochMilli(timestampInMillis); + ctx._source['@timestamp'] = ZonedDateTime.ofInstant(timestampInstance, ZoneId.of('Z')); + + ctx._source.type = source["cases-comments"].type; + + if ( + ctx._source.type == "alert" && + source["cases-comments"].alertId != null && + source["cases-comments"].index != null + ) { + ctx._source.payload = new HashMap(); + ctx._source.payload.alerts = new ArrayList(); + + for (int y = 0; y < source["cases-comments"].alertId.size(); y++) { + Map alert = new HashMap(); + + alert.id = source["cases-comments"].alertId[y]; + + if ( y < source["cases-comments"].index.size() ) { + alert.index = source["cases-comments"].index[y]; + } + + ctx._source.payload.alerts.add(alert); + } + } + + if ( + ctx._source.type == "externalReference" && + source["cases-comments"].externalReferenceAttachmentTypeId == ".files" && + source["cases-comments"].externalReferenceMetadata.files.size() > 0 + ) { + ctx._source.payload = new HashMap(); + ctx._source.payload.file = new HashMap(); + ctx._source.payload.file.extension = source["cases-comments"].externalReferenceMetadata.files[0].extension; + ctx._source.payload.file.mimeType = source["cases-comments"].externalReferenceMetadata.files[0].mimeType; + ctx._source.payload.file.name = source["cases-comments"].externalReferenceMetadata.files[0].name; + } + + if (source.references != null) { + for (item in source.references) { + if (item.type == "file") { + ctx._source.payload.file.id = item.id; + } else if (item.type == "cases") { + ctx._source.case_id = item.id; + } + } + } + + ctx._source.owner = source["cases-comments"].owner; + ctx._source.space_ids = source.namespaces; + `, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/constants.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/constants.ts new file mode 100644 index 0000000000000..9a80c46e941d1 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/constants.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; + +import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server'; + +export const CAI_CASES_INDEX_NAME = '.internal.cases'; + +export const CAI_CASES_INDEX_ALIAS = '.cases'; + +export const CAI_CASES_INDEX_VERSION = 1; + +export const CAI_CASES_SOURCE_QUERY: QueryDslQueryContainer = { + term: { + type: 'cases', + }, +}; + +export const CAI_CASES_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX; + +export const CAI_CASES_BACKFILL_TASK_ID = 'cai_cases_backfill_task'; + +export const CAI_CASES_SYNCHRONIZATION_TASK_ID = 'cai_cases_synchronization_task'; + +export const getCasesSynchronizationSourceQuery = (lastSyncAt: Date): QueryDslQueryContainer => ({ + bool: { + must: [ + { + term: { + type: 'cases', + }, + }, + { + bool: { + should: [ + { + range: { + 'cases.created_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + { + range: { + 'cases.updated_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + ], + }, + }, + ], + }, +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/index.ts new file mode 100644 index 0000000000000..25dab93128df0 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/index.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; +import { AnalyticsIndex } from '../analytics_index'; +import { + CAI_CASES_INDEX_NAME, + CAI_CASES_INDEX_ALIAS, + CAI_CASES_INDEX_VERSION, + CAI_CASES_SOURCE_INDEX, + CAI_CASES_SOURCE_QUERY, + CAI_CASES_BACKFILL_TASK_ID, + CAI_CASES_SYNCHRONIZATION_TASK_ID, +} from './constants'; +import { CAI_CASES_INDEX_MAPPINGS } from './mappings'; +import { CAI_CASES_INDEX_SCRIPT_ID, CAI_CASES_INDEX_SCRIPT } from './painless_scripts'; +import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task'; + +export const createCasesAnalyticsIndex = ({ + esClient, + logger, + isServerless, + taskManager, +}: { + esClient: ElasticsearchClient; + logger: Logger; + isServerless: boolean; + taskManager: TaskManagerStartContract; +}): AnalyticsIndex => + new AnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + indexName: CAI_CASES_INDEX_NAME, + indexAlias: CAI_CASES_INDEX_ALIAS, + indexVersion: CAI_CASES_INDEX_VERSION, + mappings: CAI_CASES_INDEX_MAPPINGS, + painlessScriptId: CAI_CASES_INDEX_SCRIPT_ID, + painlessScript: CAI_CASES_INDEX_SCRIPT, + taskId: CAI_CASES_BACKFILL_TASK_ID, + sourceIndex: CAI_CASES_SOURCE_INDEX, + sourceQuery: CAI_CASES_SOURCE_QUERY, + }); + +export const scheduleCasesAnalyticsSyncTask = ({ + taskManager, + logger, +}: { + taskManager: TaskManagerStartContract; + logger: Logger; +}) => { + scheduleCAISynchronizationTask({ + taskId: CAI_CASES_SYNCHRONIZATION_TASK_ID, + sourceIndex: CAI_CASES_SOURCE_INDEX, + destIndex: CAI_CASES_INDEX_NAME, + taskManager, + logger, + }).catch((e) => { + logger.error( + `Error scheduling ${CAI_CASES_SYNCHRONIZATION_TASK_ID} task, received ${e.message}` + ); + }); +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/mappings.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/mappings.ts new file mode 100644 index 0000000000000..422064be98873 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/mappings.ts @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; + +export const CAI_CASES_INDEX_MAPPINGS: MappingTypeMapping = { + dynamic: false, + properties: { + '@timestamp': { + type: 'date', + }, + title: { + type: 'text', + fields: { + keyword: { + type: 'keyword', + }, + }, + }, + description: { + type: 'text', + }, + tags: { + type: 'keyword', + }, + category: { + type: 'keyword', + }, + status: { + type: 'keyword', + }, + status_sort: { + type: 'short', + }, + severity: { + type: 'keyword', + }, + severity_sort: { + type: 'short', + }, + created_at: { + type: 'date', + }, + created_at_ms: { + type: 'long', + }, + created_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + updated_at: { + type: 'date', + }, + updated_at_ms: { + type: 'long', + }, + updated_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + closed_at: { + type: 'date', + }, + closed_at_ms: { + type: 'long', + }, + closed_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + assignees: { + type: 'keyword', + }, + time_to_resolve: { + type: 'long', + }, + time_to_acknowledge: { + type: 'long', + }, + time_to_investigate: { + type: 'long', + }, + custom_fields: { + properties: { + type: { + type: 'keyword', + }, + key: { + type: 'keyword', + }, + value: { + type: 'keyword', + }, + }, + }, + observables: { + properties: { + type: { + // called typeKey in the cases mapping + type: 'keyword', + }, + value: { + type: 'keyword', + }, + }, + }, + total_assignees: { + type: 'integer', + }, + owner: { + type: 'keyword', + }, + space_ids: { + type: 'keyword', + }, + total_alerts: { + type: 'integer', + }, + total_comments: { + type: 'integer', + }, + }, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/painless_scripts.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/painless_scripts.ts new file mode 100644 index 0000000000000..d2d0134a7b109 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/cases_index/painless_scripts.ts @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StoredScript } from '@elastic/elasticsearch/lib/api/types'; +import { CAI_CASES_INDEX_VERSION } from './constants'; + +export const CAI_CASES_INDEX_SCRIPT_ID = `cai_cases_script_${CAI_CASES_INDEX_VERSION}`; +export const CAI_CASES_INDEX_SCRIPT: StoredScript = { + lang: 'painless', + source: ` + String statusDecoder(def x) { + if (x == 0) { + return "open"; + } + if (x == 10) { + return "in-progress"; + } + if (x == 20) { + return "closed"; + } + return ""; + } + + String severityDecoder(def x) { + if (x == 0) { + return "low" + } + if (x == 10) { + return "medium" + } + if (x == 20) { + return "high" + } + if (x == 30) { + return "critical" + } + return "" + } + + def source = [:]; + source.putAll(ctx._source); + ctx._source.clear(); + + long milliSinceEpoch = new Date().getTime(); + Instant instant = Instant.ofEpochMilli(milliSinceEpoch); + ctx._source['@timestamp'] = ZonedDateTime.ofInstant(instant, ZoneId.of('Z')); + + ctx._source.title = source.cases.title; + ctx._source.description = source.cases.description; + ctx._source.tags = source.cases.tags; + ctx._source.category = source.cases.category; + + ctx._source.status_sort = source.cases.status; + ctx._source.status = statusDecoder(ctx._source.status_sort); + + ctx._source.severity_sort = source.cases.severity; + ctx._source.severity = severityDecoder(ctx._source.severity_sort); + + ZonedDateTime zdt_created = + ZonedDateTime.parse(source.cases.created_at); + ctx._source.created_at_ms = zdt_created.toInstant().toEpochMilli(); + ctx._source.created_at = source.cases.created_at; + + if (source.cases.created_by != null) { + ctx._source.created_by = new HashMap(); + ctx._source.created_by.full_name = source.cases.created_by.full_name; + ctx._source.created_by.username = source.cases.created_by.username; + ctx._source.created_by.profile_uid = source.cases.created_by.profile_uid; + ctx._source.created_by.email = source.cases.created_by.email; + } + + if ( source.cases.updated_at != null ) { + ZonedDateTime zdt_updated = + ZonedDateTime.parse(source.cases.updated_at); + ctx._source.updated_at_ms = zdt_updated.toInstant().toEpochMilli(); + ctx._source.updated_at = source.cases.updated_at; + } + + if (source.cases.updated_by != null) { + ctx._source.updated_by = new HashMap(); + ctx._source.updated_by.full_name = source.cases.updated_by.full_name; + ctx._source.updated_by.username = source.cases.updated_by.username; + ctx._source.updated_by.profile_uid = source.cases.updated_by.profile_uid; + ctx._source.updated_by.email = source.cases.updated_by.email; + } + + if ( source.cases.closed_at != null ) { + ZonedDateTime zdt_closed = + ZonedDateTime.parse(source.cases.closed_at); + ctx._source.closed_at_ms = zdt_closed.toInstant().toEpochMilli(); + ctx._source.closed_at = source.cases.closed_at; + } + + if (source.cases.closed_by != null) { + ctx._source.closed_by = new HashMap(); + ctx._source.closed_by.full_name = source.cases.closed_by.full_name; + ctx._source.closed_by.username = source.cases.closed_by.username; + ctx._source.closed_by.profile_uid = source.cases.closed_by.profile_uid; + ctx._source.closed_by.email = source.cases.closed_by.email; + } + + ctx._source.assignees = []; + + if (source.cases.assignees != null) { + for (item in source.cases.assignees) { + ctx._source.assignees.add(item.uid); + } + ctx._source.total_assignees = source.cases.assignees.size(); + } + + ctx._source.custom_fields = []; + if (source.cases.customFields != null) { + for (item in source.cases.customFields) { + Map customField = new HashMap(); + + customField.type = item.type; + customField.value = item.value; + customField.key = item.key; + + ctx._source.custom_fields.add(customField); + } + } + + ctx._source.observables = []; + if (source.cases.observables != null) { + for (item in source.cases.observables) { + Map observable = new HashMap(); + + observable.label = item.label; + observable.type = item.typeKey; + observable.value = item.value; + + ctx._source.observables.add(observable); + } + } + + ctx._source.owner = source.cases.owner; + ctx._source.space_ids = source.namespaces; + + if (source.cases.time_to_acknowledge != null){ + ctx._source.time_to_acknowledge = source.cases.time_to_acknowledge; + } + + if (source.cases.time_to_investigate != null){ + ctx._source.time_to_investigate = source.cases.time_to_investigate; + } + + if (source.cases.time_to_resolve != null){ + ctx._source.time_to_resolve = source.cases.time_to_resolve; + } + + if (source.cases.total_alerts != null && source.cases.total_alerts >= 0){ + ctx._source.total_alerts = source.cases.total_alerts; + } + + if (source.cases.total_comments != null && source.cases.total_comments >= 0){ + ctx._source.total_comments = source.cases.total_comments; + } + `, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/constants.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/constants.ts new file mode 100644 index 0000000000000..885da39e4bb1f --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/constants.ts @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; + +import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server'; + +export const CAI_COMMENTS_INDEX_NAME = '.internal.cases-comments'; + +export const CAI_COMMENTS_INDEX_ALIAS = '.cases-comments'; + +export const CAI_COMMENTS_INDEX_VERSION = 1; + +export const CAI_COMMENTS_SOURCE_QUERY: QueryDslQueryContainer = { + bool: { + filter: [ + { + term: { + type: 'cases-comments', + }, + }, + { + term: { + 'cases-comments.type': 'user', + }, + }, + ], + }, +}; + +export const CAI_COMMENTS_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX; + +export const CAI_COMMENTS_BACKFILL_TASK_ID = 'cai_comments_backfill_task'; + +export const CAI_COMMENTS_SYNCHRONIZATION_TASK_ID = 'cai_cases_comments_synchronization_task'; + +export const getCommentsSynchronizationSourceQuery = ( + lastSyncAt: Date +): QueryDslQueryContainer => ({ + bool: { + must: [ + { + term: { + 'cases-comments.type': 'user', + }, + }, + { + term: { + type: 'cases-comments', + }, + }, + { + bool: { + should: [ + { + range: { + 'cases-comments.created_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + { + range: { + 'cases-comments.updated_at': { + gte: lastSyncAt.toISOString(), + }, + }, + }, + ], + }, + }, + ], + }, +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/index.ts new file mode 100644 index 0000000000000..a95dea5a33e7e --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/index.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; +import { AnalyticsIndex } from '../analytics_index'; +import { + CAI_COMMENTS_INDEX_NAME, + CAI_COMMENTS_INDEX_ALIAS, + CAI_COMMENTS_INDEX_VERSION, + CAI_COMMENTS_SOURCE_INDEX, + CAI_COMMENTS_SOURCE_QUERY, + CAI_COMMENTS_BACKFILL_TASK_ID, + CAI_COMMENTS_SYNCHRONIZATION_TASK_ID, +} from './constants'; +import { CAI_COMMENTS_INDEX_MAPPINGS } from './mappings'; +import { CAI_COMMENTS_INDEX_SCRIPT, CAI_COMMENTS_INDEX_SCRIPT_ID } from './painless_scripts'; +import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task'; + +export const createCommentsAnalyticsIndex = ({ + esClient, + logger, + isServerless, + taskManager, +}: { + esClient: ElasticsearchClient; + logger: Logger; + isServerless: boolean; + taskManager: TaskManagerStartContract; +}): AnalyticsIndex => + new AnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + indexName: CAI_COMMENTS_INDEX_NAME, + indexAlias: CAI_COMMENTS_INDEX_ALIAS, + indexVersion: CAI_COMMENTS_INDEX_VERSION, + mappings: CAI_COMMENTS_INDEX_MAPPINGS, + painlessScriptId: CAI_COMMENTS_INDEX_SCRIPT_ID, + painlessScript: CAI_COMMENTS_INDEX_SCRIPT, + taskId: CAI_COMMENTS_BACKFILL_TASK_ID, + sourceIndex: CAI_COMMENTS_SOURCE_INDEX, + sourceQuery: CAI_COMMENTS_SOURCE_QUERY, + }); + +export const scheduleCommentsAnalyticsSyncTask = ({ + taskManager, + logger, +}: { + taskManager: TaskManagerStartContract; + logger: Logger; +}) => { + scheduleCAISynchronizationTask({ + taskId: CAI_COMMENTS_SYNCHRONIZATION_TASK_ID, + sourceIndex: CAI_COMMENTS_SOURCE_INDEX, + destIndex: CAI_COMMENTS_INDEX_NAME, + taskManager, + logger, + }).catch((e) => { + logger.error( + `Error scheduling ${CAI_COMMENTS_SYNCHRONIZATION_TASK_ID} task, received ${e.message}` + ); + }); +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/mappings.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/mappings.ts new file mode 100644 index 0000000000000..7e8598981ddb8 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/mappings.ts @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; + +export const CAI_COMMENTS_INDEX_MAPPINGS: MappingTypeMapping = { + dynamic: false, + properties: { + '@timestamp': { + type: 'date', + }, + case_id: { + type: 'keyword', + }, + comment: { + type: 'text', + }, + created_at: { + type: 'date', + }, + created_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + updated_at: { + type: 'date', + }, + updated_by: { + properties: { + username: { + type: 'keyword', + }, + profile_uid: { + type: 'keyword', + }, + full_name: { + type: 'keyword', + }, + email: { + type: 'keyword', + }, + }, + }, + owner: { + type: 'keyword', + }, + space_ids: { + type: 'keyword', + }, + }, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/painless_scripts.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/painless_scripts.ts new file mode 100644 index 0000000000000..f2b061cb6df2d --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/comments_index/painless_scripts.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StoredScript } from '@elastic/elasticsearch/lib/api/types'; +import { CAI_COMMENTS_INDEX_VERSION } from './constants'; + +export const CAI_COMMENTS_INDEX_SCRIPT_ID = `cai_comments_script_${CAI_COMMENTS_INDEX_VERSION}`; +export const CAI_COMMENTS_INDEX_SCRIPT: StoredScript = { + lang: 'painless', + source: ` + def source = [:]; + source.putAll(ctx._source); + ctx._source.clear(); + + long milliSinceEpoch = new Date().getTime(); + Instant instant = Instant.ofEpochMilli(milliSinceEpoch); + ctx._source['@timestamp'] = ZonedDateTime.ofInstant(instant, ZoneId.of('Z')); + + ctx._source.comment = source["cases-comments"].comment; + ctx._source.created_at = source["cases-comments"].created_at; + ctx._source.created_by = source["cases-comments"].created_by; + ctx._source.owner = source["cases-comments"].owner; + ctx._source.space_ids = source.namespaces; + + if ( source["cases-comments"].updated_at != null ) { + ctx._source.updated_at = source["cases-comments"].updated_at; + } + + if (source["cases-comments"].updated_by != null) { + ctx._source.updated_by = new HashMap(); + ctx._source.updated_by.full_name = source["cases-comments"].updated_by.full_name; + ctx._source.updated_by.username = source["cases-comments"].updated_by.username; + ctx._source.updated_by.profile_uid = source["cases-comments"].updated_by.profile_uid; + ctx._source.updated_by.email = source["cases-comments"].updated_by.email; + } + + if (source.references != null) { + for (item in source.references) { + if (item.type == "cases") { + ctx._source.case_id = item.id; + } + } + } + `, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/constants.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/constants.ts new file mode 100644 index 0000000000000..de72f8812556d --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/constants.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; + +import { + CAI_ATTACHMENTS_INDEX_NAME, + getAttachmentsSynchronizationSourceQuery, +} from './attachments_index/constants'; +import { CAI_CASES_INDEX_NAME, getCasesSynchronizationSourceQuery } from './cases_index/constants'; +import { + CAI_COMMENTS_INDEX_NAME, + getCommentsSynchronizationSourceQuery, +} from './comments_index/constants'; +import { + CAI_ACTIVITY_INDEX_NAME, + getActivitySynchronizationSourceQuery, +} from './activity_index/constants'; + +export const CAI_NUMBER_OF_SHARDS = 1; +/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */ +export const CAI_AUTO_EXPAND_REPLICAS = '0-1'; +export const CAI_REFRESH_INTERVAL = '15s'; +export const CAI_INDEX_MODE = 'lookup'; +/** + * When a request takes a long time to complete and hits the timeout or the + * client aborts that request due to the requestTimeout, our only course of + * action is to retry that request. This places our request at the end of the + * queue and adds more load to Elasticsearch just making things worse. + * + * So we want to choose as long a timeout as possible. Some load balancers / + * reverse proxies like ELB ignore TCP keep-alive packets so unless there's a + * request or response sent over the socket it will be dropped after 60s. + */ +export const CAI_DEFAULT_TIMEOUT = '300s'; + +export const SYNCHRONIZATION_QUERIES_DICTIONARY: Record< + string, + (lastSyncAt: Date) => QueryDslQueryContainer +> = { + [CAI_CASES_INDEX_NAME]: getCasesSynchronizationSourceQuery, + [CAI_COMMENTS_INDEX_NAME]: getCommentsSynchronizationSourceQuery, + [CAI_ATTACHMENTS_INDEX_NAME]: getAttachmentsSynchronizationSourceQuery, + [CAI_ACTIVITY_INDEX_NAME]: getActivitySynchronizationSourceQuery, +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/index.ts new file mode 100644 index 0000000000000..e55a7ad8de29b --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/index.ts @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server'; +import type { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import type { CasesServerStartDependencies } from '../types'; +import { registerCAIBackfillTask } from './tasks/backfill_task'; +import { registerCAISynchronizationTask } from './tasks/synchronization_task'; +import { + createAttachmentsAnalyticsIndex, + scheduleAttachmentsAnalyticsSyncTask, +} from './attachments_index'; +import { createCasesAnalyticsIndex, scheduleCasesAnalyticsSyncTask } from './cases_index'; +import { createCommentsAnalyticsIndex, scheduleCommentsAnalyticsSyncTask } from './comments_index'; +import { createActivityAnalyticsIndex, scheduleActivityAnalyticsSyncTask } from './activity_index'; + +export const createCasesAnalyticsIndexes = ({ + esClient, + logger, + isServerless, + taskManager, +}: { + esClient: ElasticsearchClient; + logger: Logger; + isServerless: boolean; + taskManager: TaskManagerStartContract; +}) => { + const casesIndex = createCasesAnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + }); + const casesAttachmentsIndex = createCommentsAnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + }); + const casesCommentsIndex = createAttachmentsAnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + }); + const casesActivityIndex = createActivityAnalyticsIndex({ + logger, + esClient, + isServerless, + taskManager, + }); + + return Promise.all([ + casesIndex.upsertIndex(), + casesAttachmentsIndex.upsertIndex(), + casesCommentsIndex.upsertIndex(), + casesActivityIndex.upsertIndex(), + ]); +}; + +export const registerCasesAnalyticsIndexesTasks = ({ + taskManager, + logger, + core, +}: { + taskManager: TaskManagerSetupContract; + logger: Logger; + core: CoreSetup; +}) => { + registerCAIBackfillTask({ taskManager, logger, core }); + registerCAISynchronizationTask({ taskManager, logger, core }); +}; + +export const scheduleCasesAnalyticsSyncTasks = ({ + taskManager, + logger, +}: { + taskManager: TaskManagerStartContract; + logger: Logger; +}) => { + scheduleActivityAnalyticsSyncTask({ taskManager, logger }); + scheduleCasesAnalyticsSyncTask({ taskManager, logger }); + scheduleCommentsAnalyticsSyncTask({ taskManager, logger }); + scheduleAttachmentsAnalyticsSyncTask({ taskManager, logger }); +}; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/cases_analytics_retry_service.test.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/cases_analytics_retry_service.test.ts new file mode 100644 index 0000000000000..19b02a8fef933 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/cases_analytics_retry_service.test.ts @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core-logging-browser-mocks'; +import type { Logger } from '@kbn/core/server'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { CasesAnalyticsRetryService } from './cases_analytics_retry_service'; +import type { BackoffFactory } from '../../common/retry_service/types'; + +describe('CasesAnalyticsRetryService', () => { + const nextBackOff = jest.fn(); + const cb = jest.fn(); + const retryableError = new EsErrors.ConnectionError('My retryable error'); + + const backOffFactory: BackoffFactory = { + create: () => ({ nextBackOff }), + }; + + const mockLogger = loggingSystemMock.create().get() as jest.Mocked; + + let service: CasesAnalyticsRetryService; + + beforeEach(() => { + jest.clearAllMocks(); + + nextBackOff.mockReturnValue(1); + service = new CasesAnalyticsRetryService(mockLogger, backOffFactory); + }); + + it('should not retry if the error is not a retryable ElasticsearchClientError', async () => { + cb.mockRejectedValue(new Error('My error')); + + await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot( + `"My error"` + ); + + expect(cb).toBeCalledTimes(1); + expect(nextBackOff).not.toBeCalled(); + }); + + it('should not retry after trying more than the max attempts', async () => { + const maxAttempts = 3; + service = new CasesAnalyticsRetryService(mockLogger, backOffFactory, maxAttempts); + + cb.mockRejectedValue(retryableError); + + await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot( + `"My retryable error"` + ); + + expect(cb).toBeCalledTimes(maxAttempts + 1); + expect(nextBackOff).toBeCalledTimes(maxAttempts); + }); + + it('should succeed if cb does not throw', async () => { + service = new CasesAnalyticsRetryService(mockLogger, backOffFactory); + + cb.mockResolvedValue({ status: 'ok' }); + + const res = await service.retryWithBackoff(cb); + + expect(nextBackOff).toBeCalledTimes(0); + expect(cb).toBeCalledTimes(1); + expect(res).toEqual({ status: 'ok' }); + }); + + describe('Logging', () => { + it('should log a warning when retrying', async () => { + service = new CasesAnalyticsRetryService(mockLogger, backOffFactory, 2); + + cb.mockRejectedValue(retryableError); + + await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot( + `"My retryable error"` + ); + + expect(mockLogger.warn).toBeCalledTimes(2); + expect(mockLogger.warn).toHaveBeenNthCalledWith( + 1, + '[CasesAnalytics][retryWithBackoff] Failed with error "My retryable error". Attempt for retry: 1' + ); + + expect(mockLogger.warn).toHaveBeenNthCalledWith( + 2, + '[CasesAnalytics][retryWithBackoff] Failed with error "My retryable error". Attempt for retry: 2' + ); + }); + + it('should not log a warning when the error is not supported', async () => { + cb.mockRejectedValue(new Error('My error')); + + await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot( + `"My error"` + ); + + expect(mockLogger.warn).not.toBeCalled(); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/cases_analytics_retry_service.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/cases_analytics_retry_service.ts new file mode 100644 index 0000000000000..450647dc29b96 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/cases_analytics_retry_service.ts @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/core/server'; +import type { errors as EsErrors } from '@elastic/elasticsearch'; +import { isRetryableEsClientError } from '../utils'; + +import type { BackoffFactory } from '../../common/retry_service/types'; +import { RetryService } from '../../common/retry_service'; + +export class CasesAnalyticsRetryService extends RetryService { + constructor(logger: Logger, backOffFactory: BackoffFactory, maxAttempts: number = 10) { + super(logger, backOffFactory, 'CasesAnalytics', maxAttempts); + } + + protected isRetryableError(error: EsErrors.ElasticsearchClientError) { + if (isRetryableEsClientError(error)) { + return true; + } + + this.logger.debug(`[${this.serviceName}][isRetryableError] Error is not retryable`, { + tags: ['cai:retry-error'], + }); + + return false; + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/index.ts new file mode 100644 index 0000000000000..a01a6b3e0e5da --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/retry_service/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { CasesAnalyticsRetryService } from './cases_analytics_retry_service'; diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_factory.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_factory.ts new file mode 100644 index 0000000000000..79ba1a0b14f5e --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_factory.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { Logger } from '@kbn/logging'; +import type { ElasticsearchClient } from '@kbn/core/server'; +import type { RunContext } from '@kbn/task-manager-plugin/server'; +import { BackfillTaskRunner } from './backfill_task_runner'; + +interface CaseAnalyticsIndexBackfillTaskFactoryParams { + logger: Logger; + getESClient: () => Promise; +} + +export class CaseAnalyticsIndexBackfillTaskFactory { + private readonly logger: Logger; + private readonly getESClient: () => Promise; + + constructor({ logger, getESClient }: CaseAnalyticsIndexBackfillTaskFactoryParams) { + this.logger = logger; + this.getESClient = getESClient; + } + + public create(context: RunContext) { + return new BackfillTaskRunner({ + taskInstance: context.taskInstance, + logger: this.logger, + getESClient: this.getESClient, + }); + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_runner.test.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_runner.test.ts new file mode 100644 index 0000000000000..72310cd63703d --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_runner.test.ts @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; +import { errors as esErrors } from '@elastic/elasticsearch'; + +import { BackfillTaskRunner } from './backfill_task_runner'; +import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; +import { isRetryableError } from '@kbn/task-manager-plugin/server/task_running'; + +describe('BackfillTaskRunner', () => { + const logger = loggingSystemMock.createLogger(); + const sourceIndex = '.source-index'; + const destIndex = '.dest-index'; + const sourceQuery = 'source-query'; + const taskInstance = { + params: { + sourceIndex, + destIndex, + sourceQuery, + }, + } as unknown as ConcreteTaskInstance; + + let taskRunner: BackfillTaskRunner; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('reindexes as expected', async () => { + const esClient = elasticsearchServiceMock.createElasticsearchClient(); + const painlessScriptId = 'painlessScriptId'; + const painlessScript = { + lang: 'painless', + source: 'ctx._source.remove("foobar");', + }; + + esClient.indices.getMapping.mockResolvedValue({ + [destIndex]: { + mappings: { + _meta: { + painless_script_id: painlessScriptId, + }, + }, + }, + }); + + esClient.getScript.mockResolvedValueOnce({ + found: true, + _id: painlessScriptId, + script: painlessScript, + }); + + const getESClient = async () => esClient; + + taskRunner = new BackfillTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + const result = await taskRunner.run(); + + expect(esClient.cluster.health).toBeCalledWith({ + index: destIndex, + wait_for_status: 'green', + timeout: '300ms', + wait_for_active_shards: 'all', + }); + expect(esClient.indices.getMapping).toBeCalledWith({ index: destIndex }); + expect(esClient.getScript).toBeCalledWith({ id: painlessScriptId }); + expect(esClient.reindex).toBeCalledWith({ + source: { + index: sourceIndex, + query: sourceQuery, + }, + dest: { index: destIndex }, + script: { + id: painlessScriptId, + }, + refresh: true, + wait_for_completion: false, + }); + expect(result).toEqual({ state: {} }); + }); + + describe('Error handling', () => { + it('calls throwRetryableError if the esClient throws a retryable error', async () => { + const esClient = elasticsearchServiceMock.createElasticsearchClient(); + esClient.cluster.health.mockRejectedValueOnce( + new esErrors.ConnectionError('My retryable error') + ); + + const getESClient = async () => esClient; + + taskRunner = new BackfillTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + try { + await taskRunner.run(); + } catch (e) { + expect(isRetryableError(e)).toBe(true); + } + + expect(esClient.cluster.health).toBeCalledWith({ + index: destIndex, + wait_for_status: 'green', + timeout: '300ms', + wait_for_active_shards: 'all', + }); + + expect(logger.error).toBeCalledWith( + '[.dest-index] Backfill reindex failed. Error: My retryable error', + { tags: ['cai-backfill', 'cai-backfill-error', '.dest-index'] } + ); + }); + + it('calls throwUnrecoverableError if execution throws a non-retryable error', async () => { + const esClient = elasticsearchServiceMock.createElasticsearchClient(); + esClient.cluster.health.mockRejectedValueOnce(new Error('My unrecoverable error')); + + const getESClient = async () => esClient; + + taskRunner = new BackfillTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + try { + await taskRunner.run(); + } catch (e) { + expect(isRetryableError(e)).toBe(null); + } + + expect(logger.error).toBeCalledWith( + '[.dest-index] Backfill reindex failed. Error: My unrecoverable error', + { tags: ['cai-backfill', 'cai-backfill-error', '.dest-index'] } + ); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_runner.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_runner.ts new file mode 100644 index 0000000000000..8dd67e466a06c --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/backfill_task_runner.ts @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/logging'; +import { + createTaskRunError, + TaskErrorSource, + throwRetryableError, + throwUnrecoverableError, + type ConcreteTaskInstance, +} from '@kbn/task-manager-plugin/server'; +import type { ElasticsearchClient } from '@kbn/core/server'; +import type { CancellableTask } from '@kbn/task-manager-plugin/server/task'; +import type { + IndicesGetMappingResponse, + QueryDslQueryContainer, +} from '@elastic/elasticsearch/lib/api/types'; +import { isRetryableEsClientError } from '../../utils'; + +interface BackfillTaskRunnerFactoryConstructorParams { + taskInstance: ConcreteTaskInstance; + getESClient: () => Promise; + logger: Logger; +} + +export class BackfillTaskRunner implements CancellableTask { + private readonly sourceIndex: string; + private readonly destIndex: string; + private readonly sourceQuery: QueryDslQueryContainer; + private readonly getESClient: () => Promise; + private readonly logger: Logger; + private readonly errorSource = TaskErrorSource.FRAMEWORK; + + constructor({ taskInstance, getESClient, logger }: BackfillTaskRunnerFactoryConstructorParams) { + this.sourceIndex = taskInstance.params.sourceIndex; + this.destIndex = taskInstance.params.destIndex; + this.sourceQuery = taskInstance.params.sourceQuery; + this.getESClient = getESClient; + this.logger = logger; + } + + public async run() { + const esClient = await this.getESClient(); + try { + await this.waitForDestIndex(esClient); + await this.backfillReindex(esClient); + + return { + // one time only tasks get deleted so this state is not enough + // for the periodic tasks to know the backfill was complete + state: {}, // ? + }; + } catch (e) { + if (isRetryableEsClientError(e)) { + throwRetryableError( + createTaskRunError(new Error(this.getErrorMessage(e.message)), this.errorSource), + true + ); + } + + this.logger.error(`[${this.destIndex}] Backfill reindex failed. Error: ${e.message}`, { + tags: ['cai-backfill', 'cai-backfill-error', this.destIndex], + }); + throwUnrecoverableError(createTaskRunError(e, this.errorSource)); + } + } + + public async cancel() {} + + private async backfillReindex(esClient: ElasticsearchClient) { + const painlessScript = await this.getPainlessScript(esClient); + + if (painlessScript.found) { + this.logDebug(`Reindexing from ${this.sourceIndex} to ${this.destIndex}.`); + const painlessScriptId = await this.getPainlessScriptId(esClient); + + await esClient.reindex({ + source: { + index: this.sourceIndex, + query: this.sourceQuery, + }, + dest: { index: this.destIndex }, + script: { + id: painlessScriptId, + }, + /** If `true`, the request refreshes affected shards to make this operation visible to search. */ + refresh: true, + /** We do not wait for the es reindex operation to be completed. */ + wait_for_completion: false, + }); + } else { + throw createTaskRunError( + new Error(this.getErrorMessage('Painless script not found.')), + this.errorSource + ); + } + } + + private async getPainlessScript(esClient: ElasticsearchClient) { + const painlessScriptId = await this.getPainlessScriptId(esClient); + + this.logDebug(`Getting painless script with id ${painlessScriptId}.`); + return esClient.getScript({ + id: painlessScriptId, + }); + } + + private async getPainlessScriptId(esClient: ElasticsearchClient): Promise { + const currentMapping = await this.getCurrentMapping(esClient); + const painlessScriptId = currentMapping[this.destIndex].mappings._meta?.painless_script_id; + + if (!painlessScriptId) { + throw createTaskRunError( + new Error(this.getErrorMessage('Painless script id missing from mapping.')), + this.errorSource + ); + } + + return painlessScriptId; + } + + private async getCurrentMapping( + esClient: ElasticsearchClient + ): Promise { + this.logDebug('Getting index mapping.'); + return esClient.indices.getMapping({ + index: this.destIndex, + }); + } + + private async waitForDestIndex(esClient: ElasticsearchClient) { + this.logDebug('Checking index availability.'); + return esClient.cluster.health({ + index: this.destIndex, + wait_for_status: 'green', + timeout: '300ms', // this is probably too much + wait_for_active_shards: 'all', + }); + } + + public logDebug(message: string) { + this.logger.debug(`[${this.destIndex}] ${message}`, { + tags: ['cai-backfill', this.destIndex], + }); + } + + public getErrorMessage(message: string) { + const errorMessage = `[${this.destIndex}] Backfill reindex failed. Error: ${message}`; + + this.logger.error(errorMessage, { + tags: ['cai-backfill', 'cai-backfill-error', this.destIndex], + }); + + return errorMessage; + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/constants.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/constants.ts new file mode 100644 index 0000000000000..8e0bcc0a20fff --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/constants.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const TASK_TYPE = 'cai:cases_analytics_index_backfill'; +export const BACKFILL_RUN_AT = 60 * 1000; // milliseconds diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/index.ts new file mode 100644 index 0000000000000..ae0b54dd0b2cc --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/backfill_task/index.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/logging'; +import type { + RunContext, + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import type { CoreSetup, ElasticsearchClient } from '@kbn/core/server'; +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; +import type { CasesServerStartDependencies } from '../../../types'; +import { CaseAnalyticsIndexBackfillTaskFactory } from './backfill_task_factory'; +import { TASK_TYPE, BACKFILL_RUN_AT } from './constants'; + +export function registerCAIBackfillTask({ + taskManager, + logger, + core, +}: { + taskManager: TaskManagerSetupContract; + logger: Logger; + core: CoreSetup; +}) { + const getESClient = async (): Promise => { + const [{ elasticsearch }] = await core.getStartServices(); + return elasticsearch.client.asInternalUser; + }; + + taskManager.registerTaskDefinitions({ + [TASK_TYPE]: { + title: 'Backfill cases analytics indexes.', + maxAttempts: 3, + createTaskRunner: (context: RunContext) => { + return new CaseAnalyticsIndexBackfillTaskFactory({ getESClient, logger }).create(context); + }, + }, + }); +} + +export async function scheduleCAIBackfillTask({ + taskId, + sourceIndex, + sourceQuery, + destIndex, + taskManager, + logger, +}: { + taskId: string; + sourceIndex: string; + sourceQuery: QueryDslQueryContainer; + destIndex: string; + taskManager: TaskManagerStartContract; + logger: Logger; +}) { + try { + await taskManager.ensureScheduled({ + id: taskId, + taskType: TASK_TYPE, + params: { sourceIndex, destIndex, sourceQuery }, + runAt: new Date(Date.now() + BACKFILL_RUN_AT), // todo, value is short for testing but should run after 5 minutes + state: {}, + }); + } catch (e) { + logger.error(`Error scheduling ${taskId} task, received ${e.message}`); + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/index.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/index.ts new file mode 100644 index 0000000000000..1e4c8e73f135d --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/index.ts @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/logging'; +import type { + IntervalSchedule, + RunContext, + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import type { CoreSetup, ElasticsearchClient } from '@kbn/core/server'; +import type { CasesServerStartDependencies } from '../../../types'; +import { AnalyticsIndexSynchronizationTaskFactory } from './synchronization_task_factory'; + +const TASK_TYPE = 'cai:cases_analytics_index_synchronization'; +const SCHEDULE: IntervalSchedule = { interval: '5m' }; + +export function registerCAISynchronizationTask({ + taskManager, + logger, + core, +}: { + taskManager: TaskManagerSetupContract; + logger: Logger; + core: CoreSetup; +}) { + const getESClient = async (): Promise => { + const [{ elasticsearch }] = await core.getStartServices(); + return elasticsearch.client.asInternalUser; + }; + + taskManager.registerTaskDefinitions({ + [TASK_TYPE]: { + title: 'Synchronization for the cases analytics index', + createTaskRunner: (context: RunContext) => { + return new AnalyticsIndexSynchronizationTaskFactory({ getESClient, logger }).create( + context + ); + }, + }, + }); +} + +/** + * @param {destIndex} string Should be a key of SYNCHRONIZATION_QUERIES_DICTIONARY + */ +export async function scheduleCAISynchronizationTask({ + taskId, + sourceIndex, + destIndex, + taskManager, + logger, +}: { + taskId: string; + sourceIndex: string; + destIndex: string; + taskManager: TaskManagerStartContract; + logger: Logger; +}) { + try { + await taskManager.ensureScheduled({ + id: taskId, + taskType: TASK_TYPE, + params: { sourceIndex, destIndex }, + schedule: SCHEDULE, // every 5 minutes + state: {}, + }); + } catch (e) { + logger.error(`Error scheduling ${taskId} task, received ${e.message}`); + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_factory.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_factory.ts new file mode 100644 index 0000000000000..1bb04ab7d3034 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_factory.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { Logger } from '@kbn/logging'; +import type { ElasticsearchClient } from '@kbn/core/server'; +import type { RunContext } from '@kbn/task-manager-plugin/server'; +import { SynchronizationTaskRunner } from './synchronization_task_runner'; + +interface AnalyticsIndexSynchronizationTaskFactoryParams { + logger: Logger; + getESClient: () => Promise; +} + +export class AnalyticsIndexSynchronizationTaskFactory { + private readonly logger: Logger; + private readonly getESClient: () => Promise; + + constructor({ logger, getESClient }: AnalyticsIndexSynchronizationTaskFactoryParams) { + this.logger = logger; + this.getESClient = getESClient; + } + + public create(context: RunContext) { + return new SynchronizationTaskRunner({ + taskInstance: context.taskInstance, + logger: this.logger, + getESClient: this.getESClient, + }); + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_runner.test.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_runner.test.ts new file mode 100644 index 0000000000000..1114641365105 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_runner.test.ts @@ -0,0 +1,387 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; +import type { TasksTaskInfo } from '@elastic/elasticsearch/lib/api/types'; +import { errors as esErrors } from '@elastic/elasticsearch'; + +import { SynchronizationTaskRunner } from './synchronization_task_runner'; +import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; +import { isRetryableError } from '@kbn/task-manager-plugin/server/task_running'; +import { CAI_CASES_INDEX_NAME } from '../../cases_index/constants'; + +describe('SynchronizationTaskRunner', () => { + const logger = loggingSystemMock.createLogger(); + const esClient = elasticsearchServiceMock.createElasticsearchClient(); + + const sourceIndex = '.source-index'; + const destIndex = CAI_CASES_INDEX_NAME; + + const painlessScriptId = 'painlessScriptId'; + const painlessScript = { + lang: 'painless', + source: 'ctx._source.remove("foobar");', + }; + + const lastSyncSuccess = new Date('2025-06-10T09:25:00.000Z'); + const lastSyncAttempt = new Date('2025-06-10T09:30:00.000Z'); + const newAttemptTime = new Date('2025-06-10T09:40:00.000Z'); + + const esReindexTaskId = 'foobar'; + + const taskInstance = { + params: { + sourceIndex, + destIndex, + }, + state: { + lastSyncSuccess, + lastSyncAttempt, + esReindexTaskId, + }, + } as unknown as ConcreteTaskInstance; + + let taskRunner: SynchronizationTaskRunner; + + beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(newAttemptTime); + esClient.indices.getMapping.mockResolvedValue({ + [destIndex]: { + mappings: { + _meta: { + painless_script_id: painlessScriptId, + }, + }, + }, + }); + + esClient.getScript.mockResolvedValue({ + found: true, + _id: painlessScriptId, + script: painlessScript, + }); + + esClient.reindex.mockResolvedValue({ + task: esReindexTaskId, + }); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('reindexes when the previous sync task is completed and the index is available', async () => { + esClient.tasks.get.mockResolvedValueOnce({ + completed: true, + task: {} as TasksTaskInfo, + }); + + const getESClient = async () => esClient; + + taskRunner = new SynchronizationTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + const result = await taskRunner.run(); + + expect(esClient.tasks.get).toBeCalledWith({ task_id: esReindexTaskId }); + expect(esClient.cluster.health).toBeCalledWith({ + index: destIndex, + wait_for_status: 'green', + timeout: '300ms', + wait_for_active_shards: 'all', + }); + expect(esClient.indices.getMapping).toBeCalledWith({ index: destIndex }); + expect(esClient.getScript).toBeCalledWith({ id: painlessScriptId }); + expect(esClient.reindex).toBeCalledWith({ + source: { + index: sourceIndex, + /* + * The previous attempt was successful so we will reindex with + * a new time. + * + * SYNCHRONIZATION_QUERIES_DICTIONARY[destIndex](lastSyncAttempt) + */ + query: { + bool: { + must: [ + { + term: { + type: 'cases', + }, + }, + { + bool: { + should: [ + { + range: { + 'cases.created_at': { + gte: lastSyncAttempt.toISOString(), + }, + }, + }, + { + range: { + 'cases.updated_at': { + gte: lastSyncAttempt.toISOString(), + }, + }, + }, + ], + }, + }, + ], + }, + }, + }, + dest: { index: destIndex }, + script: { + id: painlessScriptId, + }, + refresh: true, + wait_for_completion: false, + }); + + expect(result).toEqual({ + state: { + // because the previous sync task was completed lastSyncSuccess is now lastSyncAttempt + lastSyncSuccess: lastSyncAttempt, + // we set a new value for lastSyncAttempt + lastSyncAttempt: newAttemptTime, + esReindexTaskId, + }, + }); + }); + + it('reindexes using the lookback window when there is no previous sync task and the index is available', async () => { + /* + * If lastSyncSuccess is missing we reindex only SOs that were + * created/updated in the last 5 minutes. + */ + const expectedSyncTime = new Date(newAttemptTime.getTime() - 5 * 60 * 1000); + + const getESClient = async () => esClient; + + taskRunner = new SynchronizationTaskRunner({ + logger, + getESClient, + taskInstance: { + ...taskInstance, + state: {}, + }, + }); + + const result = await taskRunner.run(); + + expect(esClient.reindex).toBeCalledWith({ + source: { + index: sourceIndex, + /* + * The previous attempt was successful so we will reindex with + * a new time. + * + * SYNCHRONIZATION_QUERIES_DICTIONARY[destIndex](lastSyncAttempt) + */ + query: { + bool: { + must: [ + { + term: { + type: 'cases', + }, + }, + { + bool: { + should: [ + { + range: { + 'cases.created_at': { + gte: expectedSyncTime.toISOString(), + }, + }, + }, + { + range: { + 'cases.updated_at': { + gte: expectedSyncTime.toISOString(), + }, + }, + }, + ], + }, + }, + ], + }, + }, + }, + dest: { index: destIndex }, + script: { + id: painlessScriptId, + }, + refresh: true, + wait_for_completion: false, + }); + + expect(result).toEqual({ + state: { + lastSyncSuccess: undefined, + lastSyncAttempt: newAttemptTime, + esReindexTaskId, + }, + }); + }); + + it('returns the previous state if the previous task is still running', async () => { + esClient.tasks.get.mockResolvedValueOnce({ + completed: false, + task: {} as TasksTaskInfo, + }); + + const getESClient = async () => esClient; + + taskRunner = new SynchronizationTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + const result = await taskRunner.run(); + + expect(esClient.reindex).not.toBeCalled(); + expect(result).toEqual({ + state: taskInstance.state, + }); + }); + + it('reindexes when the previous sync task failed', async () => { + esClient.tasks.get.mockResolvedValueOnce({ + completed: true, + task: {} as TasksTaskInfo, + error: { type: 'reindex_error', reason: 'Reindex failed' }, + }); + + const getESClient = async () => esClient; + + taskRunner = new SynchronizationTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + const result = await taskRunner.run(); + + expect(esClient.reindex).toBeCalledWith({ + source: { + index: sourceIndex, + /* + * The previous attempt was unsuccessful so we will reindex with + * the old lastSyncSuccess. And updated the attempt time. + * + * SYNCHRONIZATION_QUERIES_DICTIONARY[destIndex](lastSyncSuccess) + */ + query: { + bool: { + must: [ + { + term: { + type: 'cases', + }, + }, + { + bool: { + should: [ + { + range: { + 'cases.created_at': { + gte: lastSyncSuccess.toISOString(), + }, + }, + }, + { + range: { + 'cases.updated_at': { + gte: lastSyncSuccess.toISOString(), + }, + }, + }, + ], + }, + }, + ], + }, + }, + }, + dest: { index: destIndex }, + script: { + id: painlessScriptId, + }, + refresh: true, + wait_for_completion: false, + }); + + expect(result).toEqual({ + state: { + // because the previous sync task failed we do not update this value + lastSyncSuccess, + // we set a new value for lastSyncAttempt + lastSyncAttempt: newAttemptTime, + esReindexTaskId, + }, + }); + }); + + describe('Error handling', () => { + it('calls throwRetryableError if the esClient throws a retryable error', async () => { + esClient.tasks.get.mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error')); + + const getESClient = async () => esClient; + + taskRunner = new SynchronizationTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + try { + await taskRunner.run(); + } catch (e) { + expect(isRetryableError(e)).toBe(true); + } + + expect(logger.error).toBeCalledWith( + '[.internal.cases] Synchronization reindex failed. Error: My retryable error', + { tags: ['cai-synchronization', 'cai-synchronization-error', '.internal.cases'] } + ); + }); + + it('calls throwUnrecoverableError if execution throws a non-retryable error', async () => { + esClient.tasks.get.mockRejectedValueOnce(new Error('My unrecoverable error')); + + const getESClient = async () => esClient; + + taskRunner = new SynchronizationTaskRunner({ + logger, + getESClient, + taskInstance, + }); + + try { + await taskRunner.run(); + } catch (e) { + expect(isRetryableError(e)).toBe(null); + } + + expect(logger.error).toBeCalledWith( + '[.internal.cases] Synchronization reindex failed. Error: My unrecoverable error', + { tags: ['cai-synchronization', 'cai-synchronization-error', '.internal.cases'] } + ); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_runner.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_runner.ts new file mode 100644 index 0000000000000..62fcfcf21ca19 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/tasks/synchronization_task/synchronization_task_runner.ts @@ -0,0 +1,300 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/logging'; +import { + createTaskRunError, + TaskErrorSource, + throwRetryableError, + throwUnrecoverableError, + type ConcreteTaskInstance, +} from '@kbn/task-manager-plugin/server'; +import type { ElasticsearchClient } from '@kbn/core/server'; +import type { CancellableTask } from '@kbn/task-manager-plugin/server/task'; +import type { + IndicesGetMappingResponse, + QueryDslQueryContainer, +} from '@elastic/elasticsearch/lib/api/types'; +import { isRetryableEsClientError } from '../../utils'; +import { SYNCHRONIZATION_QUERIES_DICTIONARY } from '../../constants'; + +interface SynchronizationTaskRunnerFactoryConstructorParams { + taskInstance: ConcreteTaskInstance; + getESClient: () => Promise; + logger: Logger; +} + +interface SynchronizationTaskState { + lastSyncSuccess?: Date | undefined; + lastSyncAttempt?: Date | undefined; + esReindexTaskId?: TaskId | undefined; +} + +enum ReindexStatus { + RUNNING = 'running', + COMPLETED = 'completed', + FAILED = 'failed', + MISSING_TASK_ID = 'missing_task_id', +} + +const LOOKBACK_WINDOW = 5 * 60 * 1000; + +export class SynchronizationTaskRunner implements CancellableTask { + private readonly sourceIndex: string; + private readonly destIndex: string; + private readonly getESClient: () => Promise; + private readonly logger: Logger; + private readonly errorSource = TaskErrorSource.FRAMEWORK; + private readonly esReindexTaskId: TaskId | undefined; + private lastSyncSuccess: Date | undefined; + private lastSyncAttempt: Date | undefined; + + constructor({ + taskInstance, + getESClient, + logger, + }: SynchronizationTaskRunnerFactoryConstructorParams) { + if (taskInstance.state.lastSyncSuccess) + this.lastSyncSuccess = new Date(taskInstance.state.lastSyncSuccess); + if (taskInstance.state.lastSyncAttempt) + this.lastSyncAttempt = new Date(taskInstance.state.lastSyncAttempt); + this.esReindexTaskId = taskInstance.state.esReindexTaskId; + this.sourceIndex = taskInstance.params.sourceIndex; + this.destIndex = taskInstance.params.destIndex; + this.getESClient = getESClient; + this.logger = logger; + } + + public async run() { + const esClient = await this.getESClient(); + + try { + const previousReindexStatus = await this.getPreviousReindexStatus(esClient); + this.logDebug(`Previous synchronization task status: "${previousReindexStatus}".`); + + if (previousReindexStatus === ReindexStatus.RUNNING) { + /* + * If the reindex task is still running we return the + * same state and the next run will cover any missing + * updates. + **/ + return { + state: this.getSyncState() as Record, + }; + } + + if (previousReindexStatus === ReindexStatus.COMPLETED) { + /* + * If the previous reindex task is completed we reindex + * with a new time window. + **/ + await this.isIndexAvailable(esClient); + + this.updateLastSyncTimes({ updateSuccessTime: true }); + + const esReindexTaskId = await this.synchronizeIndex({ esClient }); + + return { + state: { + lastSyncSuccess: this.lastSyncSuccess, + lastSyncAttempt: this.lastSyncAttempt, + esReindexTaskId, + } as Record, + }; + } + + if ( + /* + * A missing task id can only happen if this is + * the first task execution. + **/ + previousReindexStatus === ReindexStatus.MISSING_TASK_ID || + previousReindexStatus === ReindexStatus.FAILED + ) { + await this.isIndexAvailable(esClient); + + /* + * There are two possible scenarios here: + * 1. If the previous task failed (ReindexStatus.FAILED) + * 2. If the state is missing + * + * In both cases we try to reindex without updating lastSyncSuccess. + * This will ensure the reindex query is built with the correct value. + **/ + this.updateLastSyncTimes({ updateSuccessTime: false }); + + const esReindexTaskId = await this.synchronizeIndex({ esClient }); + + return { + state: { + lastSyncSuccess: this.lastSyncSuccess, + lastSyncAttempt: this.lastSyncAttempt, + esReindexTaskId, + } as Record, + }; + } + + throw new Error('Invalid task state.'); + } catch (e) { + if (isRetryableEsClientError(e)) { + throwRetryableError( + createTaskRunError(new Error(this.handleErrorMessage(e.message)), this.errorSource), + true + ); + } + + this.handleErrorMessage(e.message); + throwUnrecoverableError(createTaskRunError(e, this.errorSource)); + } + } + + private updateLastSyncTimes({ updateSuccessTime }: { updateSuccessTime?: boolean }) { + this.logDebug('Updating lastSyncAttempt and lastSyncSuccess before synchronization.'); + + if (updateSuccessTime) { + this.lastSyncSuccess = this.lastSyncAttempt; + } + this.lastSyncAttempt = new Date(); + } + + /** + * This method does a call to elasticsearch that reindexes from this.destIndex + * to this.sourceIndex. The query used takes into account the lastSyncSuccess + * and lastSyncAttempt values in the class. + * + * @returns {SynchronizationTaskState} The updated task state + */ + private async synchronizeIndex({ + esClient, + }: { + esClient: ElasticsearchClient; + }): Promise { + const painlessScript = await this.getPainlessScript(esClient); + + if (painlessScript.found) { + this.logDebug(`Synchronizing with ${this.sourceIndex}.`); + + const sourceQuery = this.buildSourceQuery(); + const reindexResponse = await esClient.reindex({ + source: { + index: this.sourceIndex, + query: sourceQuery, + }, + dest: { index: this.destIndex }, + script: { + id: painlessScript._id, + }, + /** If `true`, the request refreshes affected shards to make this operation visible to search. */ + refresh: true, + /** We do not wait for the es reindex operation to be completed. */ + wait_for_completion: false, + }); + + return reindexResponse.task; + } else { + throw createTaskRunError( + new Error(this.handleErrorMessage('Painless script not found.')), + this.errorSource + ); + } + } + + private async getPreviousReindexStatus(esClient: ElasticsearchClient): Promise { + this.logDebug('Checking previous synchronization task status.'); + + if (!this.esReindexTaskId) { + return ReindexStatus.MISSING_TASK_ID; + } + + const taskResponse = await esClient.tasks.get({ task_id: this.esReindexTaskId.toString() }); + + if (!taskResponse.completed) { + return ReindexStatus.RUNNING; + } + + if (taskResponse.response?.failures?.length || taskResponse?.error) { + return ReindexStatus.FAILED; + } + + return ReindexStatus.COMPLETED; + } + + private buildSourceQuery(): QueryDslQueryContainer { + return SYNCHRONIZATION_QUERIES_DICTIONARY[this.destIndex]( + new Date(this.lastSyncSuccess ? this.lastSyncSuccess : Date.now() - LOOKBACK_WINDOW) + ); + } + + private getSyncState(): SynchronizationTaskState { + return { + lastSyncSuccess: this.lastSyncSuccess, + lastSyncAttempt: this.lastSyncAttempt, + esReindexTaskId: this.esReindexTaskId, + }; + } + + private async getMapping(esClient: ElasticsearchClient): Promise { + this.logDebug('Getting index mapping.'); + + return esClient.indices.getMapping({ + index: this.destIndex, + }); + } + + private async getPainlessScript(esClient: ElasticsearchClient) { + const painlessScriptId = await this.getPainlessScriptId(esClient); + + this.logDebug(`Getting painless script with id: "${painlessScriptId}".`); + + return esClient.getScript({ + id: painlessScriptId, + }); + } + + private async getPainlessScriptId(esClient: ElasticsearchClient): Promise { + const mapping = await this.getMapping(esClient); + const painlessScriptId = mapping[this.destIndex].mappings._meta?.painless_script_id; + + if (!painlessScriptId) { + throw createTaskRunError( + new Error(this.handleErrorMessage('Painless script id missing from mapping.')), + this.errorSource + ); + } + + return painlessScriptId; + } + + private async isIndexAvailable(esClient: ElasticsearchClient) { + this.logDebug('Checking index availability.'); + + return esClient.cluster.health({ + index: this.destIndex, + wait_for_status: 'green', + timeout: '300ms', // this is probably too much + wait_for_active_shards: 'all', + }); + } + + public logDebug(message: string) { + this.logger.debug(`[${this.destIndex}] ${message}`, { + tags: ['cai-synchronization', this.destIndex], + }); + } + + public handleErrorMessage(message: string) { + const errorMessage = `[${this.destIndex}] Synchronization reindex failed. Error: ${message}`; + + this.logger.error(errorMessage, { + tags: ['cai-synchronization', 'cai-synchronization-error', this.destIndex], + }); + + return errorMessage; + } + + public async cancel() {} +} diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/utils.test.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/utils.test.ts new file mode 100644 index 0000000000000..40d82c070f9a8 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/utils.test.ts @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { errors as esErrors } from '@elastic/elasticsearch'; +import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks'; +import { isRetryableEsClientError } from './utils'; + +describe('isRetryableEsClientError', () => { + describe('returns `false` for', () => { + test('non-retryable response errors', async () => { + const error = new esErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + body: { error: { type: 'cluster_block_exception' } }, + statusCode: 400, + }) + ); + + expect(isRetryableEsClientError(error)).toEqual(false); + }); + }); + + describe('returns `true` for', () => { + it('NoLivingConnectionsError', () => { + const error = new esErrors.NoLivingConnectionsError( + 'reason', + elasticsearchClientMock.createApiResponse() + ); + + expect(isRetryableEsClientError(error)).toEqual(true); + }); + + it('ConnectionError', () => { + const error = new esErrors.ConnectionError( + 'reason', + elasticsearchClientMock.createApiResponse() + ); + expect(isRetryableEsClientError(error)).toEqual(true); + }); + + it('TimeoutError', () => { + const error = new esErrors.TimeoutError( + 'reason', + elasticsearchClientMock.createApiResponse() + ); + expect(isRetryableEsClientError(error)).toEqual(true); + }); + + it('ResponseError of type snapshot_in_progress_exception', () => { + const error = new esErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + body: { error: { type: 'snapshot_in_progress_exception' } }, + }) + ); + expect(isRetryableEsClientError(error)).toEqual(true); + }); + + it.each([503, 504, 401, 403, 408, 410, 429])( + 'ResponseError with %p status code', + (statusCode) => { + const error = new esErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode, + body: { error: { type: 'reason' } }, + }) + ); + + expect(isRetryableEsClientError(error)).toEqual(true); + } + ); + }); +}); diff --git a/x-pack/platform/plugins/shared/cases/server/cases_analytics/utils.ts b/x-pack/platform/plugins/shared/cases/server/cases_analytics/utils.ts new file mode 100644 index 0000000000000..3e9c6e7fa6b46 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/cases_analytics/utils.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { errors as EsErrors } from '@elastic/elasticsearch'; + +const retryResponseStatuses = [ + 401, // AuthorizationException + 403, // AuthenticationException + 408, // RequestTimeout + 410, // Gone + 429, // TooManyRequests -> ES circuit breaker + 503, // ServiceUnavailable + 504, // GatewayTimeout +]; + +/** + * Returns true if the given elasticsearch error should be retried + * by retry-based resiliency systems such as the SO migration, false otherwise. + */ +export const isRetryableEsClientError = (e: EsErrors.ElasticsearchClientError): boolean => { + if ( + e instanceof EsErrors.NoLivingConnectionsError || + e instanceof EsErrors.ConnectionError || + e instanceof EsErrors.TimeoutError || + (e instanceof EsErrors.ResponseError && + ((e?.statusCode && retryResponseStatuses.includes(e?.statusCode)) || + // ES returns a 400 Bad Request when trying to close or delete an + // index while snapshots are in progress. This should have been a 503 + // so once https://github.com/elastic/elasticsearch/issues/65883 is + // fixed we can remove this. + e?.body?.error?.type === 'snapshot_in_progress_exception')) + ) { + return true; + } + return false; +}; diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/full_jitter_backoff.test.ts b/x-pack/platform/plugins/shared/cases/server/common/retry_service/full_jitter_backoff.test.ts similarity index 100% rename from x-pack/platform/plugins/shared/cases/server/connectors/cases/full_jitter_backoff.test.ts rename to x-pack/platform/plugins/shared/cases/server/common/retry_service/full_jitter_backoff.test.ts diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/full_jitter_backoff.ts b/x-pack/platform/plugins/shared/cases/server/common/retry_service/full_jitter_backoff.ts similarity index 100% rename from x-pack/platform/plugins/shared/cases/server/connectors/cases/full_jitter_backoff.ts rename to x-pack/platform/plugins/shared/cases/server/common/retry_service/full_jitter_backoff.ts diff --git a/x-pack/platform/plugins/shared/cases/server/common/retry_service/index.ts b/x-pack/platform/plugins/shared/cases/server/common/retry_service/index.ts new file mode 100644 index 0000000000000..4adbed0283ba9 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/common/retry_service/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { fullJitterBackoffFactory } from './full_jitter_backoff'; +export { RetryService } from './retry_service'; diff --git a/x-pack/platform/plugins/shared/cases/server/common/retry_service/retry_service.test.ts b/x-pack/platform/plugins/shared/cases/server/common/retry_service/retry_service.test.ts new file mode 100644 index 0000000000000..787f3f60c2704 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/common/retry_service/retry_service.test.ts @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core-logging-browser-mocks'; +import type { Logger } from '@kbn/core/server'; +import { RetryService } from './retry_service'; +import type { BackoffFactory } from './types'; + +class RetryServiceTestClass extends RetryService { + protected isRetryableError(error: Error) { + return true; + } +} + +describe('RetryService', () => { + const nextBackOff = jest.fn(); + const cb = jest.fn(); + + const backOffFactory: BackoffFactory = { + create: () => ({ nextBackOff }), + }; + + const mockLogger = loggingSystemMock.create().get() as jest.Mocked; + + let service: RetryService; + + beforeEach(() => { + jest.clearAllMocks(); + + nextBackOff.mockReturnValue(1); + service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar'); + }); + + it('should not retry after trying more than the max attempts', async () => { + const maxAttempts = 3; + service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar', maxAttempts); + + cb.mockRejectedValue(new Error('My transient error')); + + await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot( + `"My transient error"` + ); + + expect(cb).toBeCalledTimes(maxAttempts + 1); + expect(nextBackOff).toBeCalledTimes(maxAttempts); + }); + + it.each([409, 429, 503])( + 'should retry and succeed retryable status code: %s', + async (statusCode) => { + const maxAttempts = 3; + service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar', maxAttempts); + + const error = new Error('My transient error'); + cb.mockRejectedValueOnce(error) + .mockRejectedValueOnce(error) + .mockResolvedValue({ status: 'ok' }); + + const res = await service.retryWithBackoff(cb); + + expect(nextBackOff).toBeCalledTimes(maxAttempts - 1); + expect(cb).toBeCalledTimes(maxAttempts); + expect(res).toEqual({ status: 'ok' }); + } + ); + + it('should succeed if cb does not throw', async () => { + service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar'); + + cb.mockResolvedValue({ status: 'ok' }); + + const res = await service.retryWithBackoff(cb); + + expect(nextBackOff).toBeCalledTimes(0); + expect(cb).toBeCalledTimes(1); + expect(res).toEqual({ status: 'ok' }); + }); + + describe('Logging', () => { + it('should log a warning when retrying', async () => { + service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar', 2); + + cb.mockRejectedValue(new Error('My transient error')); + + await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot( + `"My transient error"` + ); + + expect(mockLogger.warn).toBeCalledTimes(2); + expect(mockLogger.warn).toHaveBeenNthCalledWith( + 1, + '[foobar][retryWithBackoff] Failed with error "My transient error". Attempt for retry: 1' + ); + + expect(mockLogger.warn).toHaveBeenNthCalledWith( + 2, + '[foobar][retryWithBackoff] Failed with error "My transient error". Attempt for retry: 2' + ); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/retry_service.ts b/x-pack/platform/plugins/shared/cases/server/common/retry_service/retry_service.ts similarity index 52% rename from x-pack/platform/plugins/shared/cases/server/connectors/cases/retry_service.ts rename to x-pack/platform/plugins/shared/cases/server/common/retry_service/retry_service.ts index 1aba265af20b1..8a758a80048cb 100644 --- a/x-pack/platform/plugins/shared/cases/server/connectors/cases/retry_service.ts +++ b/x-pack/platform/plugins/shared/cases/server/common/retry_service/retry_service.ts @@ -6,48 +6,44 @@ */ import type { Logger } from '@kbn/core/server'; -import { CasesConnectorError } from './cases_connector_error'; -import type { BackoffStrategy, BackoffFactory } from './types'; +import type { BackoffFactory, BackoffStrategy } from './types'; -export class CaseConnectorRetryService { - private logger: Logger; +export abstract class RetryService { + protected logger: Logger; + protected readonly serviceName: string; private maxAttempts: number; - /** - * 409 - Conflict - * 429 - Too Many Requests - * 503 - ES Unavailable - * - * Full list of errors: src/core/packages/saved-objects/server/src/saved_objects_error_helpers.ts - */ - private readonly RETRY_ERROR_STATUS_CODES: number[] = [409, 429, 503]; private readonly backOffStrategy: BackoffStrategy; private timer: NodeJS.Timeout | null = null; private attempt: number = 0; - constructor(logger: Logger, backOffFactory: BackoffFactory, maxAttempts: number = 10) { + constructor( + logger: Logger, + backOffFactory: BackoffFactory, + serviceName: string, + maxAttempts: number = 10 + ) { this.logger = logger; this.backOffStrategy = backOffFactory.create(); this.maxAttempts = maxAttempts; + this.serviceName = serviceName; } public async retryWithBackoff(cb: () => Promise): Promise { try { this.logger.debug( - `[CasesConnector][retryWithBackoff] Running case connector. Attempt: ${this.attempt}`, + `[${this.serviceName}][retryWithBackoff] Running. Attempt: ${this.attempt}`, { labels: { attempt: this.attempt }, - tags: ['case-connector:retry-start'], } ); const res = await cb(); this.logger.debug( - `[CasesConnector][retryWithBackoff] Case connector run successfully after ${this.attempt} attempts`, + `[${this.serviceName}][retryWithBackoff] Run successfully after ${this.attempt} attempts.`, { labels: { attempt: this.attempt }, - tags: ['case-connector:retry-success'], } ); @@ -59,9 +55,15 @@ export class CaseConnectorRetryService { await this.delay(); - this.logger.warn( - `[CaseConnector] Case connector failed with status code ${error.statusCode}. Attempt for retry: ${this.attempt}` - ); + if (error.statusCode) { + this.logger.warn( + `[${this.serviceName}][retryWithBackoff] Failed with status code ${error.statusCode}. Attempt for retry: ${this.attempt}` + ); + } else { + this.logger.warn( + `[${this.serviceName}][retryWithBackoff] Failed with error "${error.message}". Attempt for retry: ${this.attempt}` + ); + } return this.retryWithBackoff(cb); } @@ -69,10 +71,9 @@ export class CaseConnectorRetryService { throw error; } finally { this.logger.debug( - `[CasesConnector][retryWithBackoff] Case connector run ended after ${this.attempt} attempts`, + `[${this.serviceName}][retryWithBackoff] Run ended after ${this.attempt} attempts.`, { labels: { attempt: this.attempt }, - tags: ['case-connector:retry-end'], } ); } @@ -82,20 +83,7 @@ export class CaseConnectorRetryService { return this.attempt < this.maxAttempts; } - private isRetryableError(error: Error) { - if ( - error instanceof CasesConnectorError && - this.RETRY_ERROR_STATUS_CODES.includes(error.statusCode) - ) { - return true; - } - - this.logger.debug(`[CasesConnector][isRetryableError] Error is not retryable`, { - tags: ['case-connector:retry-error'], - }); - - return false; - } + protected abstract isRetryableError(error: Error): boolean; private async delay() { const ms = this.backOffStrategy.nextBackOff(); diff --git a/x-pack/platform/plugins/shared/cases/server/common/retry_service/types.ts b/x-pack/platform/plugins/shared/cases/server/common/retry_service/types.ts new file mode 100644 index 0000000000000..ce58b9662a704 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/common/retry_service/types.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export interface BackoffStrategy { + nextBackOff: () => number; +} + +export interface BackoffFactory { + create: () => BackoffStrategy; +} diff --git a/x-pack/platform/plugins/shared/cases/server/config.test.ts b/x-pack/platform/plugins/shared/cases/server/config.test.ts index 352faac983f29..9e652cb52ffe1 100644 --- a/x-pack/platform/plugins/shared/cases/server/config.test.ts +++ b/x-pack/platform/plugins/shared/cases/server/config.test.ts @@ -12,6 +12,7 @@ describe('config validation', () => { it('sets the defaults correctly', () => { expect(ConfigSchema.validate({})).toMatchInlineSnapshot(` Object { + "analytics": Object {}, "files": Object { "allowedMimeTypes": Array [ "image/aces", diff --git a/x-pack/platform/plugins/shared/cases/server/config.ts b/x-pack/platform/plugins/shared/cases/server/config.ts index 7e30671ee4734..b27bb725d209c 100644 --- a/x-pack/platform/plugins/shared/cases/server/config.ts +++ b/x-pack/platform/plugins/shared/cases/server/config.ts @@ -23,6 +23,13 @@ export const ConfigSchema = schema.object({ stack: schema.object({ enabled: schema.boolean({ defaultValue: true }), }), + analytics: schema.object({ + index: schema.maybe( + schema.object({ + enabled: schema.boolean({ defaultValue: true }), + }) + ), + }), }); export type ConfigType = TypeOf; diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.test.ts b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.test.ts index 8eebaba88d769..ef6e139f00bd8 100644 --- a/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.test.ts +++ b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.test.ts @@ -17,10 +17,10 @@ import { CasesOracleService } from './cases_oracle_service'; import { CasesService } from './cases_service'; import { CasesConnectorError } from './cases_connector_error'; import { CaseError } from '../../common/error'; -import { fullJitterBackoffFactory } from './full_jitter_backoff'; +import { fullJitterBackoffFactory } from '../../common/retry_service/full_jitter_backoff'; jest.mock('./cases_connector_executor'); -jest.mock('./full_jitter_backoff'); +jest.mock('../../common/retry_service/full_jitter_backoff'); const CasesConnectorExecutorMock = CasesConnectorExecutor as jest.Mock; const fullJitterBackoffFactoryMock = fullJitterBackoffFactory as jest.Mock; diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.ts b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.ts index 89c133f828b77..e1841f862b78d 100644 --- a/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.ts +++ b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector.ts @@ -22,8 +22,8 @@ import { isCasesConnectorError, } from './cases_connector_error'; import { CasesConnectorExecutor } from './cases_connector_executor'; -import { CaseConnectorRetryService } from './retry_service'; -import { fullJitterBackoffFactory } from './full_jitter_backoff'; +import { CasesConnectorRetryService } from './cases_connector_retry_service'; +import { fullJitterBackoffFactory } from '../../common/retry_service'; import { CASE_RULES_SAVED_OBJECT, CASES_CONNECTOR_SUB_ACTION } from '../../../common/constants'; interface CasesConnectorParams { @@ -43,7 +43,7 @@ export class CasesConnector extends SubActionConnector< CasesConnectorSecrets > { private readonly casesService: CasesService; - private readonly retryService: CaseConnectorRetryService; + private readonly retryService: CasesConnectorRetryService; private readonly casesParams: CasesConnectorParams['casesParams']; constructor({ connectorParams, casesParams }: CasesConnectorParams) { @@ -55,7 +55,7 @@ export class CasesConnector extends SubActionConnector< * We should wait at least 5ms before retrying and no more that 2sec */ const backOffFactory = fullJitterBackoffFactory({ baseDelay: 5, maxBackoffTime: 2000 }); - this.retryService = new CaseConnectorRetryService(this.logger, backOffFactory); + this.retryService = new CasesConnectorRetryService(this.logger, backOffFactory); this.casesParams = casesParams; diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/retry_service.test.ts b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector_retry_service.test.ts similarity index 81% rename from x-pack/platform/plugins/shared/cases/server/connectors/cases/retry_service.test.ts rename to x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector_retry_service.test.ts index c3a8d419d1557..eabf0e0ef4ce9 100644 --- a/x-pack/platform/plugins/shared/cases/server/connectors/cases/retry_service.test.ts +++ b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector_retry_service.test.ts @@ -5,13 +5,14 @@ * 2.0. */ -import { loggingSystemMock } from '@kbn/core-logging-browser-mocks'; import type { Logger } from '@kbn/core/server'; +import { loggingSystemMock } from '@kbn/core-logging-browser-mocks'; + +import type { BackoffFactory } from '../../common/retry_service/types'; import { CasesConnectorError } from './cases_connector_error'; -import { CaseConnectorRetryService } from './retry_service'; -import type { BackoffFactory } from './types'; +import { CasesConnectorRetryService } from './cases_connector_retry_service'; -describe('CryptoService', () => { +describe('CasesConnectorRetryService', () => { const nextBackOff = jest.fn(); const cb = jest.fn(); @@ -21,13 +22,13 @@ describe('CryptoService', () => { const mockLogger = loggingSystemMock.create().get() as jest.Mocked; - let service: CaseConnectorRetryService; + let service: CasesConnectorRetryService; beforeEach(() => { jest.clearAllMocks(); nextBackOff.mockReturnValue(1); - service = new CaseConnectorRetryService(mockLogger, backOffFactory); + service = new CasesConnectorRetryService(mockLogger, backOffFactory); }); it('should not retry if the error is not CasesConnectorError', async () => { @@ -54,7 +55,7 @@ describe('CryptoService', () => { it('should not retry after trying more than the max attempts', async () => { const maxAttempts = 3; - service = new CaseConnectorRetryService(mockLogger, backOffFactory, maxAttempts); + service = new CasesConnectorRetryService(mockLogger, backOffFactory, maxAttempts); cb.mockRejectedValue(new CasesConnectorError('My transient error', 409)); @@ -70,7 +71,7 @@ describe('CryptoService', () => { 'should retry and succeed retryable status code: %s', async (statusCode) => { const maxAttempts = 3; - service = new CaseConnectorRetryService(mockLogger, backOffFactory, maxAttempts); + service = new CasesConnectorRetryService(mockLogger, backOffFactory, maxAttempts); const error = new CasesConnectorError('My transient error', statusCode); cb.mockRejectedValueOnce(error) @@ -86,7 +87,7 @@ describe('CryptoService', () => { ); it('should succeed if cb does not throw', async () => { - service = new CaseConnectorRetryService(mockLogger, backOffFactory); + service = new CasesConnectorRetryService(mockLogger, backOffFactory); cb.mockResolvedValue({ status: 'ok' }); @@ -99,7 +100,7 @@ describe('CryptoService', () => { describe('Logging', () => { it('should log a warning when retrying', async () => { - service = new CaseConnectorRetryService(mockLogger, backOffFactory, 2); + service = new CasesConnectorRetryService(mockLogger, backOffFactory, 2); cb.mockRejectedValue(new CasesConnectorError('My transient error', 409)); @@ -110,12 +111,12 @@ describe('CryptoService', () => { expect(mockLogger.warn).toBeCalledTimes(2); expect(mockLogger.warn).toHaveBeenNthCalledWith( 1, - '[CaseConnector] Case connector failed with status code 409. Attempt for retry: 1' + '[CasesConnector][retryWithBackoff] Failed with status code 409. Attempt for retry: 1' ); expect(mockLogger.warn).toHaveBeenNthCalledWith( 2, - '[CaseConnector] Case connector failed with status code 409. Attempt for retry: 2' + '[CasesConnector][retryWithBackoff] Failed with status code 409. Attempt for retry: 2' ); }); diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector_retry_service.ts b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector_retry_service.ts new file mode 100644 index 0000000000000..897e370c76b63 --- /dev/null +++ b/x-pack/platform/plugins/shared/cases/server/connectors/cases/cases_connector_retry_service.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { Logger } from '@kbn/core/server'; +import type { BackoffFactory } from '../../common/retry_service/types'; + +import { CasesConnectorError } from './cases_connector_error'; +import { RetryService } from '../../common/retry_service'; + +export class CasesConnectorRetryService extends RetryService { + /** + * 409 - Conflict + * 429 - Too Many Requests + * 503 - ES Unavailable + * + * Full list of errors: src/core/packages/saved-objects/server/src/saved_objects_error_helpers.ts + */ + private readonly RETRY_ERROR_STATUS_CODES: number[] = [409, 429, 503]; + + constructor(logger: Logger, backOffFactory: BackoffFactory, maxAttempts: number = 10) { + super(logger, backOffFactory, 'CasesConnector', maxAttempts); + } + + protected isRetryableError(error: Error) { + if ( + error instanceof CasesConnectorError && + this.RETRY_ERROR_STATUS_CODES.includes(error.statusCode) + ) { + return true; + } + + this.logger.debug(`[CasesConnector][isRetryableError] Error is not retryable`, { + tags: ['case-connector:retry-error'], + }); + + return false; + } +} diff --git a/x-pack/platform/plugins/shared/cases/server/connectors/cases/types.ts b/x-pack/platform/plugins/shared/cases/server/connectors/cases/types.ts index 41b380cc7f1ff..0443c32444093 100644 --- a/x-pack/platform/plugins/shared/cases/server/connectors/cases/types.ts +++ b/x-pack/platform/plugins/shared/cases/server/connectors/cases/types.ts @@ -79,13 +79,5 @@ export type BulkUpdateOracleRecordRequest = Array<{ payload: Pick; }>; -export interface BackoffStrategy { - nextBackOff: () => number; -} - -export interface BackoffFactory { - create: () => BackoffStrategy; -} - export type CasesConnectorRuleActionParams = TypeOf; export type CasesConnectorParams = TypeOf; diff --git a/x-pack/platform/plugins/shared/cases/server/plugin.test.ts b/x-pack/platform/plugins/shared/cases/server/plugin.test.ts index ac328f25de391..2ec89e323ab07 100644 --- a/x-pack/platform/plugins/shared/cases/server/plugin.test.ts +++ b/x-pack/platform/plugins/shared/cases/server/plugin.test.ts @@ -28,6 +28,7 @@ function getConfig(overrides = {}) { markdownPlugins: { lens: true }, files: { maxSize: 1, allowedMimeTypes: ALLOWED_MIME_TYPES }, stack: { enabled: true }, + analytics: {}, ...overrides, }; } @@ -74,6 +75,7 @@ describe('Cases Plugin', () => { security: securityMock.createStart(), notifications: notificationsMock.createStart(), ruleRegistry: { getRacClientWithRequest: jest.fn(), alerting: alertsMock.createStart() }, + taskManager: taskManagerMock.createStart(), }; }); diff --git a/x-pack/platform/plugins/shared/cases/server/plugin.ts b/x-pack/platform/plugins/shared/cases/server/plugin.ts index f7f00b3c49b6c..f642b77b47390 100644 --- a/x-pack/platform/plugins/shared/cases/server/plugin.ts +++ b/x-pack/platform/plugins/shared/cases/server/plugin.ts @@ -47,6 +47,11 @@ import { registerCaseFileKinds } from './files'; import type { ConfigType } from './config'; import { registerConnectorTypes } from './connectors'; import { registerSavedObjects } from './saved_object_types'; +import { + createCasesAnalyticsIndexes, + registerCasesAnalyticsIndexesTasks, + scheduleCasesAnalyticsSyncTasks, +} from './cases_analytics'; export class CasePlugin implements @@ -66,6 +71,7 @@ export class CasePlugin private persistableStateAttachmentTypeRegistry: PersistableStateAttachmentTypeRegistry; private externalReferenceAttachmentTypeRegistry: ExternalReferenceAttachmentTypeRegistry; private userProfileService: UserProfileService; + private readonly isServerless: boolean; constructor(private readonly initializerContext: PluginInitializerContext) { this.caseConfig = initializerContext.config.get(); @@ -75,9 +81,13 @@ export class CasePlugin this.persistableStateAttachmentTypeRegistry = new PersistableStateAttachmentTypeRegistry(); this.externalReferenceAttachmentTypeRegistry = new ExternalReferenceAttachmentTypeRegistry(); this.userProfileService = new UserProfileService(this.logger); + this.isServerless = initializerContext.env.packageInfo.buildFlavor === 'serverless'; } - public setup(core: CoreSetup, plugins: CasesServerSetupDependencies): CasesServerSetup { + public setup( + core: CoreSetup, + plugins: CasesServerSetupDependencies + ): CasesServerSetup { this.logger.debug( `Setting up Case Workflow with core contract [${Object.keys( core @@ -90,6 +100,11 @@ export class CasePlugin ); registerCaseFileKinds(this.caseConfig.files, plugins.files, core.security.fips.isEnabled()); + registerCasesAnalyticsIndexesTasks({ + taskManager: plugins.taskManager, + logger: this.logger, + core, + }); this.securityPluginSetup = plugins.security; this.lensEmbeddableFactory = plugins.lens.lensEmbeddableFactory; @@ -188,6 +203,15 @@ export class CasePlugin if (plugins.taskManager) { scheduleCasesTelemetryTask(plugins.taskManager, this.logger); + if (this.caseConfig.analytics.index?.enabled) { + scheduleCasesAnalyticsSyncTasks({ taskManager: plugins.taskManager, logger: this.logger }); + createCasesAnalyticsIndexes({ + esClient: core.elasticsearch.client.asInternalUser, + logger: this.logger, + isServerless: this.isServerless, + taskManager: plugins.taskManager, + }).catch(() => {}); // it shouldn't reject, but just in case + } } this.userProfileService.initialize({ diff --git a/x-pack/platform/plugins/shared/cases/server/types.ts b/x-pack/platform/plugins/shared/cases/server/types.ts index 8606808e1c183..45563fa39757f 100644 --- a/x-pack/platform/plugins/shared/cases/server/types.ts +++ b/x-pack/platform/plugins/shared/cases/server/types.ts @@ -44,7 +44,7 @@ export interface CasesServerSetupDependencies { files: FilesSetup; security: SecurityPluginSetup; licensing: LicensingPluginSetup; - taskManager?: TaskManagerSetupContract; + taskManager: TaskManagerSetupContract; usageCollection?: UsageCollectionSetup; spaces?: SpacesPluginSetup; cloud?: CloudSetup; @@ -55,7 +55,7 @@ export interface CasesServerStartDependencies { features: FeaturesPluginStart; files: FilesStart; licensing: LicensingPluginStart; - taskManager?: TaskManagerStartContract; + taskManager: TaskManagerStartContract; security: SecurityPluginStart; spaces?: SpacesPluginStart; notifications: NotificationsPluginStart; diff --git a/x-pack/platform/plugins/shared/cases/tsconfig.json b/x-pack/platform/plugins/shared/cases/tsconfig.json index 4486d3447ee74..1b7ab9b99d150 100644 --- a/x-pack/platform/plugins/shared/cases/tsconfig.json +++ b/x-pack/platform/plugins/shared/cases/tsconfig.json @@ -83,6 +83,8 @@ "@kbn/code-editor-mock", "@kbn/monaco", "@kbn/code-editor", + "@kbn/logging", + "@kbn/core-elasticsearch-client-server-mocks", "@kbn/core-test-helpers-model-versions", ], "exclude": [ diff --git a/x-pack/platform/plugins/shared/task_manager/server/mocks.ts b/x-pack/platform/plugins/shared/task_manager/server/mocks.ts index 756f672c658e2..b800cb2c9af61 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/mocks.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/mocks.ts @@ -30,7 +30,7 @@ const createStartMock = () => { schedule: jest.fn(), runSoon: jest.fn(), ephemeralRunNow: jest.fn(), - ensureScheduled: jest.fn(), + ensureScheduled: jest.fn().mockResolvedValue(Promise.resolve()), // it's a promise and there are some places where it's followed by `.catch()` removeIfExists: jest.fn().mockResolvedValue(Promise.resolve()), // it's a promise and there are some places where it's followed by `.catch()` supportsEphemeralTasks: jest.fn(), bulkUpdateSchedules: jest.fn(), diff --git a/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index 672bb99b3b76f..de230f81daed3 100644 --- a/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -144,6 +144,8 @@ export default function ({ getService }: FtrProviderContext) { 'alerts_invalidate_api_keys', 'apm-source-map-migration-task', 'apm-telemetry-task', + 'cai:cases_analytics_index_backfill', + 'cai:cases_analytics_index_synchronization', 'cases-telemetry-task', 'cloud_security_posture-stats_task', 'dashboard_telemetry',