diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d7f97d0646f7c..73a0347de7cd8 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -596,6 +596,7 @@ x-pack/solutions/security/plugins/lists @elastic/security-detection-engine x-pack/platform/plugins/shared/ai_infra/llm_tasks @elastic/appex-ai-infra examples/locator_examples @elastic/appex-sharedux examples/locator_explorer @elastic/appex-sharedux +packages/kbn-lock-manager @elastic/obs-ai-assistant src/platform/packages/shared/kbn-logging @elastic/kibana-core src/platform/packages/shared/kbn-logging-mocks @elastic/kibana-core x-pack/platform/plugins/shared/logs_data_access @elastic/obs-ux-logs-team diff --git a/package.json b/package.json index cd9cfbefd0df6..9293e499d82bb 100644 --- a/package.json +++ b/package.json @@ -632,6 +632,7 @@ "@kbn/llm-tasks-plugin": "link:x-pack/platform/plugins/shared/ai_infra/llm_tasks", "@kbn/locator-examples-plugin": "link:examples/locator_examples", "@kbn/locator-explorer-plugin": "link:examples/locator_explorer", + "@kbn/lock-manager": "link:packages/kbn-lock-manager", "@kbn/logging": "link:src/platform/packages/shared/kbn-logging", "@kbn/logging-mocks": "link:src/platform/packages/shared/kbn-logging-mocks", "@kbn/logs-data-access-plugin": "link:x-pack/platform/plugins/shared/logs_data_access", diff --git a/packages/kbn-lock-manager/README.md b/packages/kbn-lock-manager/README.md new file mode 100644 index 0000000000000..7d6e5886ccc6d --- /dev/null +++ b/packages/kbn-lock-manager/README.md @@ -0,0 +1,54 @@ +# Kibana Lock Manager + +A simple, distributed lock manager built on top of Elasticsearch. +Ensures that only one process at a time can hold a named lock, with automatic lease renewal and token fencing for safe release. + +# API Documentation + +## `withLock(lockId, callback, options)` + +Acquires a lock and executes the provided callback. 
If the lock is already held by another process, the method will throw a `LockAcquisitionError` and the callback will not be executed. When the callback returns, the lock is released. + +### Parameters + +- **`lockId`** (`string`): Unique identifier for the lock. + +- **`callback`** (`() => Promise<T>`): Asynchronous function to execute once the lock is acquired. This function will be executed only if the lock acquisition succeeds. + +- **`options`** (`object`, optional): Additional configuration options. + - **`metadata`** (`Record<string, any>`, optional): Custom metadata to store with the lock. + +## Example + +```ts +import { LockManagerService, LockAcquisitionError } from '@kbn/lock-manager'; + + +async function reIndexWithLock() { + // Attempt to acquire "my_lock"; if successful, runs the callback. + const lmService = new LockManagerService(coreSetup, logger); + return lmService.withLock('my_lock', async () => { + // …perform your exclusive operation here… + }); +} + +reIndexWithLock().catch((err) => { + if (err instanceof LockAcquisitionError) { + logger.debug('Re-index already in progress, skipping.'); + return; + } + logger.error(`Failed to re-index: ${err.message}`); +}); +``` + +## How It Works +**Atomic Acquire** +Performs one atomic Elasticsearch update that creates a new lock or renews an existing one - so if multiple processes race for the same lock, only one succeeds. + +**TTL-Based Lease** +Each lock has a short, fixed lifespan (default 30s) and will automatically expire if not renewed. While the callback is executing, the lock manager automatically extends the TTL to keep the lock active. This safeguards against deadlocks: if a Kibana node crashes after having obtained a lock, the lock will automatically be released after 30 seconds. + +Note: If a Kibana node crashes, another process could acquire the same lock and start that task again when the lock automatically expires. 
To prevent your operation from running multiple times, include an application-level check (for example, querying Elasticsearch or your own status flag) to verify the operation isn’t already in progress before proceeding. + +**Token Fencing** +Each lock operation carries a unique token. Only the process with the matching token can extend or release the lock, preventing stale holders from interfering. \ No newline at end of file diff --git a/packages/kbn-lock-manager/index.ts b/packages/kbn-lock-manager/index.ts new file mode 100644 index 0000000000000..1ce7a5cc9aa22 --- /dev/null +++ b/packages/kbn-lock-manager/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +export { LockAcquisitionError } from './src/lock_manager_client'; +export { LockManagerService } from './src/lock_manager_service'; diff --git a/packages/kbn-lock-manager/jest.config.js b/packages/kbn-lock-manager/jest.config.js new file mode 100644 index 0000000000000..8dabddd8b1618 --- /dev/null +++ b/packages/kbn-lock-manager/jest.config.js @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +module.exports = { + preset: '@kbn/test/jest_node', + rootDir: '../..', + roots: ['/packages/kbn-lock-manager'], +}; diff --git a/packages/kbn-lock-manager/kibana.jsonc b/packages/kbn-lock-manager/kibana.jsonc new file mode 100644 index 0000000000000..949af71daec55 --- /dev/null +++ b/packages/kbn-lock-manager/kibana.jsonc @@ -0,0 +1,6 @@ +{ + "type": "shared-server", + "id": "@kbn/lock-manager", + "owner": ["@elastic/obs-ai-assistant"], + "devOnly": false +} diff --git a/packages/kbn-lock-manager/package.json b/packages/kbn-lock-manager/package.json new file mode 100644 index 0000000000000..254b94959770a --- /dev/null +++ b/packages/kbn-lock-manager/package.json @@ -0,0 +1,6 @@ +{ + "name": "@kbn/lock-manager", + "private": true, + "version": "1.0.0", + "license": "Elastic License 2.0 OR AGPL-3.0-only OR SSPL-1.0" +} \ No newline at end of file diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts b/packages/kbn-lock-manager/src/lock_manager_client.ts similarity index 53% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts rename to packages/kbn-lock-manager/src/lock_manager_client.ts index 8fedfd81d978e..ebc0a8f2c7e7f 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts +++ b/packages/kbn-lock-manager/src/lock_manager_client.ts @@ -1,22 +1,21 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". */ // eslint-disable-next-line max-classes-per-file import { errors } from '@elastic/elasticsearch'; import { Logger } from '@kbn/logging'; import { v4 as uuid } from 'uuid'; -import pRetry from 'p-retry'; import prettyMilliseconds from 'pretty-ms'; import { once } from 'lodash'; import { duration } from 'moment'; import { ElasticsearchClient } from '@kbn/core/server'; - -export const LOCKS_INDEX_ALIAS = '.kibana_locks'; -export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`; +import { LOCKS_CONCRETE_INDEX_NAME, setupLockManagerIndex } from './setup_lock_manager_index'; export type LockId = string; export interface LockDocument { @@ -33,13 +32,18 @@ export interface AcquireOptions { */ metadata?: Record; /** - * Time to live (TTL) for the lock in milliseconds. Default is 5 minutes. + * Time to live (TTL) for the lock in milliseconds. Default is 30 seconds. 
* When a lock expires it can be acquired by another process */ ttl?: number; } -const createLocksWriteIndexOnce = once(createLocksWriteIndex); +// The index assets should only be set up once +// For testing purposes, we need to be able to set it up every time +let runSetupIndexAssetOnce = once(setupLockManagerIndex); +export function runSetupIndexAssetEveryTime() { + runSetupIndexAssetOnce = setupLockManagerIndex; +} export class LockManager { private token = uuid(); @@ -56,165 +60,168 @@ export class LockManager { */ public async acquire({ metadata = {}, - ttl = duration(5, 'minutes').asMilliseconds(), + ttl = duration(30, 'seconds').asMilliseconds(), }: AcquireOptions = {}): Promise { - await createLocksWriteIndexOnce(this.esClient); + let response: Awaited>; + + await runSetupIndexAssetOnce(this.esClient, this.logger); this.token = uuid(); - this.logger.debug( - `Acquiring lock "${this.lockId}" with ttl = ${prettyMilliseconds(ttl)} and token = ${ - this.token - }` - ); try { - const response = await this.esClient.update({ - index: LOCKS_CONCRETE_INDEX_NAME, - id: this.lockId, - scripted_upsert: true, - script: { - lang: 'painless', - source: ` + response = await this.esClient.update( + { + index: LOCKS_CONCRETE_INDEX_NAME, + id: this.lockId, + scripted_upsert: true, + script: { + lang: 'painless', + source: ` // Get the current time on the ES server. long now = System.currentTimeMillis(); - // If creating the document or if the lock is expired, update it. - if (ctx.op == 'create' || Instant.parse(ctx._source.expiresAt).toEpochMilli() < now) { + // If creating the document, or if the lock is expired, + // or if the current document is owned by the same token, then update it. 
+ if (ctx.op == 'create' || + Instant.parse(ctx._source.expiresAt).toEpochMilli() < now || + ctx._source.token == params.token) { def instantNow = Instant.ofEpochMilli(now); ctx._source.createdAt = instantNow.toString(); ctx._source.expiresAt = instantNow.plusMillis(params.ttl).toString(); } else { - ctx.op = 'noop' + ctx.op = 'noop'; } `, - params: { - ttl, + params: { + ttl, + token: this.token, + }, + }, + // @ts-expect-error + upsert: { + metadata, + token: this.token, }, }, - // @ts-expect-error - upsert: { - metadata, - token: this.token, - }, - refresh: true, - }); + { + retryOnTimeout: true, + maxRetries: 3, + } + ); + } catch (e) { + if (isVersionConflictException(e)) { + this.logger.debug(`Lock "${this.lockId}" already held (version conflict)`); + return false; + } + + this.logger.error(`Failed to acquire lock "${this.lockId}": ${e.message}`); + return false; + } - if (response.result === 'created') { + switch (response.result) { + case 'created': { this.logger.debug( - `Lock "${this.lockId}" acquired with ttl = ${prettyMilliseconds(ttl)} and token = ${ + `Lock "${this.lockId}" with token = ${ this.token - }` + } acquired with ttl = ${prettyMilliseconds(ttl)}` ); return true; - } else if (response.result === 'updated') { + } + case 'updated': { this.logger.debug( - ` Lock "${this.lockId}" was expired and re-acquired with ttl = ${prettyMilliseconds( + `Lock "${this.lockId}" was expired and re-acquired with ttl = ${prettyMilliseconds( ttl )} and token = ${this.token}` ); return true; - } else if (response.result === 'noop') { + } + case 'noop': { this.logger.debug( - `Lock "${this.lockId}" could not be acquired with token ${this.token} because it is already held` + `Lock "${this.lockId}" with token = ${this.token} could not be acquired. 
It is already held` ); return false; - } else { - throw new Error(`Unexpected response: ${response.result}`); } - } catch (e) { - if (isVersionConflictException(e)) { - this.logger.debug(`Lock "${this.lockId}" already held (version conflict)`); - return false; - } - - this.logger.error(`Failed to acquire lock "${this.lockId}": ${e.message}`); - this.logger.debug(e); - return false; } + + this.logger.warn(`Unexpected response: ${response.result}`); + return false; } /** * Releases the lock by deleting the document with the given lockId and token */ public async release(): Promise { + let response: Awaited>; try { - const response = await this.esClient.update({ - index: LOCKS_CONCRETE_INDEX_NAME, - id: this.lockId, - scripted_upsert: false, - script: { - lang: 'painless', - source: ` + response = await this.esClient.update( + { + index: LOCKS_CONCRETE_INDEX_NAME, + id: this.lockId, + scripted_upsert: false, + script: { + lang: 'painless', + source: ` if (ctx._source.token == params.token) { ctx.op = 'delete'; } else { ctx.op = 'noop'; } `, - params: { token: this.token }, + params: { token: this.token }, + }, }, - refresh: true, - }); - - if (response.result === 'deleted') { - this.logger.debug(`Lock "${this.lockId}" released with token ${this.token}.`); - return true; - } else if (response.result === 'noop') { - this.logger.debug( - `Lock "${this.lockId}" was not released. 
Token ${this.token} does not match.` - ); - return false; - } else { - throw new Error(`Unexpected response: ${response.result}`); - } + { + retryOnTimeout: true, + maxRetries: 3, + } + ); } catch (error: any) { - if ( - error instanceof errors.ResponseError && - error.body?.error?.type === 'document_missing_exception' - ) { + if (isDocumentMissingException(error)) { this.logger.debug(`Lock "${this.lockId}" already released.`); return false; } this.logger.error(`Failed to release lock "${this.lockId}": ${error.message}`); - this.logger.debug(error); - return false; + throw error; + } + + switch (response.result) { + case 'deleted': + this.logger.debug(`Lock "${this.lockId}" released with token ${this.token}.`); + return true; + case 'noop': + this.logger.debug( + `Lock "${this.lockId}" with token = ${this.token} could not be released. Token does not match.` + ); + return false; } + + this.logger.warn(`Unexpected response: ${response.result}`); + return false; } /** * Retrieves the lock document for a given lockId. - * If the lock is expired, it will be released. 
+ * If the lock is expired, it will not be returned */ public async get(): Promise { - const result = await this.esClient.search({ - index: LOCKS_CONCRETE_INDEX_NAME, - query: { - bool: { - filter: [{ term: { _id: this.lockId } }, { range: { expiresAt: { gt: 'now' } } }], - }, - }, - }); + const result = await this.esClient.get( + { index: LOCKS_CONCRETE_INDEX_NAME, id: this.lockId }, + { ignore: [404] } + ); - const hits = result.hits.hits; - return hits[0]?._source; - } + if (!result._source) { + return undefined; + } - public async acquireWithRetry({ - metadata, - ttl, - ...retryOptions - }: AcquireOptions & pRetry.Options = {}): Promise { - return pRetry(async () => { - const acquired = await this.acquire({ metadata, ttl }); - if (!acquired) { - this.logger.debug(`Lock "${this.lockId}" not available yet.`); - throw new Error(`Lock "${this.lockId}" not available yet`); - } - return acquired; - }, retryOptions ?? { forever: true, maxTimeout: 10_000 }); + const isExpired = new Date(result._source?.expiresAt).getTime() < Date.now(); + if (isExpired) { + return undefined; + } + + return result._source; } - public async extendTtl(ttl = 300000): Promise { + public async extendTtl(ttl: number): Promise { try { await this.esClient.update({ index: LOCKS_CONCRETE_INDEX_NAME, @@ -233,12 +240,11 @@ export class LockManager { token: this.token, }, }, - refresh: true, }); this.logger.debug(`Lock "${this.lockId}" extended ttl with ${prettyMilliseconds(ttl)}.`); return true; } catch (error) { - if (isVersionConflictException(error)) { + if (isVersionConflictException(error) || isDocumentMissingException(error)) { this.logger.debug(`Lock "${this.lockId}" was released concurrently. 
Not extending TTL.`); return false; } @@ -256,23 +262,16 @@ export async function withLock( logger, lockId, metadata, - ttl = duration(5, 'minutes').asMilliseconds(), - waitForLock = false, - retryOptions, + ttl = duration(30, 'seconds').asMilliseconds(), }: { esClient: ElasticsearchClient; logger: Logger; lockId: LockId; - waitForLock?: boolean; - retryOptions?: pRetry.Options; } & AcquireOptions, callback: () => Promise -): Promise { +): Promise { const lockManager = new LockManager(lockId, esClient, logger); - const acquired = - waitForLock ?? retryOptions - ? await lockManager.acquireWithRetry({ metadata, ttl, ...retryOptions }) - : await lockManager.acquire({ metadata, ttl }); + const acquired = await lockManager.acquire({ metadata, ttl }); if (!acquired) { logger.debug(`Lock "${lockId}" not acquired. Exiting.`); @@ -280,7 +279,7 @@ export async function withLock( } // extend the ttl periodically - const extendInterval = Math.floor(ttl / 2); + const extendInterval = Math.floor(ttl / 4); logger.debug( `Lock "${lockId}" acquired. Extending TTL every ${prettyMilliseconds(extendInterval)}` ); @@ -289,7 +288,7 @@ export async function withLock( const intervalId = setInterval(() => { // wait for the previous extendTtl request to finish before sending the next one. This is to avoid flooding ES with extendTtl requests in cases where ES is slow to respond. 
extendTTlPromise = extendTTlPromise - .then(() => lockManager.extendTtl()) + .then(() => lockManager.extendTtl(ttl)) .catch((err) => { logger.error(`Failed to extend lock "${lockId}":`, err); return false; @@ -305,55 +304,21 @@ export async function withLock( await lockManager.release(); } catch (error) { logger.error(`Failed to release lock "${lockId}" in withLock: ${error.message}`); + logger.debug(error); } } } -export async function ensureTemplatesAndIndexCreated(esClient: ElasticsearchClient): Promise { - const COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`; - const INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`; - const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`; - - await esClient.cluster.putComponentTemplate({ - name: COMPONENT_TEMPLATE_NAME, - template: { - mappings: { - dynamic: false, - properties: { - token: { type: 'keyword' }, - metadata: { enabled: false }, - createdAt: { type: 'date' }, - expiresAt: { type: 'date' }, - }, - }, - }, - }); - - await esClient.indices.putIndexTemplate({ - name: INDEX_TEMPLATE_NAME, - index_patterns: [INDEX_PATTERN], - composed_of: [COMPONENT_TEMPLATE_NAME], - priority: 500, - template: { - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - hidden: true, - }, - }, - }); -} - -async function createLocksWriteIndex(esClient: ElasticsearchClient): Promise { - await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] }); -} - function isVersionConflictException(e: Error): boolean { return ( e instanceof errors.ResponseError && e.body?.error?.type === 'version_conflict_engine_exception' ); } +function isDocumentMissingException(e: Error): boolean { + return e instanceof errors.ResponseError && e.body?.error?.type === 'document_missing_exception'; +} + export class LockAcquisitionError extends Error { constructor(message: string) { super(message); diff --git 
a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts b/packages/kbn-lock-manager/src/lock_manager_service.ts similarity index 69% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts rename to packages/kbn-lock-manager/src/lock_manager_service.ts index a293083e4361a..b7c03a43fd47c 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts +++ b/packages/kbn-lock-manager/src/lock_manager_service.ts @@ -1,8 +1,10 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
*/ import { CoreSetup, Logger } from '@kbn/core/server'; @@ -27,16 +29,14 @@ export class LockManagerService { callback: () => Promise, { metadata, - waitForLock, }: { metadata?: Record; - waitForLock?: boolean; } = {} ) { const [coreStart] = await this.coreSetup.getStartServices(); const esClient = coreStart.elasticsearch.client.asInternalUser; const logger = this.logger.get('LockManager'); - return withLock({ esClient, logger, lockId, metadata, waitForLock }, callback); + return withLock({ esClient, logger, lockId, metadata }, callback); } } diff --git a/packages/kbn-lock-manager/src/setup_lock_manager_index.ts b/packages/kbn-lock-manager/src/setup_lock_manager_index.ts new file mode 100644 index 0000000000000..626f5d9ec1546 --- /dev/null +++ b/packages/kbn-lock-manager/src/setup_lock_manager_index.ts @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +import { errors } from '@elastic/elasticsearch'; +import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { IndicesGetMappingResponse } from '@elastic/elasticsearch/lib/api/types'; + +const LOCKS_INDEX_ALIAS = '.kibana_locks'; +const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`; +export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`; +export const LOCKS_COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`; +export const LOCKS_INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`; + +export async function removeLockIndexWithIncorrectMappings( + esClient: ElasticsearchClient, + logger: Logger +) { + let res: IndicesGetMappingResponse; + try { + res = await esClient.indices.getMapping({ index: LOCKS_CONCRETE_INDEX_NAME }); + } catch (error) { + const isNotFoundError = error instanceof errors.ResponseError && error.statusCode === 404; + if (!isNotFoundError) { + logger.error( + `Failed to get mapping for lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}` + ); + } + + return; + } + + const { mappings } = res[LOCKS_CONCRETE_INDEX_NAME]; + const hasIncorrectMappings = + mappings.properties?.token?.type !== 'keyword' || + mappings.properties?.expiresAt?.type !== 'date'; + + if (hasIncorrectMappings) { + logger.warn(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" has incorrect mappings.`); + try { + await esClient.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME }); + logger.info(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" removed successfully.`); + } catch (error) { + logger.error(`Failed to remove lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}`); + } + } +} + +export async function ensureTemplatesAndIndexCreated( + esClient: ElasticsearchClient, + logger: Logger +): Promise { + await esClient.cluster.putComponentTemplate({ + name: LOCKS_COMPONENT_TEMPLATE_NAME, + template: { + mappings: { + dynamic: false, + properties: { + token: { type: 'keyword' }, + metadata: { enabled: false }, + createdAt: { type: 
'date' }, + expiresAt: { type: 'date' }, + }, + }, + }, + }); + logger.info( + `Component template ${LOCKS_COMPONENT_TEMPLATE_NAME} created or updated successfully.` + ); + + await esClient.indices.putIndexTemplate({ + name: LOCKS_INDEX_TEMPLATE_NAME, + index_patterns: [INDEX_PATTERN], + composed_of: [LOCKS_COMPONENT_TEMPLATE_NAME], + priority: 500, + template: { + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + hidden: true, + }, + }, + }); + logger.info(`Index template ${LOCKS_INDEX_TEMPLATE_NAME} created or updated successfully.`); + + try { + await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }); + logger.info(`Index ${LOCKS_CONCRETE_INDEX_NAME} created successfully.`); + } catch (error) { + const isIndexAlreadyExistsError = + error instanceof errors.ResponseError && + error.body.error.type === 'resource_already_exists_exception'; + + if (isIndexAlreadyExistsError) { + logger.debug(`Index ${LOCKS_CONCRETE_INDEX_NAME} already exists. Skipping creation.`); + return; + } + + logger.error(`Unable to create index ${LOCKS_CONCRETE_INDEX_NAME}: ${error.message}`); + throw error; + } +} + +export async function setupLockManagerIndex(esClient: ElasticsearchClient, logger: Logger) { + await removeLockIndexWithIncorrectMappings(esClient, logger); // TODO: should be removed in the future (after 9.1). 
See https://github.com/elastic/kibana/issues/218944 + await ensureTemplatesAndIndexCreated(esClient, logger); +} diff --git a/packages/kbn-lock-manager/tsconfig.json b/packages/kbn-lock-manager/tsconfig.json new file mode 100644 index 0000000000000..b854afaadbd5b --- /dev/null +++ b/packages/kbn-lock-manager/tsconfig.json @@ -0,0 +1,20 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "target/types", + "types": [ + "jest", + "node" + ] + }, + "include": [ + "**/*.ts" + ], + "exclude": [ + "target/**/*" + ], + "kbn_references": [ + "@kbn/logging", + "@kbn/core", + ] +} diff --git a/tsconfig.base.json b/tsconfig.base.json index 5257f2804863c..308138590dfba 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1186,6 +1186,8 @@ "@kbn/locator-examples-plugin/*": ["examples/locator_examples/*"], "@kbn/locator-explorer-plugin": ["examples/locator_explorer"], "@kbn/locator-explorer-plugin/*": ["examples/locator_explorer/*"], + "@kbn/lock-manager": ["packages/kbn-lock-manager"], + "@kbn/lock-manager/*": ["packages/kbn-lock-manager/*"], "@kbn/logging": ["src/platform/packages/shared/kbn-logging"], "@kbn/logging/*": ["src/platform/packages/shared/kbn-logging/*"], "@kbn/logging-mocks": ["src/platform/packages/shared/kbn-logging-mocks"], diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts index 54ef970bd2dcc..8e96245d2a097 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts @@ -31,8 +31,8 @@ import { registerFunctions } from './functions'; import { recallRankingEvent } from './analytics/recall_ranking'; import { initLangtrace } from './service/client/instrumentation/init_langtrace'; import { aiAssistantCapabilities } from '../common/capabilities'; -import { registerAndScheduleKbSemanticTextMigrationTask } from 
'./service/task_manager_definitions/register_kb_semantic_text_migration_task'; -import { updateExistingIndexAssets } from './service/create_or_update_index_assets'; +import { populateMissingSemanticTextFieldMigration } from './service/startup_migrations/populate_missing_semantic_text_field_migration'; +import { updateExistingIndexAssets } from './service/startup_migrations/create_or_update_index_assets'; export class ObservabilityAIAssistantPlugin implements @@ -128,23 +128,19 @@ export class ObservabilityAIAssistantPlugin })); // Update existing index assets (mappings, templates, etc). This will not create assets if they do not exist. - const indexAssetsUpdatedPromise = updateExistingIndexAssets({ - logger: this.logger.get('index_assets'), - core, - }).catch((e) => this.logger.error(`Index assets could not be updated: ${e.message}`)); - - // register task to migrate knowledge base entries to include semantic_text field - registerAndScheduleKbSemanticTextMigrationTask({ - core, - taskManager: plugins.taskManager, - logger: this.logger.get('kb_semantic_text_migration_task'), - config: this.config, - indexAssetsUpdatedPromise, - }).catch((e) => - this.logger.error( - `Knowledge base semantic_text migration task could not be registered: ${e.message}` + updateExistingIndexAssets({ logger: this.logger, core }) + .then(() => + populateMissingSemanticTextFieldMigration({ + core, + logger: this.logger, + config: this.config, + }) ) - ); + .catch((e) => + this.logger.error( + `Error during knowledge base migration in AI Assistant plugin startup: ${e.message}` + ) + ); service.register(registerFunctions); diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts index e9c7078c0062e..9957a68178ab3 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts +++ 
b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts @@ -100,8 +100,23 @@ const resetKnowledgeBase = createObservabilityAIAssistantServerRoute({ }, }); +const reIndexKnowledgeBase = createObservabilityAIAssistantServerRoute({ + endpoint: 'POST /internal/observability_ai_assistant/kb/reindex', + security: { + authz: { + requiredPrivileges: ['ai_assistant'], + }, + }, + handler: async (resources): Promise<{ result: boolean }> => { + const client = await resources.service.getClient({ request: resources.request }); + const result = await client.reIndexKnowledgeBaseWithLock(); + return { result }; + }, +}); + const semanticTextMigrationKnowledgeBase = createObservabilityAIAssistantServerRoute({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: + 'POST /internal/observability_ai_assistant/kb/migrations/populate_missing_semantic_text_field', security: { authz: { requiredPrivileges: ['ai_assistant'], @@ -320,6 +335,7 @@ const importKnowledgeBaseEntries = createObservabilityAIAssistantServerRoute({ }); export const knowledgeBaseRoutes = { + ...reIndexKnowledgeBase, ...semanticTextMigrationKnowledgeBase, ...setupKnowledgeBase, ...resetKnowledgeBase, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts index dc817f378263f..b56b2e1f07bd2 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts @@ -5,7 +5,7 @@ * 2.0. 
*/ -import { createOrUpdateIndexAssets } from '../../service/create_or_update_index_assets'; +import { createOrUpdateIndexAssets } from '../../service/startup_migrations/create_or_update_index_assets'; import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route'; const createOrUpdateIndexAssetsRoute = createObservabilityAIAssistantServerRoute({ diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts index c3698c55396cd..10447b8beb0f4 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts @@ -67,14 +67,12 @@ import { continueConversation } from './operators/continue_conversation'; import { convertInferenceEventsToStreamingEvents } from './operators/convert_inference_events_to_streaming_events'; import { extractMessages } from './operators/extract_messages'; import { getGeneratedTitle } from './operators/get_generated_title'; -import { - reIndexKnowledgeBaseAndPopulateSemanticTextField, - scheduleKbSemanticTextMigrationTask, -} from '../task_manager_definitions/register_kb_semantic_text_migration_task'; +import { populateMissingSemanticTextFieldMigration } from '../startup_migrations/populate_missing_semantic_text_field_migration'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantConfig } from '../../config'; import { getElserModelId } from '../knowledge_base_service/get_elser_model_id'; import { apmInstrumentation } from './operators/apm_instrumentation'; +import { reIndexKnowledgeBaseWithLock } from '../knowledge_base_service/reindex_knowledge_base'; const MAX_FUNCTION_CALLS = 8; @@ -684,14 +682,15 @@ export class ObservabilityAIAssistantClient { // setup the knowledge base const 
res = await knowledgeBaseService.setup(esClient, modelId); - core - .getStartServices() - .then(([_, pluginsStart]) => - scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger }) - ) - .catch((error) => { - logger.error(`Failed to schedule semantic text migration task: ${error}`); - }); + populateMissingSemanticTextFieldMigration({ + core, + logger, + config: this.dependencies.config, + }).catch((e) => { + this.dependencies.logger.error( + `Failed to populate missing semantic text fields: ${e.message}` + ); + }); return res; }; @@ -701,14 +700,21 @@ export class ObservabilityAIAssistantClient { return this.dependencies.knowledgeBaseService.reset(esClient); }; - reIndexKnowledgeBaseAndPopulateSemanticTextField = () => { - return reIndexKnowledgeBaseAndPopulateSemanticTextField({ + reIndexKnowledgeBaseWithLock = () => { + return reIndexKnowledgeBaseWithLock({ + core: this.dependencies.core, esClient: this.dependencies.esClient, logger: this.dependencies.logger, - config: this.dependencies.config, }); }; + reIndexKnowledgeBaseAndPopulateSemanticTextField = () => { + return populateMissingSemanticTextFieldMigration({ + core: this.dependencies.core, + logger: this.dependencies.logger, + config: this.dependencies.config, + }); + }; addUserInstruction = async ({ entry, }: { diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts index adc7ea2822747..62bf0ffb6c4e2 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts @@ -17,7 +17,7 @@ import { ObservabilityAIAssistantClient } from './client'; import { KnowledgeBaseService } from './knowledge_base_service'; import type { RegistrationCallback, RespondFunctionResources } from './types'; import { ObservabilityAIAssistantConfig } from '../config'; 
-import { createOrUpdateIndexAssets } from './create_or_update_index_assets'; +import { createOrUpdateIndexAssets } from './startup_migrations/create_or_update_index_assets'; function getResourceName(resource: string) { return `.kibana-observability-ai-assistant-${resource}`; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts index e6f9f38a202f1..4915570346b34 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts @@ -9,6 +9,11 @@ import { errors } from '@elastic/elasticsearch'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import moment from 'moment'; +import pRetry from 'p-retry'; +import { + InferenceInferenceEndpointInfo, + MlGetTrainedModelsStatsResponse, +} from '@elastic/elasticsearch/lib/api/types'; import { ObservabilityAIAssistantConfig } from '../config'; export const AI_ASSISTANT_KB_INFERENCE_ID = 'obs_ai_assistant_kb_inference'; @@ -77,9 +82,11 @@ export async function getInferenceEndpoint({ inference_id: AI_ASSISTANT_KB_INFERENCE_ID, }); - if (response.endpoints.length > 0) { - return response.endpoints[0]; + if (response.endpoints.length === 0) { + throw new Error('Inference endpoint not found'); } + + return response.endpoints[0]; } export function isInferenceEndpointMissingOrUnavailable(error: Error) { @@ -90,7 +97,7 @@ export function isInferenceEndpointMissingOrUnavailable(error: Error) { ); } -export async function getElserModelStatus({ +export async function getKbModelStatus({ esClient, logger, config, @@ -99,40 +106,34 @@ export async function getElserModelStatus({ logger: Logger; config: ObservabilityAIAssistantConfig; }) { - let errorMessage = ''; - const endpoint = await 
getInferenceEndpoint({ - esClient, - }).catch((error) => { + const enabled = config.enableKnowledgeBase; + + let endpoint: InferenceInferenceEndpointInfo; + try { + endpoint = await getInferenceEndpoint({ esClient }); + } catch (error) { if (!isInferenceEndpointMissingOrUnavailable(error)) { throw error; } - errorMessage = error.message; - }); - - const enabled = config.enableKnowledgeBase; - if (!endpoint) { - return { ready: false, enabled, errorMessage }; + return { ready: false, enabled, errorMessage: error.message }; } - const modelId = endpoint.service_settings?.model_id; - const modelStats = await esClient.asInternalUser.ml - .getTrainedModelsStats({ model_id: modelId }) - .catch((error) => { - logger.debug(`Failed to get model stats: ${error.message}`); - errorMessage = error.message; + let trainedModelStatsResponse: MlGetTrainedModelsStatsResponse; + try { + trainedModelStatsResponse = await esClient.asInternalUser.ml.getTrainedModelsStats({ + model_id: endpoint.service_settings?.model_id, }); - - if (!modelStats) { - return { ready: false, enabled, errorMessage }; + } catch (error) { + logger.debug(`Failed to get model stats: ${error.message}`); + return { ready: false, enabled, errorMessage: error.message }; } - const elserModelStats = modelStats.trained_model_stats.find( + const modelStats = trainedModelStatsResponse.trained_model_stats.find( (stats) => stats.deployment_stats?.deployment_id === AI_ASSISTANT_KB_INFERENCE_ID ); - const deploymentState = elserModelStats?.deployment_stats?.state; - const allocationState = elserModelStats?.deployment_stats?.allocation_status?.state; - const allocationCount = - elserModelStats?.deployment_stats?.allocation_status?.allocation_count ?? 0; + const deploymentState = modelStats?.deployment_stats?.state; + const allocationState = modelStats?.deployment_stats?.allocation_status?.state; + const allocationCount = modelStats?.deployment_stats?.allocation_status?.allocation_count ?? 
0; const ready = deploymentState === 'started' && allocationState === 'fully_allocated' && allocationCount > 0; @@ -147,3 +148,24 @@ export async function getElserModelStatus({ }, }; } + +export async function waitForKbModel({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + return pRetry( + async () => { + const { ready } = await getKbModelStatus({ esClient, logger, config }); + if (!ready) { + logger.debug('Knowledge base model is not yet ready. Retrying...'); + throw new Error('Knowledge base model is not yet ready'); + } + }, + { retries: 30, factor: 2, maxTimeout: 30_000 } + ); +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts index c886b5187ab95..d666ee7aec832 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts @@ -10,6 +10,7 @@ import type { CoreSetup, ElasticsearchClient, IUiSettingsClient } from '@kbn/cor import type { Logger } from '@kbn/logging'; import { orderBy } from 'lodash'; import { encode } from 'gpt-tokenizer'; +import { LockAcquisitionError } from '@kbn/lock-manager'; import { resourceNames } from '..'; import { Instruction, @@ -23,7 +24,7 @@ import { getSpaceQuery } from '../util/get_space_query'; import { createInferenceEndpoint, deleteInferenceEndpoint, - getElserModelStatus, + getKbModelStatus, isInferenceEndpointMissingOrUnavailable, } from '../inference_endpoint'; import { recallFromSearchConnectors } from './recall_from_search_connectors'; @@ -32,8 +33,8 @@ import { ObservabilityAIAssistantConfig } from '../../config'; import { isKnowledgeBaseIndexWriteBlocked, 
isSemanticTextUnsupportedError, + reIndexKnowledgeBaseWithLock, } from './reindex_knowledge_base'; -import { scheduleKbSemanticTextMigrationTask } from '../task_manager_definitions/register_kb_semantic_text_migration_task'; interface Dependencies { core: CoreSetup; @@ -443,23 +444,20 @@ export class KnowledgeBaseService { } if (isSemanticTextUnsupportedError(error)) { - this.dependencies.core - .getStartServices() - .then(([_, pluginsStart]) => { - return scheduleKbSemanticTextMigrationTask({ - taskManager: pluginsStart.taskManager, - logger: this.dependencies.logger, - runSoon: true, - }); - }) - .catch((e) => { - this.dependencies.logger.error( - `Failed to schedule knowledge base semantic text migration task: ${e}` - ); - }); + reIndexKnowledgeBaseWithLock({ + core: this.dependencies.core, + logger: this.dependencies.logger, + esClient: this.dependencies.esClient, + }).catch((e) => { + if (e instanceof LockAcquisitionError) { + this.dependencies.logger.debug(`Re-indexing operation is already in progress`); + return; + } + this.dependencies.logger.error(`Failed to re-index knowledge base: ${e.message}`); + }); throw serverUnavailable( - 'The knowledge base is currently being re-indexed. Please try again later' + `The index "${resourceNames.aliases.kb}" does not support semantic text and must be reindexed. This re-index operation has been scheduled and will be started automatically.
Please try again later.` ); } @@ -491,7 +489,7 @@ export class KnowledgeBaseService { }; getStatus = async () => { - return getElserModelStatus({ + return getKbModelStatus({ esClient: this.dependencies.esClient, logger: this.dependencies.logger, config: this.dependencies.config, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts index ea8dd58517f18..776f123e70f34 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts @@ -8,44 +8,45 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; +import { CoreSetup } from '@kbn/core/server'; +import { LockManagerService } from '@kbn/lock-manager'; import { resourceNames } from '..'; -import { createKbConcreteIndex } from '../create_or_update_index_assets'; +import { createKbConcreteIndex } from '../startup_migrations/create_or_update_index_assets'; +import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; -export async function reIndexKnowledgeBase({ +export const KB_REINDEXING_LOCK_ID = 'observability_ai_assistant:kb_reindexing'; +export async function reIndexKnowledgeBaseWithLock({ + core, logger, esClient, }: { + core: CoreSetup; logger: Logger; esClient: { asInternalUser: ElasticsearchClient; }; -}): Promise { +}): Promise { + const lmService = new LockManagerService(core, logger); + return lmService.withLock(KB_REINDEXING_LOCK_ID, () => + reIndexKnowledgeBase({ logger, esClient }) + ); +} + +async function reIndexKnowledgeBase({ + logger, + esClient, +}: { + logger: Logger; 
+ esClient: { + asInternalUser: ElasticsearchClient; + }; +}): Promise { logger.debug('Initiating knowledge base re-indexing...'); try { const originalIndex = resourceNames.concreteIndexName.kb; const tempIndex = `${resourceNames.aliases.kb}-000002`; - const indexSettingsResponse = await esClient.asInternalUser.indices.getSettings({ - index: originalIndex, - }); - - const indexSettings = indexSettingsResponse[originalIndex].settings; - const createdVersion = parseInt(indexSettings?.index?.version?.created ?? '', 10); - - // Check if the index was created before version 8.11 - const versionThreshold = 8110000; // Version 8.11.0 - if (createdVersion >= versionThreshold) { - logger.debug( - `Knowledge base index "${originalIndex}" was created in version ${createdVersion}, and does not require re-indexing. Semantic text field is already supported. Aborting` - ); - return; - } - - logger.info( - `Knowledge base index was created in ${createdVersion} and must be re-indexed in order to support semantic_text field. Re-indexing now...` - ); - // Create temporary index logger.debug(`Creating temporary index "${tempIndex}"...`); await esClient.asInternalUser.indices.delete({ index: tempIndex }, { ignore: [404] }); @@ -82,11 +83,10 @@ export async function reIndexKnowledgeBase({ logger.debug(`Deleting temporary index "${tempIndex}"...`); await esClient.asInternalUser.indices.delete({ index: tempIndex }); - logger.info( - 'Re-indexing knowledge base completed successfully. Semantic text field is now supported.' 
- ); + logger.info('Re-indexing knowledge base completed successfully'); + return true; } catch (error) { - throw new Error(`Failed to reindex knowledge base: ${error.message}`); + throw new Error(`Failed to re-index knowledge base: ${error.message}`); } } diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/create_or_update_index_assets.ts similarity index 95% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts rename to x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/create_or_update_index_assets.ts index 85a0e9ba8e42a..f416dd1d0292a 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/create_or_update_index_assets.ts @@ -7,10 +7,10 @@ import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server'; import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server'; -import type { ObservabilityAIAssistantPluginStartDependencies } from '../types'; -import { conversationComponentTemplate } from './conversation_component_template'; -import { kbComponentTemplate } from './kb_component_template'; -import { resourceNames } from '.'; +import type { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { conversationComponentTemplate } from '../conversation_component_template'; +import { kbComponentTemplate } from '../kb_component_template'; +import { resourceNames } from '..'; export async function updateExistingIndexAssets({ logger, diff --git 
a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts new file mode 100644 index 0000000000000..70bfbbe6a9c6a --- /dev/null +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import pLimit from 'p-limit'; +import type { CoreSetup, Logger } from '@kbn/core/server'; +import { uniq } from 'lodash'; +import pRetry from 'p-retry'; +import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; +import { KnowledgeBaseEntry } from '../../../common'; +import { resourceNames } from '..'; +import { waitForKbModel } from '../inference_endpoint'; +import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { ObservabilityAIAssistantConfig } from '../../config'; +import { reIndexKnowledgeBaseWithLock } from '../knowledge_base_service/reindex_knowledge_base'; + +const PLUGIN_STARTUP_LOCK_ID = 'observability_ai_assistant:startup_migrations'; + +// This function populates the `semantic_text` field for knowledge base entries during the plugin's startup process. +// It ensures all missing fields are updated in batches and uses a distributed lock to prevent conflicts in distributed environments. +// If the knowledge base index does not support the `semantic_text` field, it is re-indexed. 
+export async function populateMissingSemanticTextFieldMigration({ + core, + logger, + config, +}: { + core: CoreSetup; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + const [coreStart] = await core.getStartServices(); + const esClient = coreStart.elasticsearch.client; + + const lmService = new LockManagerService(core, logger); + await lmService + .withLock(PLUGIN_STARTUP_LOCK_ID, async () => { + const hasKbIndex = await esClient.asInternalUser.indices.exists({ + index: resourceNames.aliases.kb, + }); + + if (!hasKbIndex) { + logger.debug('Knowledge base index does not exist. Aborting updating index assets'); + return; + } + + const isKbSemanticTextCompatible = await isKnowledgeBaseSemanticTextCompatible({ + logger, + esClient, + }); + + if (!isKbSemanticTextCompatible) { + await reIndexKnowledgeBaseWithLock({ core, logger, esClient }); + } + + await pRetry( + async () => populateMissingSemanticTextFieldRecursively({ esClient, logger, config }), + { retries: 5, minTimeout: 10_000 } + ); + }) + .catch((error) => { + if (!(error instanceof LockAcquisitionError)) { + throw error; + } + }); +} + +// Ensures that every doc has populated the `semantic_text` field. +// It retrieves entries without the field, updates them in batches, and continues until no entries remain. 
+async function populateMissingSemanticTextFieldRecursively({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + logger.debug( + 'Checking for remaining entries without semantic_text field that need to be migrated' + ); + + const response = await esClient.asInternalUser.search({ + size: 100, + track_total_hits: true, + index: [resourceNames.aliases.kb], + query: { + bool: { + must_not: { + exists: { + field: 'semantic_text', + }, + }, + }, + }, + _source: { + excludes: ['ml.tokens'], + }, + }); + + if (response.hits.hits.length === 0) { + logger.debug('No remaining entries to migrate'); + return; + } + + await waitForKbModel({ esClient, logger, config }); + + const indicesWithOutdatedEntries = uniq(response.hits.hits.map((hit) => hit._index)); + logger.debug( + `Found ${response.hits.hits.length} entries without semantic_text field in "${indicesWithOutdatedEntries}". Updating now...` + ); + + // Limit the number of concurrent requests to avoid overloading the cluster + const limiter = pLimit(20); + const promises = response.hits.hits.map((hit) => { + return limiter(() => { + if (!hit._source || !hit._id) { + return; + } + + return esClient.asInternalUser.update({ + refresh: 'wait_for', + index: resourceNames.aliases.kb, + id: hit._id, + doc: { + ...hit._source, + semantic_text: hit._source.text ?? 'No text', + }, + }); + }); + }); + + await Promise.all(promises); + logger.debug(`Updated ${promises.length} entries`); + + await sleep(100); + await populateMissingSemanticTextFieldRecursively({ esClient, logger, config }); +} + +async function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// Checks if the knowledge base index supports `semantic_text` +// If the index was created before version 8.11, it requires re-indexing to support the `semantic_text` field. 
+async function isKnowledgeBaseSemanticTextCompatible({ + logger, + esClient, +}: { + logger: Logger; + esClient: { asInternalUser: ElasticsearchClient }; +}): Promise { + const indexSettingsResponse = await esClient.asInternalUser.indices.getSettings({ + index: resourceNames.aliases.kb, + }); + + const results = Object.entries(indexSettingsResponse); + if (results.length === 0) { + logger.debug('No knowledge base indices found. Skipping re-indexing.'); + return true; + } + + const [indexName, { settings }] = results[0]; + const createdVersion = parseInt(settings?.index?.version?.created ?? '', 10); + + // Check if the index was created before version 8.11 + const versionThreshold = 8110000; // Version 8.11.0 + if (createdVersion >= versionThreshold) { + logger.debug( + `Knowledge base index "${indexName}" was created in version ${createdVersion}, and does not require re-indexing. Semantic text field is already supported. Aborting` + ); + return true; + } + + logger.info( + `Knowledge base index was created in ${createdVersion} and must be re-indexed in order to support semantic_text field. Re-indexing now...` + ); + + return false; +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts deleted file mode 100644 index 4f83e61a67a4d..0000000000000 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ - -import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import pLimit from 'p-limit'; -import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server'; -import type { CoreSetup, CoreStart, Logger } from '@kbn/core/server'; -import pRetry from 'p-retry'; -import { KnowledgeBaseEntry } from '../../../common'; -import { resourceNames } from '..'; -import { getElserModelStatus } from '../inference_endpoint'; -import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; -import { ObservabilityAIAssistantConfig } from '../../config'; -import { reIndexKnowledgeBase } from '../knowledge_base_service/reindex_knowledge_base'; - -const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id'; -const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration'; - -// This task will re-index all knowledge base entries without `semantic_text` field -// to ensure the field is populated with the correct embeddings. -// After the migration we will no longer need to use the `ml.tokens` field. 
-export async function registerAndScheduleKbSemanticTextMigrationTask({ - taskManager, - logger, - core, - config, - indexAssetsUpdatedPromise, -}: { - taskManager: TaskManagerSetupContract; - logger: Logger; - core: CoreSetup; - config: ObservabilityAIAssistantConfig; - indexAssetsUpdatedPromise: Promise; -}) { - const [coreStart, pluginsStart] = await core.getStartServices(); - - // register task - registerKbSemanticTextMigrationTask({ taskManager, logger, coreStart, config }); - - // wait for index assets to be updated - await indexAssetsUpdatedPromise; - - // schedule task - await scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger }); -} - -function registerKbSemanticTextMigrationTask({ - taskManager, - logger, - coreStart, - config, -}: { - taskManager: TaskManagerSetupContract; - logger: Logger; - coreStart: CoreStart; - config: ObservabilityAIAssistantConfig; -}) { - try { - logger.debug(`Register task "${TASK_TYPE}"`); - taskManager.registerTaskDefinitions({ - [TASK_TYPE]: { - title: 'Add support for semantic_text in Knowledge Base', - description: `This task will reindex the knowledge base and populate the semantic_text fields for all entries without it.`, - timeout: '1h', - maxAttempts: 5, - createTaskRunner() { - return { - async run() { - logger.debug(`Run task: "${TASK_TYPE}"`); - const esClient = coreStart.elasticsearch.client; - - const hasKbIndex = await esClient.asInternalUser.indices.exists({ - index: resourceNames.aliases.kb, - }); - - if (!hasKbIndex) { - logger.debug('Knowledge base index does not exist. Skipping migration.'); - return; - } - - if (config.disableKbSemanticTextMigration) { - logger.info( - 'Semantic text migration is disabled via config "xpack.observabilityAIAssistant.disableKbSemanticTextMigration=true". Skipping migration.' 
- ); - return; - } - - await reIndexKnowledgeBaseAndPopulateSemanticTextField({ esClient, logger, config }); - }, - }; - }, - }, - }); - } catch (error) { - logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`); - } -} - -export async function scheduleKbSemanticTextMigrationTask({ - taskManager, - logger, - runSoon = false, -}: { - taskManager: ObservabilityAIAssistantPluginStartDependencies['taskManager']; - logger: Logger; - runSoon?: boolean; -}) { - logger.debug('Schedule migration task'); - await taskManager.ensureScheduled({ - id: TASK_ID, - taskType: TASK_TYPE, - scope: ['aiAssistant'], - params: {}, - state: {}, - }); - - if (runSoon) { - logger.debug('Run migration task soon'); - await taskManager.runSoon(TASK_ID); - } -} - -export async function reIndexKnowledgeBaseAndPopulateSemanticTextField({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - logger.debug('Starting migration...'); - - try { - await reIndexKnowledgeBase({ logger, esClient }); - await populateSemanticTextFieldRecursively({ esClient, logger, config }); - } catch (e) { - logger.error(`Migration failed: ${e.message}`); - } - - logger.debug('Migration succeeded'); -} - -async function populateSemanticTextFieldRecursively({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - logger.debug('Populating semantic_text field for entries without it'); - - const response = await esClient.asInternalUser.search({ - size: 100, - track_total_hits: true, - index: [resourceNames.aliases.kb], - query: { - bool: { - must_not: { - exists: { - field: 'semantic_text', - }, - }, - }, - }, - _source: { - excludes: ['ml.tokens'], - }, - }); - - if (response.hits.hits.length === 0) { - logger.debug('No remaining entries to migrate'); - return; - } - - logger.debug(`Found 
${response.hits.hits.length} entries to migrate`); - - await waitForModel({ esClient, logger, config }); - - // Limit the number of concurrent requests to avoid overloading the cluster - const limiter = pLimit(10); - const promises = response.hits.hits.map((hit) => { - return limiter(() => { - if (!hit._source || !hit._id) { - return; - } - - return esClient.asInternalUser.update({ - refresh: 'wait_for', - index: resourceNames.aliases.kb, - id: hit._id, - body: { - doc: { - ...hit._source, - semantic_text: hit._source.text, - }, - }, - }); - }); - }); - - await Promise.all(promises); - - logger.debug(`Populated ${promises.length} entries`); - await populateSemanticTextFieldRecursively({ esClient, logger, config }); -} - -async function waitForModel({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - return pRetry( - async () => { - const { ready } = await getElserModelStatus({ esClient, logger, config }); - if (!ready) { - logger.debug('Elser model is not yet ready. 
Retrying...'); - throw new Error('Elser model is not yet ready'); - } - }, - { retries: 30, factor: 2, maxTimeout: 30_000 } - ); -} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json b/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json index 63485425b8cd6..5a0faa995ffa3 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json @@ -53,7 +53,8 @@ "@kbn/ai-assistant-icon", "@kbn/core-http-browser", "@kbn/sse-utils", - "@kbn/core-security-server" + "@kbn/core-security-server", + "@kbn/lock-manager" ], "exclude": ["target/**/*"] } diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts b/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts index d49fda2556f69..c4ea8213f583d 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts @@ -27,6 +27,9 @@ export const REMOVED_TYPES: string[] = [ 'cleanup_failed_action_executions', 'reports:monitor', + + // deprecated in https://github.com/elastic/kibana/pull/216916 + 'obs-ai-assistant:knowledge-base-migration', ]; /** diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts index 29b326e34f847..c3f4e4607ca71 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts @@ -15,55 +15,10 @@ import { addSampleDocsToCustomIndex, setupKnowledgeBase, } from '../../utils/knowledge_base'; +import { animalSampleDocs, technicalSampleDocs } from 
'../../utils/sample_docs'; const customSearchConnectorIndex = 'animals_kb'; -const sampleDocsForInternalKb = [ - { - id: 'technical_db_outage_slow_queries', - title: 'Database Outage: Slow Query Execution', - text: 'At 03:15 AM UTC, the production database experienced a significant outage, leading to slow query execution and increased response times across multiple services. A surge in database load was detected, with 90% of queries exceeding 2 seconds. A detailed log analysis pointed to locking issues within the transaction queue and inefficient index usage.', - }, - { - id: 'technical_api_gateway_timeouts', - title: 'Service Timeout: API Gateway Bottleneck', - text: 'At 10:45 AM UTC, the API Gateway encountered a timeout issue, causing a 500 error for all incoming requests. Detailed traces indicated a significant bottleneck at the gateway level, where requests stalled while waiting for upstream service responses. The upstream service was overwhelmed due to a sudden spike in inbound traffic and failed to release resources promptly.', - }, - { - id: 'technical_cache_misses_thirdparty_api', - title: 'Cache Misses and Increased Latency: Third-Party API Failure', - text: 'At 04:30 PM UTC, a dramatic increase in cache misses and latency was observed. The failure of a third-party API prevented critical data from being cached, leading to unnecessary re-fetching of resources from external sources. This caused significant delays in response times, with up to 10-second delays in some key services.', - }, -]; - -const sampleDocsForCustomIndex = [ - { - id: 'animal_elephants_social_structure', - title: 'Elephants and Their Social Structure', - text: 'Elephants are highly social animals that live in matriarchal herds led by the oldest female. These animals communicate through low-frequency sounds, called infrasound, that travel long distances. 
They are known for their intelligence, strong memory, and deep emotional bonds with each other.', - }, - { - id: 'animal_cheetah_life_speed', - title: 'The Life of a Cheetah', - text: 'Cheetahs are the fastest land animals, capable of reaching speeds up to 60 miles per hour in short bursts. They rely on their speed to catch prey, such as gazelles. Unlike other big cats, cheetahs cannot roar, but they make distinctive chirping sounds, especially when communicating with their cubs.', - }, - { - id: 'animal_whale_migration_patterns', - title: 'Whales and Their Migration Patterns', - text: 'Whales are known for their long migration patterns, traveling thousands of miles between feeding and breeding grounds.', - }, - { - id: 'animal_giraffe_habitat_feeding', - title: 'Giraffes: Habitat and Feeding Habits', - text: 'Giraffes are the tallest land animals, with long necks that help them reach leaves high up in trees. They live in savannas and grasslands, where they feed on leaves, twigs, and fruits from acacia trees.', - }, - { - id: 'animal_penguin_antarctic_adaptations', - title: 'Penguins and Their Antarctic Adaptations', - text: 'Penguins are flightless birds that have adapted to life in the cold Antarctic environment. 
They have a thick layer of blubber to keep warm, and their wings have evolved into flippers for swimming in the icy waters.', - }, -]; - export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); const es = getService('es'); @@ -71,12 +26,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('recall', function () { before(async () => { await setupKnowledgeBase(getService); - await addSampleDocsToInternalKb(getService, sampleDocsForInternalKb); - await addSampleDocsToCustomIndex( - getService, - sampleDocsForCustomIndex, - customSearchConnectorIndex - ); + await addSampleDocsToInternalKb(getService, technicalSampleDocs); + await addSampleDocsToCustomIndex(getService, animalSampleDocs, customSearchConnectorIndex); }); after(async () => { diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index 6c84b86d33b33..c0a511d4eb3eb 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -8,19 +8,28 @@ import expect from '@kbn/expect'; import { v4 as uuid } from 'uuid'; import prettyMilliseconds from 'pretty-ms'; +import nock from 'nock'; +import { Client } from '@elastic/elasticsearch'; +import { times } from 'lodash'; +import { ToolingLog } from '@kbn/tooling-log'; +import pRetry from 'p-retry'; import { LockId, - ensureTemplatesAndIndexCreated, LockManager, - withLock, LockDocument, + withLock, + runSetupIndexAssetEveryTime, +} from 
'@kbn/lock-manager/src/lock_manager_client'; +import { + LOCKS_COMPONENT_TEMPLATE_NAME, LOCKS_CONCRETE_INDEX_NAME, -} from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/lock_manager_client'; -import { Client } from '@elastic/elasticsearch'; -import { times } from 'lodash'; -import { ToolingLog } from '@kbn/tooling-log'; + LOCKS_INDEX_TEMPLATE_NAME, + setupLockManagerIndex, +} from '@kbn/lock-manager/src/setup_lock_manager_index'; + import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; import { getLoggerMock } from '../utils/logger'; +import { dateAsTimestamp, durationAsMs, sleep } from '../utils/time'; export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { const es = getService('es'); @@ -28,194 +37,428 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon const logger = getLoggerMock(log); describe('LockManager', function () { + // see details: https://github.com/elastic/kibana/issues/219091 + this.tags(['failsOnMKI']); before(async () => { - await clearAllLocks(es); - await ensureTemplatesAndIndexCreated(es); + // delete existing index mappings to ensure we start from a clean state + await deleteLockIndexAssets(es, log); + + // ensure that the index and templates are created + runSetupIndexAssetEveryTime(); }); - describe('Basic lock operations', () => { - let lockManager: LockManager; - const LOCK_ID = 'basic_lock_operations'; + after(async () => { + await deleteLockIndexAssets(es, log); + }); - beforeEach(async () => { - lockManager = new LockManager(LOCK_ID, es, logger); + describe('Manual locking API', function () { + this.tags(['failsOnMKI']); + before(async () => { + await clearAllLocks(es, log); }); - afterEach(async () => { - await lockManager.release(); - }); + describe('Basic lock operations', () => { + let lockManager: LockManager; + const LOCK_ID = 'basic_lock_operations'; - it('acquires the lock when not held', 
async () => { - const acquired = await lockManager.acquire(); - expect(acquired).to.be(true); + beforeEach(async () => { + lockManager = new LockManager(LOCK_ID, es, logger); + }); - const lock = await getLockById(es, LOCK_ID); - expect(true).to.be(true); + afterEach(async () => { + await lockManager.release(); + }); - expect(lock).not.to.be(undefined); - }); + it('acquires the lock when not held', async () => { + const acquired = await lockManager.acquire(); + expect(acquired).to.be(true); - it('stores and retrieves metadata', async () => { - const metadata = { foo: 'bar' }; - await lockManager.acquire({ metadata }); - const lock = await getLockById(es, LOCK_ID); - expect(lock?.metadata).to.eql(metadata); - }); + const lock = await getLockById(es, LOCK_ID); + expect(true).to.be(true); - it('releases the lock', async () => { - const acquired = await lockManager.acquire(); - expect(acquired).to.be(true); + expect(lock).not.to.be(undefined); + }); - await lockManager.release(); + it('stores and retrieves metadata', async () => { + const metadata = { foo: 'bar' }; + await lockManager.acquire({ metadata }); + const lock = await getLockById(es, LOCK_ID); + expect(lock?.metadata).to.eql(metadata); + }); - const lock = await getLockById(es, LOCK_ID); - expect(lock).to.be(undefined); - }); + it('releases the lock', async () => { + const acquired = await lockManager.acquire(); + expect(acquired).to.be(true); - it('it sets expiresAt according to the ttl', async () => { - await lockManager.acquire({ ttl: 8 * 60 * 1000 }); - const lock = await getLockById(es, LOCK_ID); - const ttl = new Date(lock!.expiresAt).getTime() - new Date(lock!.createdAt).getTime(); - expect(prettyMilliseconds(ttl)).to.be('8m'); - }); + await lockManager.release(); - it('does not throw when releasing a non-existent lock', async () => { - await lockManager.release(); - await lockManager.release(); - const lock = await getLockById(es, LOCK_ID); - expect(lock).to.be(undefined); - }); - }); + const lock = 
await getLockById(es, LOCK_ID); + expect(lock).to.be(undefined); + }); - describe('get', () => { - let lockManager: LockManager; - const LOCK_ID = 'my_lock_with_get'; + it('it sets expiresAt according to the ttl', async () => { + await lockManager.acquire({ ttl: durationAsMs(8, 'minutes') }); + const lock = await getLockById(es, LOCK_ID); + const ttl = dateAsTimestamp(lock!.expiresAt) - dateAsTimestamp(lock!.createdAt); + expect(prettyMilliseconds(ttl)).to.be('8m'); + }); - beforeEach(async () => { - lockManager = new LockManager(LOCK_ID, es, logger); + it('does not throw when releasing a non-existent lock', async () => { + await lockManager.release(); + await lockManager.release(); + const lock = await getLockById(es, LOCK_ID); + expect(lock).to.be(undefined); + }); }); - afterEach(async () => { - await lockManager.release(); - }); + describe('when encountering network error the ES client retries the request', () => { + let lockManager: LockManager; + const LOCK_ID = 'es_client_retries_lock'; + let retryCounter = 0; - it('does not return expired locks', async () => { - await lockManager.acquire({ ttl: 500 }); - await sleep(1000); - const lock = await lockManager.get(); - expect(lock).to.be(undefined); + beforeEach(async () => { + retryCounter = 0; - const lockRaw = await getLockById(es, LOCK_ID); - expect(lockRaw).to.not.be(undefined); - }); - }); + lockManager = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + nock.cleanAll(); + await lockManager.release(); + }); - describe('Two LockManagers with different lockId', () => { - let manager1: LockManager; - let manager2: LockManager; + after(async () => { + nock.restore(); + }); - beforeEach(async () => { - manager1 = new LockManager('my_lock_id' as LockId, es, logger); - manager2 = new LockManager('my_other_lock_id', es, logger); - }); + function addElasticsearchMock({ numberOfMocks }: { numberOfMocks: number }) { + nock(/localhost:9220/, { allowUnmocked: true }) + .filteringRequestBody(() 
=> '*') + .post(`/${LOCKS_CONCRETE_INDEX_NAME}/_update/${LOCK_ID}`) + .times(numberOfMocks) + .reply((uri, requestBody, cb) => { + log.debug(`Returning mock error for ${uri}`); + retryCounter++; + setTimeout(() => { + cb(null, [503, 'Service Unavailable']); + }, 100); + }); + } - afterEach(async () => { - await manager1.release(); - await manager2.release(); - }); + it('eventually succeeds', async () => { + addElasticsearchMock({ numberOfMocks: 3 }); - it('does not interfere between separate locks', async () => { - const acquired1 = await manager1.acquire(); - const acquired2 = await manager2.acquire(); - expect(acquired1).to.be(true); - expect(acquired2).to.be(true); - }); - }); + const acquired = await lockManager.acquire(); + + expect(acquired).to.be(true); + expect(retryCounter).to.be(3); + }); - describe('Two LockManagers with identical lockId', () => { - let manager1: LockManager; - let manager2: LockManager; + it('eventually fails', async () => { + addElasticsearchMock({ numberOfMocks: 4 }); - const LOCK_ID = 'my_lock'; + const acquired = await lockManager.acquire(); - beforeEach(async () => { - manager1 = new LockManager(LOCK_ID, es, logger); - manager2 = new LockManager(LOCK_ID, es, logger); + expect(acquired).to.be(false); + expect(retryCounter).to.be(4); + }); }); - afterEach(async () => { - await manager1.release(); - await manager2.release(); + describe('get', () => { + let lockManager: LockManager; + const LOCK_ID = 'my_lock_with_get'; + + beforeEach(async () => { + lockManager = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + await lockManager.release(); + }); + + it('does not return expired locks', async () => { + await lockManager.acquire({ ttl: 500 }); + await sleep(1000); + const lock = await lockManager.get(); + expect(lock).to.be(undefined); + + const lockRaw = await getLockById(es, LOCK_ID); + expect(lockRaw).to.not.be(undefined); + }); }); - it('does not acquire the lock if already held', async () => { - const 
acquired1 = await manager1.acquire({ metadata: { attempt: 'one' } }); - expect(acquired1).to.be(true); + describe('Two LockManagers with different lockId', () => { + let manager1: LockManager; + let manager2: LockManager; - const acquired2 = await manager2.acquire({ metadata: { attempt: 'two' } }); - expect(acquired2).to.be(false); + beforeEach(async () => { + manager1 = new LockManager('my_lock_id' as LockId, es, logger); + manager2 = new LockManager('my_other_lock_id', es, logger); + }); + + afterEach(async () => { + await manager1.release(); + await manager2.release(); + }); - const lock = await getLockById(es, LOCK_ID); - expect(lock?.metadata).to.eql({ attempt: 'one' }); + it('does not interfere between separate locks', async () => { + const acquired1 = await manager1.acquire(); + const acquired2 = await manager2.acquire(); + expect(acquired1).to.be(true); + expect(acquired2).to.be(true); + }); }); - it('allows re-acquisition after expiration', async () => { - // Acquire with a very short TTL. 
- const acquired = await manager1.acquire({ ttl: 500, metadata: { attempt: 'one' } }); - expect(acquired).to.be(true); + describe('Two LockManagers with identical lockId', () => { + let manager1: LockManager; + let manager2: LockManager; + + const LOCK_ID = 'my_lock'; + + beforeEach(async () => { + manager1 = new LockManager(LOCK_ID, es, logger); + manager2 = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + await manager1.release(); + await manager2.release(); + }); - await sleep(1000); // wait for lock to expire + it('does not acquire the lock if already held', async () => { + const acquired1 = await manager1.acquire({ metadata: { attempt: 'one' } }); + expect(acquired1).to.be(true); + + const acquired2 = await manager2.acquire({ metadata: { attempt: 'two' } }); + expect(acquired2).to.be(false); + + const lock = await getLockById(es, LOCK_ID); + expect(lock?.metadata).to.eql({ attempt: 'one' }); + }); - const reacquired = await manager2.acquire({ metadata: { attempt: 'two' } }); - expect(reacquired).to.be(true); + it('allows re-acquisition after expiration', async () => { + // Acquire with a very short TTL. 
+ const acquired = await manager1.acquire({ ttl: 500, metadata: { attempt: 'one' } }); + expect(acquired).to.be(true); + + await sleep(1000); // wait for lock to expire + + const reacquired = await manager2.acquire({ metadata: { attempt: 'two' } }); + expect(reacquired).to.be(true); + }); }); - }); - describe('acquireWithRetry', () => { - let blockingManager: LockManager; - let waitingManager: LockManager; + describe('when waiting for lock to be available using pRetry', () => { + let blockingManager: LockManager; + let waitingManager: LockManager; + + const RETRY_LOCK_ID = 'my_lock_with_retry'; + + beforeEach(async () => { + blockingManager = new LockManager(RETRY_LOCK_ID, es, logger); + waitingManager = new LockManager(RETRY_LOCK_ID, es, logger); + await blockingManager.release(); + await waitingManager.release(); + }); + + afterEach(async () => { + await blockingManager.release(); + await waitingManager.release(); + }); - const RETRY_LOCK_ID = 'my_lock_with_retry'; + it('eventually acquires the lock when it becomes available', async () => { + const acquired = await blockingManager.acquire(); + expect(acquired).to.be(true); + + const waitPromise = pRetry(async () => { + const hasLock = await waitingManager.acquire(); + if (!hasLock) { + throw new Error(`Lock "${RETRY_LOCK_ID}" not available yet`); + } + return hasLock; + }); + await sleep(100); + await blockingManager.release(); - beforeEach(async () => { - blockingManager = new LockManager(RETRY_LOCK_ID, es, logger); - waitingManager = new LockManager(RETRY_LOCK_ID, es, logger); - await blockingManager.release(); - await waitingManager.release(); + const waitResult = await waitPromise; + expect(waitResult).to.be(true); + }); + + it('throws an error when the retry times out', async () => { + await blockingManager.acquire(); + + let error: Error | undefined; + try { + await pRetry( + async () => { + const hasLock = await waitingManager.acquire(); + if (!hasLock) { + throw new Error(`Lock "${RETRY_LOCK_ID}" not 
available yet`); + } + return hasLock; + }, + { minTimeout: 100, maxTimeout: 100, retries: 2 } + ); + } catch (err) { + error = err; + } + expect(error?.message).to.contain('Lock "my_lock_with_retry" not available yet'); + await blockingManager.release(); + }); }); - afterEach(async () => { - await blockingManager.release(); - await waitingManager.release(); + describe('extendTtl', () => { + let lockManager: LockManager; + const ttl = 1000; + + const LOCK_ID = 'my_lock_extend_ttl'; + + beforeEach(async () => { + lockManager = new LockManager(LOCK_ID, es, logger); + await lockManager.acquire({ ttl }); + }); + + afterEach(async () => { + await lockManager.release(); + }); + + it('has initial `expiresAt` value', async () => { + const lock = (await getLockById(es, LOCK_ID))!; + expect(dateAsTimestamp(lock.expiresAt)).to.be(dateAsTimestamp(lock.createdAt) + ttl); + }); + + it('update `expiresAt` to be greater than before', async () => { + const lockBeforeExtending = (await getLockById(es, LOCK_ID))!; + const res = await lockManager.extendTtl(2000); + expect(res).to.be(true); + + const lockAfterExtension = (await getLockById(es, LOCK_ID))!; + expect(dateAsTimestamp(lockAfterExtension.expiresAt)).to.be.greaterThan( + dateAsTimestamp(lockBeforeExtending.expiresAt) + ); + }); + + it('does not extend lock if already released', async () => { + await lockManager.release(); + const res = await lockManager.extendTtl(2000); + expect(res).to.be(false); + }); }); - it('eventually acquires the lock when it becomes available', async () => { - const acquired = await blockingManager.acquire(); - expect(acquired).to.be(true); + describe('Concurrency and race conditions', () => { + const LOCK_ID = 'my_lock_with_concurrency'; + + it('should allow only one lock acquisition among many concurrent attempts', async () => { + const lockManagers = await Promise.all( + times(20).map(() => new LockManager(LOCK_ID, es, logger)) + ); + + const acquireAttempts = await 
Promise.all(lockManagers.map((lm) => lm.acquire())); + const releaseAttempts = await Promise.all(lockManagers.map((lm) => lm.release())); + + expect(acquireAttempts.filter((v) => v === true)).to.have.length(1); + expect(releaseAttempts.filter((v) => v === true)).to.have.length(1); + }); + + it('should handle concurrent release and acquisition without race conditions', async () => { + const initialManager = new LockManager(LOCK_ID, es, logger); + const acquired = await initialManager.acquire(); + expect(acquired).to.be(true); + + const attempts = await Promise.all( + times(20).map(async () => { + const releaseResult = new LockManager(LOCK_ID, es, logger).release(); + const acquireResult = new LockManager(LOCK_ID, es, logger).acquire(); + + const [release, acquire] = await Promise.all([releaseResult, acquireResult]); + return { release, acquire }; + }) + ); + + expect(attempts.filter((r) => r.acquire === true)).to.have.length(0); + expect(attempts.filter((r) => r.release === true)).to.have.length(0); - const waitPromise = waitingManager.acquireWithRetry({ minTimeout: 50, maxTimeout: 50 }); - await sleep(100); - await blockingManager.release(); + // Finally, confirm that the lock still exists + const lock = await getLockById(es, LOCK_ID); + expect(lock).not.to.be(undefined); - const waitResult = await waitPromise; - expect(waitResult).to.be(true); + // cleanup + await initialManager.release(); + }); }); - it('throws an Error when the wait times out', async () => { - await blockingManager.acquire(); + describe('Token fencing', () => { + let manager1: LockManager; + let manager2: LockManager; - let error: Error | undefined; - try { - await waitingManager.acquireWithRetry({ minTimeout: 100, maxRetryTime: 100, retries: 2 }); - } catch (err) { - error = err; - } - expect(error?.message).to.contain('Lock "my_lock_with_retry" not available yet'); - await blockingManager.release(); + const LOCK_ID = 'my_lock_with_token_fencing'; + + beforeEach(async () => { + manager1 = new 
LockManager(LOCK_ID, es, logger); + manager2 = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + await manager1.release(); + await manager2.release(); + }); + + it('should not release the lock if the token does not match', async () => { + const acquired = await manager1.acquire(); + expect(acquired).to.be(true); + + // Simulate an interfering update that changes the token. + // (We do this by issuing an update directly to Elasticsearch.) + const newToken = uuid(); + await es.update({ + index: LOCKS_CONCRETE_INDEX_NAME, + id: LOCK_ID, + doc: { token: newToken }, + refresh: true, + }); + log.debug(`Updated lock token to: ${newToken}`); + + // Now manager1 still holds its old token. + // Its call to release() should find no document with its token and return false. + const releaseResult = await manager1.release(); + expect(releaseResult).to.be(false); + + // Verify that the lock still exists and now carries the interfering token. + const lock = await getLockById(es, LOCK_ID); + expect(lock).not.to.be(undefined); + expect(lock!.token).to.be(newToken); + + // cleanup + await clearAllLocks(es, log); + }); + + it('should use a fresh token on subsequent acquisitions', async () => { + const acquired1 = await manager1.acquire(); + expect(acquired1).to.be(true); + + // Get the current token. + const firstLock = await getLockById(es, LOCK_ID); + + // Release the lock. + const released = await manager1.release(); + expect(released).to.be(true); + + // Re-acquire the lock. 
+ const acquired2 = await manager2.acquire(); + expect(acquired2).to.be(true); + + const secondLock = await getLockById(es, LOCK_ID); + expect(secondLock!.token).not.to.be(firstLock!.token); + }); }); }); - describe('withLock', () => { + describe('withLock API', function () { + this.tags(['failsOnMKI']); + before(async () => { + await clearAllLocks(es, log); + }); + const LOCK_ID = 'my_lock_with_lock'; describe('Successful execution and concurrent calls', () => { @@ -293,43 +536,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon await preAcquirer.release(); }); }); - }); - - describe('TTL extension', () => { - let lockManager: LockManager; - - const LOCK_ID = 'my_lock_with_ttl_extension'; - - beforeEach(async () => { - lockManager = new LockManager(LOCK_ID, es, logger); - }); - - describe('when the lock is manually handled', () => { - afterEach(async () => { - await lockManager.release(); - }); - - it('should extend the TTL when `extendTtl` is called', async () => { - // Acquire the lock with a very short TTL (e.g. 1 second). - const acquired = await lockManager.acquire({ ttl: 1000 }); - expect(acquired).to.be(true); - - const lockExpiryBefore = (await getLockById(es, LOCK_ID))!.expiresAt; - - // Extend the TTL - const extended = await lockManager.extendTtl(); - expect(extended).to.be(true); - - const lockExpiryAfter = (await getLockById(es, LOCK_ID))!.expiresAt; - - // Verify that the new expiration is later than before. - expect(new Date(lockExpiryAfter).getTime()).to.be.greaterThan( - new Date(lockExpiryBefore).getTime() - ); - }); - }); - describe('withLock', () => { + describe('Extending TTL', () => { let lockExpiryBefore: string | undefined; let lockExpiryAfter: string | undefined; let result: string | undefined; @@ -337,9 +545,9 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon before(async () => { // Use a very short TTL (1 second) so that without extension the lock would expire. 
// The withLock helper extends the TTL periodically. - result = await withLock({ lockId: LOCK_ID, esClient: es, logger, ttl: 100 }, async () => { + result = await withLock({ lockId: LOCK_ID, esClient: es, logger, ttl: 500 }, async () => { lockExpiryBefore = (await getLockById(es, LOCK_ID))?.expiresAt; - await sleep(500); // Simulate a long-running operation + await sleep(600); // Simulate a long-running operation lockExpiryAfter = (await getLockById(es, LOCK_ID))?.expiresAt; return 'done'; }); @@ -354,8 +562,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon expect(lockExpiryAfter).not.to.be(undefined); // Verify that the new expiration is later than before. - expect(new Date(lockExpiryAfter!).getTime()).to.be.greaterThan( - new Date(lockExpiryBefore!).getTime() + expect(dateAsTimestamp(lockExpiryAfter!)).to.be.greaterThan( + dateAsTimestamp(lockExpiryBefore!) ); }); @@ -366,132 +574,225 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon expect(lock).to.be(undefined); }); }); - }); - describe('Concurrency and race conditions', () => { - const LOCK_ID = 'my_lock_with_concurrency'; + describe('when waiting for lock to be available using pRetry and it times out', () => { + const RETRY_LOCK_ID = 'my_lock_with_retry'; + let retries = 0; + let error: Error | undefined; + let lm: LockManager; - it('should allow only one lock acquisition among many concurrent attempts', async () => { - const lockManagers = await Promise.all( - times(20).map(() => new LockManager(LOCK_ID, es, logger)) - ); + before(async () => { + lm = new LockManager(RETRY_LOCK_ID, es, logger); + const acquired = await lm.acquire(); + expect(acquired).to.be(true); - const acquireAttempts = await Promise.all(lockManagers.map((lm) => lm.acquire())); - const releaseAttempts = await Promise.all(lockManagers.map((lm) => lm.release())); + try { + await pRetry( + async () => { + retries++; + await withLock( + { lockId: RETRY_LOCK_ID, esClient: 
es, logger }, + async () => 'should not execute' + ); + }, + { minTimeout: 50, maxTimeout: 50, retries: 2 } + ); + } catch (err) { + error = err; + } + }); + + after(async () => { + await lm.release(); + }); + + it('invokes withLock 3 times', async () => { + expect(retries).to.be(3); + }); + + it('throws a LockAcquisitionError', () => { + expect(error?.name).to.be('LockAcquisitionError'); + }); - expect(acquireAttempts.filter((v) => v === true)).to.have.length(1); - expect(releaseAttempts.filter((v) => v === true)).to.have.length(1); + it('throws a LockAcquisitionError with a message', () => { + expect(error?.message).to.contain(`Lock "${RETRY_LOCK_ID}" not acquired`); + }); }); - it('should handle concurrent release and acquisition without race conditions', async () => { - const initialManager = new LockManager(LOCK_ID, es, logger); - const acquired = await initialManager.acquire(); - expect(acquired).to.be(true); + describe('when waiting for lock to be available using pRetry and does not time out', () => { + const RETRY_LOCK_ID = 'my_lock_with_retry'; + let retries = 0; + let res: string | undefined; - const attempts = await Promise.all( - times(20).map(async () => { - const releaseResult = new LockManager(LOCK_ID, es, logger).release(); - const acquireResult = new LockManager(LOCK_ID, es, logger).acquire(); + before(async () => { + const lm = new LockManager(RETRY_LOCK_ID, es, logger); + const acquired = await lm.acquire(); + expect(acquired).to.be(true); - const [release, acquire] = await Promise.all([releaseResult, acquireResult]); - return { release, acquire }; - }) - ); + setTimeout(() => lm.release(), 100); + + await pRetry( + async () => { + retries++; + res = await withLock( + { lockId: RETRY_LOCK_ID, esClient: es, logger }, + async () => 'should execute' + ); + }, + { minTimeout: 50, maxTimeout: 50, retries: 5 } + ); + }); - expect(attempts.filter((r) => r.acquire === true)).to.have.length(0); - expect(attempts.filter((r) => r.release === 
true)).to.have.length(0); + it('retries calling withLock multiple times', async () => { + expect(retries).to.be.greaterThan(1); + }); - // Finally, confirm that the lock still exists - const lock = await getLockById(es, LOCK_ID); - expect(lock).not.to.be(undefined); + it('returns the result', () => { + expect(res).to.be('should execute'); + }); - // cleanup - await initialManager.release(); + it('releases the lock', async () => { + const lock = await getLockById(es, RETRY_LOCK_ID); + expect(lock).to.be(undefined); + }); }); }); - describe('Token fencing', () => { - let manager1: LockManager; - let manager2: LockManager; + describe('index assets', () => { + describe('when lock index is created with incorrect mappings', () => { + before(async () => { + await deleteLockIndexAssets(es, log); + await es.index({ + refresh: true, + index: LOCKS_CONCRETE_INDEX_NAME, + id: 'my_lock_with_incorrect_mappings', + document: { + token: 'my token', + expiresAt: new Date(Date.now() + 100000), + createdAt: new Date(), + metadata: { foo: 'bar' }, + }, + }); + }); - const LOCK_ID = 'my_lock_with_token_fencing'; + it('should delete the index and re-create it', async () => { + const mappingsBefore = await getMappings(es); + log.debug(`Mappings before: ${JSON.stringify(mappingsBefore)}`); + expect(mappingsBefore.properties?.token.type).to.eql('text'); - beforeEach(async () => { - manager1 = new LockManager(LOCK_ID, es, logger); - manager2 = new LockManager(LOCK_ID, es, logger); - }); + // Simulate a scenario where the index mappings are incorrect and a lock is added + // it should delete the index and re-create it with the correct mappings + await withLock({ esClient: es, lockId: uuid(), logger }, async () => {}); - afterEach(async () => { - await manager1.release(); - await manager2.release(); + const mappingsAfter = await getMappings(es); + log.debug(`Mappings after: ${JSON.stringify(mappingsAfter)}`); + expect(mappingsAfter.properties?.token.type).to.be('keyword'); + }); }); - 
it('should not release the lock if the token does not match', async () => { - const acquired = await manager1.acquire(); - expect(acquired).to.be(true); - - // Simulate an interfering update that changes the token. - // (We do this by issuing an update directly to Elasticsearch.) - const newToken = uuid(); - await es.update({ - index: LOCKS_CONCRETE_INDEX_NAME, - id: LOCK_ID, - doc: { token: newToken }, - refresh: true, - }); - log.debug(`Updated lock token to: ${newToken}`); - - // Now manager1 still holds its old token. - // Its call to release() should find no document with its token and return false. - const releaseResult = await manager1.release(); - expect(releaseResult).to.be(false); - - // Verify that the lock still exists and now carries the interfering token. - const lock = await getLockById(es, LOCK_ID); - expect(lock).not.to.be(undefined); - expect(lock!.token).to.be(newToken); - - // cleanup - await clearAllLocks(es); + describe('when lock index is created with correct mappings', () => { + before(async () => { + await withLock({ esClient: es, lockId: uuid(), logger }, async () => {}); + + // wait for the index to be created + await es.indices.refresh({ index: LOCKS_CONCRETE_INDEX_NAME }); + }); + + it('should have the correct mappings for the lock index', async () => { + const mappings = await getMappings(es); + + const expectedMapping = { + dynamic: 'false', + properties: { + token: { type: 'keyword' }, + expiresAt: { type: 'date' }, + createdAt: { type: 'date' }, + metadata: { enabled: false, type: 'object' }, + }, + }; + + expect(mappings).to.eql(expectedMapping); + }); + + it('has the right number_of_replicas', async () => { + const settings = await getSettings(es); + expect(settings?.index?.auto_expand_replicas).to.eql('0-1'); + }); + + it('does not delete the index when adding a new lock', async () => { + const settingsBefore = await getSettings(es); + + await withLock({ esClient: es, lockId: uuid(), logger }, async () => {}); + + const 
settingsAfter = await getSettings(es); + expect(settingsAfter?.uuid).to.be(settingsBefore?.uuid); + }); }); - it('should use a fresh token on subsequent acquisitions', async () => { - const acquired1 = await manager1.acquire(); - expect(acquired1).to.be(true); + describe('when setting up index assets', () => { + beforeEach(async () => { + await deleteLockIndexAssets(es, log); + }); - // Get the current token. - const firstLock = await getLockById(es, LOCK_ID); + it('can run in parallel', async () => { + try { + await Promise.all([ + setupLockManagerIndex(es, logger), + setupLockManagerIndex(es, logger), + setupLockManagerIndex(es, logger), + ]); + } catch (error) { + expect().fail(`Parallel setup should not throw but got error: ${error.message}`); + } - // Release the lock. - const released = await manager1.release(); - expect(released).to.be(true); + const indexExists = await es.indices.exists({ index: LOCKS_CONCRETE_INDEX_NAME }); + expect(indexExists).to.be(true); + }); - // Re-acquire the lock. 
- const acquired2 = await manager2.acquire(); - expect(acquired2).to.be(true); + it('can run in sequence', async () => { + try { + await setupLockManagerIndex(es, logger); + await setupLockManagerIndex(es, logger); + await setupLockManagerIndex(es, logger); + } catch (error) { + expect().fail(`Sequential setup should not throw but got error: ${error.message}`); + } - const secondLock = await getLockById(es, LOCK_ID); - expect(secondLock!.token).not.to.be(firstLock!.token); + const indexExists = await es.indices.exists({ index: LOCKS_CONCRETE_INDEX_NAME }); + expect(indexExists).to.be(true); + }); }); }); }); } -function sleep(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -function clearAllLocks(es: Client) { - return es.deleteByQuery( - { - index: LOCKS_CONCRETE_INDEX_NAME, - query: { match_all: {} }, - refresh: true, - }, +async function deleteLockIndexAssets(es: Client, log: ToolingLog) { + log.debug(`Deleting index assets`); + await es.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [404] }); + await es.indices.deleteIndexTemplate({ name: LOCKS_INDEX_TEMPLATE_NAME }, { ignore: [404] }); + await es.cluster.deleteComponentTemplate( + { name: LOCKS_COMPONENT_TEMPLATE_NAME }, { ignore: [404] } ); } +function clearAllLocks(es: Client, log: ToolingLog) { + try { + return es.deleteByQuery( + { + index: LOCKS_CONCRETE_INDEX_NAME, + query: { match_all: {} }, + refresh: true, + conflicts: 'proceed', + }, + { ignore: [404] } + ); + } catch (e) { + log.error(`Failed to clear locks: ${e.message}`); + log.debug(e); + } +} + async function getLocks(es: Client) { const res = await es.search({ index: LOCKS_CONCRETE_INDEX_NAME, @@ -519,15 +820,22 @@ async function outputLocks(es: Client, log: ToolingLog, name?: string) { } async function getLockById(esClient: Client, lockId: LockId): Promise { - const res = await esClient.search( - { - index: LOCKS_CONCRETE_INDEX_NAME, - query: { - bool: { filter: [{ term: { _id: lockId } }] }, - }, 
- }, + const res = await esClient.get( + { index: LOCKS_CONCRETE_INDEX_NAME, id: lockId }, { ignore: [404] } ); - return res.hits.hits[0]?._source; + return res._source; +} + +async function getMappings(es: Client) { + const res = await es.indices.getMapping({ index: LOCKS_CONCRETE_INDEX_NAME }); + const { mappings } = res[LOCKS_CONCRETE_INDEX_NAME]; + return mappings; +} + +async function getSettings(es: Client) { + const res = await es.indices.getSettings({ index: LOCKS_CONCRETE_INDEX_NAME }); + const { settings } = res[LOCKS_CONCRETE_INDEX_NAME]; + return settings; } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts index ad0bbb46869da..d7f318d9244b7 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts @@ -28,9 +28,16 @@ export default function aiAssistantApiIntegrationTests({ loadTestFile(require.resolve('./public_complete/public_complete.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_setup.spec.ts')); loadTestFile( - require.resolve('./knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts') + require.resolve( + './knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts' + ) ); - loadTestFile(require.resolve('./knowledge_base/knowledge_base_reindex.spec.ts')); + loadTestFile( + require.resolve( + './knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts' + ) + ); + loadTestFile(require.resolve('./knowledge_base/knowledge_base_reindex_concurrency.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_status.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base.spec.ts')); 
loadTestFile(require.resolve('./knowledge_base/knowledge_base_user_instructions.spec.ts')); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts similarity index 94% rename from x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts rename to x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts index 5e71ab748b9d9..94c0fcccef5fa 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts @@ -63,7 +63,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon return res.hits.hits; } - describe('When there are knowledge base entries (from 8.15 or earlier) that does not contain semantic_text embeddings', function () { + describe('when the knowledge base index was created before 8.15', function () { // Intentionally skipped in all serverless environnments (local and MKI) // because the migration scenario being tested is not relevant to MKI and Serverless. 
this.tags(['skipServerless']); @@ -92,7 +92,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('after migrating', () => { before(async () => { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: + 'POST /internal/observability_ai_assistant/kb/migrations/populate_missing_semantic_text_field', }); expect(status).to.be(200); }); @@ -131,7 +132,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon it('returns entries correctly via API', async () => { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: + 'POST /internal/observability_ai_assistant/kb/migrations/populate_missing_semantic_text_field', }); expect(status).to.be(200); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts new file mode 100644 index 0000000000000..9051b00261d28 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import expect from '@kbn/expect'; +import { times } from 'lodash'; +import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; +import { + deleteKnowledgeBaseModel, + setupKnowledgeBase, + deleteKbIndices, + addSampleDocsToInternalKb, +} from '../utils/knowledge_base'; +import { createOrUpdateIndexAssets } from '../utils/index_assets'; +import { animalSampleDocs } from '../utils/sample_docs'; + +export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { + const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); + const es = getService('es'); + + describe('POST /internal/observability_ai_assistant/kb/reindex', function () { + // Intentionally skipped in all serverless environments (local and MKI) + // because the migration scenario being tested is not relevant to MKI and Serverless. + this.tags(['skipServerless']); + + before(async () => { + await deleteKbIndices(es); + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); + await setupKnowledgeBase(getService); + }); + + after(async () => { + await deleteKbIndices(es); + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); + await deleteKnowledgeBaseModel(getService); + }); + + describe('when running multiple re-index operations in parallel', () => { + let results: Array<{ + status: number; + errorMessage: string | undefined; + }>; + + before(async () => { + await addSampleDocsToInternalKb(getService, animalSampleDocs); + + results = await Promise.all(times(20).map(() => reIndexKnowledgeBase())); + }); + + it('makes 20 requests to the reindex endpoint', async () => { + expect(results).to.have.length(20); + }); + + it('only one request should succeed', async () => { + const successResults = results.filter((result) => result.status === 200); + expect(successResults).to.have.length(1); + }); + + it('should fail every request but 1', async () => { + const failures = results.filter((result)
=> result.status !== 200); + expect(failures).to.have.length(19); + }); + + it('throw a LockAcquisitionException for the failing requests', async () => { + const failures = results.filter((result) => result.status === 500); + const errorMessages = failures.every( + (result) => + result.errorMessage === 'Lock "observability_ai_assistant:kb_reindexing" not acquired' + ); + + expect(errorMessages).to.be(true); + }); + }); + + describe('when running multiple re-index operations in sequence', () => { + let results: Array<{ status: number; result: boolean; errorMessage: string | undefined }>; + + before(async () => { + results = []; + for (const _ of times(20)) { + results.push(await reIndexKnowledgeBase()); + } + }); + + it('makes 20 requests', async () => { + expect(results).to.have.length(20); + }); + + it('every re-index operation succeeds', async () => { + const successResults = results.filter((result) => result.status === 200); + expect(successResults).to.have.length(20); + expect(successResults.every((r) => r.result === true)).to.be(true); + }); + + it('no requests should fail', async () => { + const failures = results.filter((result) => result.status !== 200); + expect(failures).to.have.length(0); + }); + }); + }); + + async function reIndexKnowledgeBase() { + const res = await observabilityAIAssistantAPIClient.editor({ + endpoint: 'POST /internal/observability_ai_assistant/kb/reindex', + }); + + return { + status: res.status, + result: res.body.result, + errorMessage: 'message' in res.body ? 
(res.body.message as string) : undefined, + }; + } +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts similarity index 75% rename from x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts rename to x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts index d172da9970abb..e9afa6c29fe8f 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts @@ -11,7 +11,12 @@ import AdmZip from 'adm-zip'; import path from 'path'; import { AI_ASSISTANT_SNAPSHOT_REPO_PATH } from '../../../../default_configs/stateful.config.base'; import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; -import { deleteKnowledgeBaseModel, setupKnowledgeBase } from '../utils/knowledge_base'; +import { + deleteKbIndices, + deleteKnowledgeBaseModel, + setupKnowledgeBase, +} from '../utils/knowledge_base'; +import { createOrUpdateIndexAssets, restoreIndexAssets } from '../utils/index_assets'; export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); @@ -25,22 +30,17 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon this.tags(['skipServerless']); before(async () => { - const zipFilePath = `${AI_ASSISTANT_SNAPSHOT_REPO_PATH}.zip`; - log.debug(`Unzipping ${zipFilePath} to 
${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`); - new AdmZip(zipFilePath).extractAllTo(path.dirname(AI_ASSISTANT_SNAPSHOT_REPO_PATH), true); - + await unZipKbSnapshot(); await setupKnowledgeBase(getService); }); beforeEach(async () => { - await deleteKbIndex(); await restoreKbSnapshot(); - await createOrUpdateIndexAssets(); + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); }); after(async () => { - await deleteKbIndex(); - await createOrUpdateIndexAssets(); + await restoreIndexAssets(observabilityAIAssistantAPIClient, es); await deleteKnowledgeBaseModel(getService); }); @@ -68,14 +68,14 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon // @ts-expect-error expect(body.message).to.eql( - 'The knowledge base is currently being re-indexed. Please try again later' + 'The index ".kibana-observability-ai-assistant-kb" does not support semantic text and must be reindexed. This re-index operation has been scheduled and will be started automatically. Please try again later.' ); expect(status).to.be(503); }); it('can add new entries after re-indexing', async () => { - await runKbSemanticTextMigration(); + await reIndexKnowledgeBase(); await retry.try(async () => { const { status } = await createKnowledgeBaseEntry(); @@ -93,16 +93,15 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon return parseInt(settings?.index?.version?.created ?? 
'', 10); } - async function deleteKbIndex() { - log.debug('Deleting KB index'); - - await es.indices.delete( - { index: resourceNames.concreteIndexName.kb, ignore_unavailable: true }, - { ignore: [404] } - ); + async function unZipKbSnapshot() { + const zipFilePath = `${AI_ASSISTANT_SNAPSHOT_REPO_PATH}.zip`; + log.debug(`Unzipping ${zipFilePath} to ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`); + new AdmZip(zipFilePath).extractAllTo(path.dirname(AI_ASSISTANT_SNAPSHOT_REPO_PATH), true); } async function restoreKbSnapshot() { + await deleteKbIndices(es); + log.debug( `Restoring snapshot of ${resourceNames.concreteIndexName.kb} from ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}` ); @@ -128,16 +127,9 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon await es.snapshot.deleteRepository({ name: snapshotRepoName }); } - async function createOrUpdateIndexAssets() { - const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/index_assets', - }); - expect(status).to.be(200); - } - - async function runKbSemanticTextMigration() { + async function reIndexKnowledgeBase() { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: 'POST /internal/observability_ai_assistant/kb/reindex', }); expect(status).to.be(200); } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts new file mode 100644 index 0000000000000..892b99079bc98 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const animalSampleDocs = [ + { + id: 'animal_elephants_social_structure', + title: 'Elephants and Their Social Structure', + text: 'Elephants are highly social animals that live in matriarchal herds led by the oldest female. These animals communicate through low-frequency sounds, called infrasound, that travel long distances. They are known for their intelligence, strong memory, and deep emotional bonds with each other.', + }, + { + id: 'animal_cheetah_life_speed', + title: 'The Life of a Cheetah', + text: 'Cheetahs are the fastest land animals, capable of reaching speeds up to 60 miles per hour in short bursts. They rely on their speed to catch prey, such as gazelles. Unlike other big cats, cheetahs cannot roar, but they make distinctive chirping sounds, especially when communicating with their cubs.', + }, + { + id: 'animal_whale_migration_patterns', + title: 'Whales and Their Migration Patterns', + text: 'Whales are known for their long migration patterns, traveling thousands of miles between feeding and breeding grounds.', + }, + { + id: 'animal_giraffe_habitat_feeding', + title: 'Giraffes: Habitat and Feeding Habits', + text: 'Giraffes are the tallest land animals, with long necks that help them reach leaves high up in trees. They live in savannas and grasslands, where they feed on leaves, twigs, and fruits from acacia trees.', + }, + { + id: 'animal_penguin_antarctic_adaptations', + title: 'Penguins and Their Antarctic Adaptations', + text: 'Penguins are flightless birds that have adapted to life in the cold Antarctic environment. 
They have a thick layer of blubber to keep warm, and their wings have evolved into flippers for swimming in the icy waters.', + }, +]; + +export const technicalSampleDocs = [ + { + id: 'technical_db_outage_slow_queries', + title: 'Database Outage: Slow Query Execution', + text: 'At 03:15 AM UTC, the production database experienced a significant outage, leading to slow query execution and increased response times across multiple services. A surge in database load was detected, with 90% of queries exceeding 2 seconds. A detailed log analysis pointed to locking issues within the transaction queue and inefficient index usage.', + }, + { + id: 'technical_api_gateway_timeouts', + title: 'Service Timeout: API Gateway Bottleneck', + text: 'At 10:45 AM UTC, the API Gateway encountered a timeout issue, causing a 500 error for all incoming requests. Detailed traces indicated a significant bottleneck at the gateway level, where requests stalled while waiting for upstream service responses. The upstream service was overwhelmed due to a sudden spike in inbound traffic and failed to release resources promptly.', + }, + { + id: 'technical_cache_misses_thirdparty_api', + title: 'Cache Misses and Increased Latency: Third-Party API Failure', + text: 'At 04:30 PM UTC, a dramatic increase in cache misses and latency was observed. The failure of a third-party API prevented critical data from being cached, leading to unnecessary re-fetching of resources from external sources. 
This caused significant delays in response times, with up to 10-second delays in some key services.', + }, +]; diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts new file mode 100644 index 0000000000000..d91b60d9f991a --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { duration, unitOfTime } from 'moment'; + +export function durationAsMs(value: number, unit: unitOfTime.DurationConstructor) { + return duration(value, unit).asMilliseconds(); +} + +export function dateAsTimestamp(value: string) { + return new Date(value).getTime(); +} + +export function sleep(value: number) { + return new Promise((resolve) => setTimeout(resolve, value)); +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index b5b3c7e652bf9..e8e6f5bf016ce 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -159,7 +159,6 @@ export default function ({ getService }: FtrProviderContext) { 'fleet:upgrade-agentless-deployments-task', 'fleet:upgrade_action:retry', 'logs-data-telemetry', - 'obs-ai-assistant:knowledge-base-migration', 'osquery:telemetry-configs', 'osquery:telemetry-packs', 'osquery:telemetry-saved-queries', diff --git a/x-pack/test/tsconfig.json b/x-pack/test/tsconfig.json index 
85c40aaf3752c..d34a4af8b8d40 100644 --- a/x-pack/test/tsconfig.json +++ b/x-pack/test/tsconfig.json @@ -199,6 +199,7 @@ "@kbn/aiops-change-point-detection", "@kbn/es-errors", "@kbn/content-packs-schema", + "@kbn/lock-manager", "@kbn/alerts-ui-shared", ] } diff --git a/yarn.lock b/yarn.lock index 7540a8c0f5e7e..ee717ec78aa6b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6006,6 +6006,10 @@ version "0.0.0" uid "" +"@kbn/lock-manager@link:packages/kbn-lock-manager": + version "0.0.0" + uid "" + "@kbn/logging-mocks@link:src/platform/packages/shared/kbn-logging-mocks": version "0.0.0" uid ""