From 6e9b7cfc39398555b83bb9bda2304e34a92ab0ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Louv-Jansen?= Date: Wed, 2 Apr 2025 11:28:35 +0200 Subject: [PATCH 1/7] [Obs AI Assistant] Skip lock tests in MKI temporarily (#216753) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests added in https://github.com/elastic/kibana/pull/216397 are failing on MKI. Skipping temporarily in the affected environment ### Error ``` └- ✖ fail: Serverless Observability - Deployment-agnostic API integration tests observability AI Assistant LockManager Basic lock operations acquires the lock when not held │ ResponseError: security_exception │ Root causes: │ security_exception: action [indices:admin/create] is unauthorized for user [testing-internal] with effective roles [superuser] on restricted indices [.kibana_locks-000001], this action is granted by the index privileges [create_index,manage,all] ``` ### Root cause ```ts const es = getService('es'); es.deleteByQuery({ index: '.kibana_locks-000001', query: { match_all: {} }}); ``` (cherry picked from commit 8bcce2e89b6be758d144b9b802eb9d2a09dd62cf) --- .../distributed_lock_manager/distributed_lock_manager.spec.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index 6c84b86d33b33..d0054856f633a 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -28,6 +28,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon const logger = getLoggerMock(log); describe('LockManager', function () { + this.tags(['failsOnMKI']); before(async () => { await clearAllLocks(es); await ensureTemplatesAndIndexCreated(es); From 2f4c77009672788fed5e520cc20471a86b3fdfe0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Louv-Jansen?= Date: Tue, 8 Apr 2025 10:13:00 +0200 Subject: [PATCH 2/7] [LockManager] Ensure index template are created (#218901) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes: https://github.com/elastic/kibana/issues/218944 The index template for the Lock Manager was not created, causing index mappings and settings to be incorrect. Root cause: the function responsible for creating the index template (`ensureTemplatesAndIndexCreated`) was never invoked - only during automated testing 🤦 The mappings for the lock manager index (`.kibana_locks-000001`) should match this: ```ts { mappings: { dynamic: false, properties: { token: { type: 'keyword' }, metadata: { enabled: false }, createdAt: { type: 'date' }, expiresAt: { type: 'date' }, }, }, } ``` In this test we make sure that the LockManager library can recover and fix the mappings if the existing index has invalid mappings ``` DELETE .kibana_locks-000001 DELETE _index_template/.kibana_locks-index-template DELETE _component_template/.kibana_locks-component ``` correct mappings ``` PUT .kibana_locks-000001 ``` (Restart Kibana) ``` GET .kibana_locks-000001/_mapping ``` In this test we make sure that out of the box, the LockManager library creates an index with the correct mappings ``` DELETE .kibana_locks-000001 DELETE _index_template/.kibana_locks-index-template DELETE _component_template/.kibana_locks-component ``` (Restart Kibana) ``` GET .kibana_locks-000001/_mapping ``` Related: https://github.com/elastic/kibana/pull/216916 https://github.com/elastic/kibana/pull/216397 --------- Co-authored-by: Viduni Wickramarachchi (cherry picked from commit f684ea4071dfb5e6acc4cb057b46acbf915943d7) --- .../server/plugin.ts | 32 +- .../server/routes/knowledge_base/route.ts | 18 +- .../server/routes/top_level/route.ts | 2 +- .../server/service/client/index.ts | 36 +- .../lock_manager_client.ts | 226 +++--- .../lock_manager_service.ts | 4 +- .../server/service/index.ts | 2 +- .../server/service/inference_endpoint.ts | 76 +- .../service/knowledge_base_service/index.ts | 34 +- .../reindex_knowledge_base.ts | 54 +- .../create_or_update_index_assets.ts | 8 +- ...e_missing_semantic_text_field_migration.ts | 184 +++++ ...egister_kb_semantic_text_migration_task.ts | 233 ------ .../server/task_type_dictionary.ts | 3 + .../complete/functions/recall.spec.ts | 55 +- .../distributed_lock_manager.spec.ts | 722 +++++++++++------- .../apis/observability/ai_assistant/index.ts | 11 +- ...late_missing_semantic_text_fields.spec.ts} | 8 +- ...knowledge_base_reindex_concurrency.spec.ts | 116 +++ ...ndex_to_fix_sparse_vector_support.spec.ts} | 46 +- .../ai_assistant/utils/sample_docs.ts | 52 ++ .../observability/ai_assistant/utils/time.ts | 20 + .../check_registered_task_types.ts | 1 - 23 files changed, 1114 insertions(+), 829 deletions(-) rename x-pack/platform/plugins/shared/observability_ai_assistant/server/service/{ => startup_migrations}/create_or_update_index_assets.ts (95%) create mode 100644 x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts delete mode 100644 x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts rename x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/{knowledge_base_add_semantic_text_field_migration.spec.ts => knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts} (94%) create mode 100644 x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts rename x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/{knowledge_base_reindex.spec.ts => knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts} (75%) create mode 100644 x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts create mode 100644 x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts index 54ef970bd2dcc..8e96245d2a097 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts @@ -31,8 +31,8 @@ import { registerFunctions } from './functions'; import { recallRankingEvent } from './analytics/recall_ranking'; import { initLangtrace } from './service/client/instrumentation/init_langtrace'; import { aiAssistantCapabilities } from '../common/capabilities'; -import { registerAndScheduleKbSemanticTextMigrationTask } from './service/task_manager_definitions/register_kb_semantic_text_migration_task'; -import { updateExistingIndexAssets } from './service/create_or_update_index_assets'; +import { populateMissingSemanticTextFieldMigration } from './service/startup_migrations/populate_missing_semantic_text_field_migration'; +import { updateExistingIndexAssets } from './service/startup_migrations/create_or_update_index_assets'; export class ObservabilityAIAssistantPlugin implements @@ -128,23 +128,19 @@ export class ObservabilityAIAssistantPlugin })); // Update existing index assets (mappings, templates, etc). This will not create assets if they do not exist. - const indexAssetsUpdatedPromise = updateExistingIndexAssets({ - logger: this.logger.get('index_assets'), - core, - }).catch((e) => this.logger.error(`Index assets could not be updated: ${e.message}`)); - - // register task to migrate knowledge base entries to include semantic_text field - registerAndScheduleKbSemanticTextMigrationTask({ - core, - taskManager: plugins.taskManager, - logger: this.logger.get('kb_semantic_text_migration_task'), - config: this.config, - indexAssetsUpdatedPromise, - }).catch((e) => - this.logger.error( - `Knowledge base semantic_text migration task could not be registered: ${e.message}` + updateExistingIndexAssets({ logger: this.logger, core }) + .then(() => + populateMissingSemanticTextFieldMigration({ + core, + logger: this.logger, + config: this.config, + }) ) - ); + .catch((e) => + this.logger.error( + `Error during knowledge base migration in AI Assistant plugin startup: ${e.message}` + ) + ); service.register(registerFunctions); diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts index e9c7078c0062e..9957a68178ab3 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts @@ -100,8 +100,23 @@ const resetKnowledgeBase = createObservabilityAIAssistantServerRoute({ }, }); +const reIndexKnowledgeBase = createObservabilityAIAssistantServerRoute({ + endpoint: 'POST /internal/observability_ai_assistant/kb/reindex', + security: { + authz: { + requiredPrivileges: ['ai_assistant'], + }, + }, + handler: async (resources): Promise<{ result: boolean }> => { + const client = await resources.service.getClient({ request: resources.request }); + const result = await client.reIndexKnowledgeBaseWithLock(); + return { result }; + }, +}); + const semanticTextMigrationKnowledgeBase = createObservabilityAIAssistantServerRoute({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: + 'POST /internal/observability_ai_assistant/kb/migrations/populate_missing_semantic_text_field', security: { authz: { requiredPrivileges: ['ai_assistant'], @@ -320,6 +335,7 @@ const importKnowledgeBaseEntries = createObservabilityAIAssistantServerRoute({ }); export const knowledgeBaseRoutes = { + ...reIndexKnowledgeBase, ...semanticTextMigrationKnowledgeBase, ...setupKnowledgeBase, ...resetKnowledgeBase, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts index dc817f378263f..b56b2e1f07bd2 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { createOrUpdateIndexAssets } from '../../service/create_or_update_index_assets'; +import { createOrUpdateIndexAssets } from '../../service/startup_migrations/create_or_update_index_assets'; import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route'; const createOrUpdateIndexAssetsRoute = createObservabilityAIAssistantServerRoute({ diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts index c3698c55396cd..10447b8beb0f4 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts @@ -67,14 +67,12 @@ import { continueConversation } from './operators/continue_conversation'; import { convertInferenceEventsToStreamingEvents } from './operators/convert_inference_events_to_streaming_events'; import { extractMessages } from './operators/extract_messages'; import { getGeneratedTitle } from './operators/get_generated_title'; -import { - reIndexKnowledgeBaseAndPopulateSemanticTextField, - scheduleKbSemanticTextMigrationTask, -} from '../task_manager_definitions/register_kb_semantic_text_migration_task'; +import { populateMissingSemanticTextFieldMigration } from '../startup_migrations/populate_missing_semantic_text_field_migration'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantConfig } from '../../config'; import { getElserModelId } from '../knowledge_base_service/get_elser_model_id'; import { apmInstrumentation } from './operators/apm_instrumentation'; +import { reIndexKnowledgeBaseWithLock } from '../knowledge_base_service/reindex_knowledge_base'; const MAX_FUNCTION_CALLS = 8; @@ -684,14 +682,15 @@ export class ObservabilityAIAssistantClient { // setup the knowledge base const res = await knowledgeBaseService.setup(esClient, modelId); - core - .getStartServices() - .then(([_, pluginsStart]) => - scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger }) - ) - .catch((error) => { - logger.error(`Failed to schedule semantic text migration task: ${error}`); - }); + populateMissingSemanticTextFieldMigration({ + core, + logger, + config: this.dependencies.config, + }).catch((e) => { + this.dependencies.logger.error( + `Failed to populate missing semantic text fields: ${e.message}` + ); + }); return res; }; @@ -701,14 +700,21 @@ export class ObservabilityAIAssistantClient { return this.dependencies.knowledgeBaseService.reset(esClient); }; - reIndexKnowledgeBaseAndPopulateSemanticTextField = () => { - return reIndexKnowledgeBaseAndPopulateSemanticTextField({ + reIndexKnowledgeBaseWithLock = () => { + return reIndexKnowledgeBaseWithLock({ + core: this.dependencies.core, esClient: this.dependencies.esClient, logger: this.dependencies.logger, - config: this.dependencies.config, }); }; + reIndexKnowledgeBaseAndPopulateSemanticTextField = () => { + return populateMissingSemanticTextFieldMigration({ + core: this.dependencies.core, + logger: this.dependencies.logger, + config: this.dependencies.config, + }); + }; addUserInstruction = async ({ entry, }: { diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts index 8fedfd81d978e..4d19684ee2f7c 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts @@ -9,7 +9,6 @@ import { errors } from '@elastic/elasticsearch'; import { Logger } from '@kbn/logging'; import { v4 as uuid } from 'uuid'; -import pRetry from 'p-retry'; import prettyMilliseconds from 'pretty-ms'; import { once } from 'lodash'; import { duration } from 'moment'; @@ -33,7 +32,7 @@ export interface AcquireOptions { */ metadata?: Record; /** - * Time to live (TTL) for the lock in milliseconds. Default is 5 minutes. + * Time to live (TTL) for the lock in milliseconds. Default is 30 seconds. * When a lock expires it can be acquired by another process */ ttl?: number; @@ -56,165 +55,167 @@ export class LockManager { */ public async acquire({ metadata = {}, - ttl = duration(5, 'minutes').asMilliseconds(), + ttl = duration(30, 'seconds').asMilliseconds(), }: AcquireOptions = {}): Promise { + let response: Awaited>; await createLocksWriteIndexOnce(this.esClient); this.token = uuid(); - this.logger.debug( - `Acquiring lock "${this.lockId}" with ttl = ${prettyMilliseconds(ttl)} and token = ${ - this.token - }` - ); try { - const response = await this.esClient.update({ - index: LOCKS_CONCRETE_INDEX_NAME, - id: this.lockId, - scripted_upsert: true, - script: { - lang: 'painless', - source: ` + response = await this.esClient.update( + { + index: LOCKS_CONCRETE_INDEX_NAME, + id: this.lockId, + scripted_upsert: true, + script: { + lang: 'painless', + source: ` // Get the current time on the ES server. long now = System.currentTimeMillis(); - // If creating the document or if the lock is expired, update it. - if (ctx.op == 'create' || Instant.parse(ctx._source.expiresAt).toEpochMilli() < now) { + // If creating the document, or if the lock is expired, + // or if the current document is owned by the same token, then update it. + if (ctx.op == 'create' || + Instant.parse(ctx._source.expiresAt).toEpochMilli() < now || + ctx._source.token == params.token) { def instantNow = Instant.ofEpochMilli(now); ctx._source.createdAt = instantNow.toString(); ctx._source.expiresAt = instantNow.plusMillis(params.ttl).toString(); } else { - ctx.op = 'noop' + ctx.op = 'noop'; } `, - params: { - ttl, + params: { + ttl, + token: this.token, + }, + }, + // @ts-expect-error + upsert: { + metadata, + token: this.token, }, }, - // @ts-expect-error - upsert: { - metadata, - token: this.token, - }, - refresh: true, - }); + { + retryOnTimeout: true, + maxRetries: 3, + } + ); + } catch (e) { + if (isVersionConflictException(e)) { + this.logger.debug(`Lock "${this.lockId}" already held (version conflict)`); + return false; + } - if (response.result === 'created') { + this.logger.error(`Failed to acquire lock "${this.lockId}": ${e.message}`); + return false; + } + + switch (response.result) { + case 'created': { this.logger.debug( - `Lock "${this.lockId}" acquired with ttl = ${prettyMilliseconds(ttl)} and token = ${ + `Lock "${this.lockId}" with token = ${ this.token - }` + } acquired with ttl = ${prettyMilliseconds(ttl)}` ); return true; - } else if (response.result === 'updated') { + } + case 'updated': { this.logger.debug( - ` Lock "${this.lockId}" was expired and re-acquired with ttl = ${prettyMilliseconds( + `Lock "${this.lockId}" was expired and re-acquired with ttl = ${prettyMilliseconds( ttl )} and token = ${this.token}` ); return true; - } else if (response.result === 'noop') { + } + case 'noop': { this.logger.debug( - `Lock "${this.lockId}" could not be acquired with token ${this.token} because it is already held` + `Lock "${this.lockId}" with token = ${this.token} could not be acquired. It is already held` ); return false; - } else { - throw new Error(`Unexpected response: ${response.result}`); } - } catch (e) { - if (isVersionConflictException(e)) { - this.logger.debug(`Lock "${this.lockId}" already held (version conflict)`); - return false; - } - - this.logger.error(`Failed to acquire lock "${this.lockId}": ${e.message}`); - this.logger.debug(e); - return false; } + + this.logger.warn(`Unexpected response: ${response.result}`); + return false; } /** * Releases the lock by deleting the document with the given lockId and token */ public async release(): Promise { + let response: Awaited>; try { - const response = await this.esClient.update({ - index: LOCKS_CONCRETE_INDEX_NAME, - id: this.lockId, - scripted_upsert: false, - script: { - lang: 'painless', - source: ` + response = await this.esClient.update( + { + index: LOCKS_CONCRETE_INDEX_NAME, + id: this.lockId, + scripted_upsert: false, + script: { + lang: 'painless', + source: ` if (ctx._source.token == params.token) { ctx.op = 'delete'; } else { ctx.op = 'noop'; } `, - params: { token: this.token }, + params: { token: this.token }, + }, }, - refresh: true, - }); - - if (response.result === 'deleted') { - this.logger.debug(`Lock "${this.lockId}" released with token ${this.token}.`); - return true; - } else if (response.result === 'noop') { - this.logger.debug( - `Lock "${this.lockId}" was not released. Token ${this.token} does not match.` - ); - return false; - } else { - throw new Error(`Unexpected response: ${response.result}`); - } + { + retryOnTimeout: true, + maxRetries: 3, + } + ); } catch (error: any) { - if ( - error instanceof errors.ResponseError && - error.body?.error?.type === 'document_missing_exception' - ) { + if (isDocumentMissingException(error)) { this.logger.debug(`Lock "${this.lockId}" already released.`); return false; } this.logger.error(`Failed to release lock "${this.lockId}": ${error.message}`); - this.logger.debug(error); - return false; + throw error; } + + switch (response.result) { + case 'deleted': + this.logger.debug(`Lock "${this.lockId}" released with token ${this.token}.`); + return true; + case 'noop': + this.logger.debug( + `Lock "${this.lockId}" with token = ${this.token} could not be released. Token does not match.` + ); + return false; + } + + this.logger.warn(`Unexpected response: ${response.result}`); + return false; } /** * Retrieves the lock document for a given lockId. - * If the lock is expired, it will be released. + * If the lock is expired, it will not be returned */ public async get(): Promise { - const result = await this.esClient.search({ - index: LOCKS_CONCRETE_INDEX_NAME, - query: { - bool: { - filter: [{ term: { _id: this.lockId } }, { range: { expiresAt: { gt: 'now' } } }], - }, - }, - }); + const result = await this.esClient.get( + { index: LOCKS_CONCRETE_INDEX_NAME, id: this.lockId }, + { ignore: [404] } + ); - const hits = result.hits.hits; - return hits[0]?._source; - } + if (!result._source) { + return undefined; + } - public async acquireWithRetry({ - metadata, - ttl, - ...retryOptions - }: AcquireOptions & pRetry.Options = {}): Promise { - return pRetry(async () => { - const acquired = await this.acquire({ metadata, ttl }); - if (!acquired) { - this.logger.debug(`Lock "${this.lockId}" not available yet.`); - throw new Error(`Lock "${this.lockId}" not available yet`); - } - return acquired; - }, retryOptions ?? { forever: true, maxTimeout: 10_000 }); + const isExpired = new Date(result._source?.expiresAt).getTime() < Date.now(); + if (isExpired) { + return undefined; + } + + return result._source; } - public async extendTtl(ttl = 300000): Promise { + public async extendTtl(ttl: number): Promise { try { await this.esClient.update({ index: LOCKS_CONCRETE_INDEX_NAME, @@ -233,12 +234,11 @@ export class LockManager { token: this.token, }, }, - refresh: true, }); this.logger.debug(`Lock "${this.lockId}" extended ttl with ${prettyMilliseconds(ttl)}.`); return true; } catch (error) { - if (isVersionConflictException(error)) { + if (isVersionConflictException(error) || isDocumentMissingException(error)) { this.logger.debug(`Lock "${this.lockId}" was released concurrently. Not extending TTL.`); return false; } @@ -256,23 +256,16 @@ export async function withLock( logger, lockId, metadata, - ttl = duration(5, 'minutes').asMilliseconds(), - waitForLock = false, - retryOptions, + ttl = duration(30, 'seconds').asMilliseconds(), }: { esClient: ElasticsearchClient; logger: Logger; lockId: LockId; - waitForLock?: boolean; - retryOptions?: pRetry.Options; } & AcquireOptions, callback: () => Promise -): Promise { +): Promise { const lockManager = new LockManager(lockId, esClient, logger); - const acquired = - waitForLock ?? retryOptions - ? await lockManager.acquireWithRetry({ metadata, ttl, ...retryOptions }) - : await lockManager.acquire({ metadata, ttl }); + const acquired = await lockManager.acquire({ metadata, ttl }); if (!acquired) { logger.debug(`Lock "${lockId}" not acquired. Exiting.`); @@ -280,7 +273,7 @@ export async function withLock( } // extend the ttl periodically - const extendInterval = Math.floor(ttl / 2); + const extendInterval = Math.floor(ttl / 4); logger.debug( `Lock "${lockId}" acquired. Extending TTL every ${prettyMilliseconds(extendInterval)}` ); @@ -289,7 +282,7 @@ export async function withLock( const intervalId = setInterval(() => { // wait for the previous extendTtl request to finish before sending the next one. This is to avoid flooding ES with extendTtl requests in cases where ES is slow to respond. extendTTlPromise = extendTTlPromise - .then(() => lockManager.extendTtl()) + .then(() => lockManager.extendTtl(ttl)) .catch((err) => { logger.error(`Failed to extend lock "${lockId}":`, err); return false; @@ -305,6 +298,7 @@ export async function withLock( await lockManager.release(); } catch (error) { logger.error(`Failed to release lock "${lockId}" in withLock: ${error.message}`); + logger.debug(error); } } } @@ -344,7 +338,7 @@ export async function ensureTemplatesAndIndexCreated(esClient: ElasticsearchClie }); } -async function createLocksWriteIndex(esClient: ElasticsearchClient): Promise { +export async function createLocksWriteIndex(esClient: ElasticsearchClient): Promise { await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] }); } @@ -354,6 +348,10 @@ function isVersionConflictException(e: Error): boolean { ); } +function isDocumentMissingException(e: Error): boolean { + return e instanceof errors.ResponseError && e.body?.error?.type === 'document_missing_exception'; +} + export class LockAcquisitionError extends Error { constructor(message: string) { super(message); diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts index a293083e4361a..da9cdfef5a811 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts @@ -27,16 +27,14 @@ export class LockManagerService { callback: () => Promise, { metadata, - waitForLock, }: { metadata?: Record; - waitForLock?: boolean; } = {} ) { const [coreStart] = await this.coreSetup.getStartServices(); const esClient = coreStart.elasticsearch.client.asInternalUser; const logger = this.logger.get('LockManager'); - return withLock({ esClient, logger, lockId, metadata, waitForLock }, callback); + return withLock({ esClient, logger, lockId, metadata }, callback); } } diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts index adc7ea2822747..62bf0ffb6c4e2 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts @@ -17,7 +17,7 @@ import { ObservabilityAIAssistantClient } from './client'; import { KnowledgeBaseService } from './knowledge_base_service'; import type { RegistrationCallback, RespondFunctionResources } from './types'; import { ObservabilityAIAssistantConfig } from '../config'; -import { createOrUpdateIndexAssets } from './create_or_update_index_assets'; +import { createOrUpdateIndexAssets } from './startup_migrations/create_or_update_index_assets'; function getResourceName(resource: string) { return `.kibana-observability-ai-assistant-${resource}`; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts index e6f9f38a202f1..4915570346b34 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/inference_endpoint.ts @@ -9,6 +9,11 @@ import { errors } from '@elastic/elasticsearch'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import moment from 'moment'; +import pRetry from 'p-retry'; +import { + InferenceInferenceEndpointInfo, + MlGetTrainedModelsStatsResponse, +} from '@elastic/elasticsearch/lib/api/types'; import { ObservabilityAIAssistantConfig } from '../config'; export const AI_ASSISTANT_KB_INFERENCE_ID = 'obs_ai_assistant_kb_inference'; @@ -77,9 +82,11 @@ export async function getInferenceEndpoint({ inference_id: AI_ASSISTANT_KB_INFERENCE_ID, }); - if (response.endpoints.length > 0) { - return response.endpoints[0]; + if (response.endpoints.length === 0) { + throw new Error('Inference endpoint not found'); } + + return response.endpoints[0]; } export function isInferenceEndpointMissingOrUnavailable(error: Error) { @@ -90,7 +97,7 @@ export function isInferenceEndpointMissingOrUnavailable(error: Error) { ); } -export async function getElserModelStatus({ +export async function getKbModelStatus({ esClient, logger, config, @@ -99,40 +106,34 @@ export async function getElserModelStatus({ logger: Logger; config: ObservabilityAIAssistantConfig; }) { - let errorMessage = ''; - const endpoint = await getInferenceEndpoint({ - esClient, - }).catch((error) => { + const enabled = config.enableKnowledgeBase; + + let endpoint: InferenceInferenceEndpointInfo; + try { + endpoint = await getInferenceEndpoint({ esClient }); + } catch (error) { if (!isInferenceEndpointMissingOrUnavailable(error)) { throw error; } - errorMessage = error.message; - }); - - const enabled = config.enableKnowledgeBase; - if (!endpoint) { - return { ready: false, enabled, errorMessage }; + return { ready: false, enabled, errorMessage: error.message }; } - const modelId = endpoint.service_settings?.model_id; - const modelStats = await esClient.asInternalUser.ml - .getTrainedModelsStats({ model_id: modelId }) - .catch((error) => { - logger.debug(`Failed to get model stats: ${error.message}`); - errorMessage = error.message; + let trainedModelStatsResponse: MlGetTrainedModelsStatsResponse; + try { + trainedModelStatsResponse = await esClient.asInternalUser.ml.getTrainedModelsStats({ + model_id: endpoint.service_settings?.model_id, }); - - if (!modelStats) { - return { ready: false, enabled, errorMessage }; + } catch (error) { + logger.debug(`Failed to get model stats: ${error.message}`); + return { ready: false, enabled, errorMessage: error.message }; } - const elserModelStats = modelStats.trained_model_stats.find( + const modelStats = trainedModelStatsResponse.trained_model_stats.find( (stats) => stats.deployment_stats?.deployment_id === AI_ASSISTANT_KB_INFERENCE_ID ); - const deploymentState = elserModelStats?.deployment_stats?.state; - const allocationState = elserModelStats?.deployment_stats?.allocation_status?.state; - const allocationCount = - elserModelStats?.deployment_stats?.allocation_status?.allocation_count ?? 0; + const deploymentState = modelStats?.deployment_stats?.state; + const allocationState = modelStats?.deployment_stats?.allocation_status?.state; + const allocationCount = modelStats?.deployment_stats?.allocation_status?.allocation_count ?? 0; const ready = deploymentState === 'started' && allocationState === 'fully_allocated' && allocationCount > 0; @@ -147,3 +148,24 @@ export async function getElserModelStatus({ }, }; } + +export async function waitForKbModel({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + return pRetry( + async () => { + const { ready } = await getKbModelStatus({ esClient, logger, config }); + if (!ready) { + logger.debug('Knowledge base model is not yet ready. Retrying...'); + throw new Error('Knowledge base model is not yet ready'); + } + }, + { retries: 30, factor: 2, maxTimeout: 30_000 } + ); +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts index c886b5187ab95..324b7608086f9 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts @@ -23,7 +23,7 @@ import { getSpaceQuery } from '../util/get_space_query'; import { createInferenceEndpoint, deleteInferenceEndpoint, - getElserModelStatus, + getKbModelStatus, isInferenceEndpointMissingOrUnavailable, } from '../inference_endpoint'; import { recallFromSearchConnectors } from './recall_from_search_connectors'; @@ -32,8 +32,9 @@ import { ObservabilityAIAssistantConfig } from '../../config'; import { isKnowledgeBaseIndexWriteBlocked, isSemanticTextUnsupportedError, + reIndexKnowledgeBaseWithLock, } from './reindex_knowledge_base'; -import { scheduleKbSemanticTextMigrationTask } from '../task_manager_definitions/register_kb_semantic_text_migration_task'; +import { LockAcquisitionError } from '../distributed_lock_manager/lock_manager_client'; interface Dependencies { core: CoreSetup; @@ -443,23 +444,20 @@ export class KnowledgeBaseService { } if (isSemanticTextUnsupportedError(error)) { - this.dependencies.core - .getStartServices() - .then(([_, pluginsStart]) => { - return scheduleKbSemanticTextMigrationTask({ - taskManager: pluginsStart.taskManager, - logger: this.dependencies.logger, - runSoon: true, - }); - }) - .catch((e) => { - this.dependencies.logger.error( - `Failed to schedule knowledge base semantic text migration task: ${e}` - ); - }); + reIndexKnowledgeBaseWithLock({ + core: this.dependencies.core, + logger: this.dependencies.logger, + esClient: this.dependencies.esClient, + }).catch((e) => { + if (error instanceof LockAcquisitionError) { + this.dependencies.logger.debug(`Re-indexing operation is already in progress`); + return; + } + this.dependencies.logger.error(`Failed to re-index knowledge base: ${e.message}`); + }); throw serverUnavailable( - 'The knowledge base is currently being re-indexed. Please try again later' + `The index "${resourceNames.aliases.kb}" does not support semantic text and must be reindexed. This re-index operation has been scheduled and will be started automatically. Please try again later.` ); } @@ -491,7 +489,7 @@ export class KnowledgeBaseService { }; getStatus = async () => { - return getElserModelStatus({ + return getKbModelStatus({ esClient: this.dependencies.esClient, logger: this.dependencies.logger, config: this.dependencies.config, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts index ea8dd58517f18..a7c6b5d6391bd 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts @@ -8,44 +8,45 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; +import { CoreSetup } from '@kbn/core/server'; import { resourceNames } from '..'; -import { createKbConcreteIndex } from '../create_or_update_index_assets'; +import { createKbConcreteIndex } from '../startup_migrations/create_or_update_index_assets'; +import { LockManagerService } from '../distributed_lock_manager/lock_manager_service'; +import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; -export async function reIndexKnowledgeBase({ +export const KB_REINDEXING_LOCK_ID = 'observability_ai_assistant:kb_reindexing'; +export async function reIndexKnowledgeBaseWithLock({ + core, logger, esClient, }: { + core: CoreSetup; logger: Logger; esClient: { asInternalUser: ElasticsearchClient; }; -}): Promise { +}): Promise { + const lmService = new LockManagerService(core, logger); + return lmService.withLock(KB_REINDEXING_LOCK_ID, () => + reIndexKnowledgeBase({ logger, esClient }) + ); +} + +async function reIndexKnowledgeBase({ + logger, + esClient, +}: { + logger: Logger; + esClient: { + asInternalUser: ElasticsearchClient; + }; +}): Promise { logger.debug('Initiating knowledge base re-indexing...'); try { const originalIndex = resourceNames.concreteIndexName.kb; const tempIndex = `${resourceNames.aliases.kb}-000002`; - const indexSettingsResponse = await esClient.asInternalUser.indices.getSettings({ - index: originalIndex, - }); - - const indexSettings = indexSettingsResponse[originalIndex].settings; - const createdVersion = parseInt(indexSettings?.index?.version?.created ?? '', 10); - - // Check if the index was created before version 8.11 - const versionThreshold = 8110000; // Version 8.11.0 - if (createdVersion >= versionThreshold) { - logger.debug( - `Knowledge base index "${originalIndex}" was created in version ${createdVersion}, and does not require re-indexing. Semantic text field is already supported. Aborting` - ); - return; - } - - logger.info( - `Knowledge base index was created in ${createdVersion} and must be re-indexed in order to support semantic_text field. Re-indexing now...` - ); - // Create temporary index logger.debug(`Creating temporary index "${tempIndex}"...`); await esClient.asInternalUser.indices.delete({ index: tempIndex }, { ignore: [404] }); @@ -82,11 +83,10 @@ export async function reIndexKnowledgeBase({ logger.debug(`Deleting temporary index "${tempIndex}"...`); await esClient.asInternalUser.indices.delete({ index: tempIndex }); - logger.info( - 'Re-indexing knowledge base completed successfully. Semantic text field is now supported.' - ); + logger.info('Re-indexing knowledge base completed successfully'); + return true; } catch (error) { - throw new Error(`Failed to reindex knowledge base: ${error.message}`); + throw new Error(`Failed to re-index knowledge base: ${error.message}`); } } diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/create_or_update_index_assets.ts similarity index 95% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts rename to x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/create_or_update_index_assets.ts index 85a0e9ba8e42a..f416dd1d0292a 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/create_or_update_index_assets.ts @@ -7,10 +7,10 @@ import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server'; import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server'; -import type { ObservabilityAIAssistantPluginStartDependencies } from '../types'; -import { conversationComponentTemplate } from './conversation_component_template'; -import { kbComponentTemplate } from './kb_component_template'; -import { resourceNames } from '.'; +import type { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { conversationComponentTemplate } from '../conversation_component_template'; +import { kbComponentTemplate } from '../kb_component_template'; +import { resourceNames } from '..'; export async function updateExistingIndexAssets({ logger, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts new file mode 100644 index 0000000000000..692a9bdd431ab --- /dev/null +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import pLimit from 'p-limit'; +import type { CoreSetup, Logger } from '@kbn/core/server'; +import { uniq } from 'lodash'; +import pRetry from 'p-retry'; +import { KnowledgeBaseEntry } from '../../../common'; +import { resourceNames } from '..'; +import { waitForKbModel } from '../inference_endpoint'; +import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { ObservabilityAIAssistantConfig } from '../../config'; +import { reIndexKnowledgeBaseWithLock } from '../knowledge_base_service/reindex_knowledge_base'; +import { LockManagerService } from '../distributed_lock_manager/lock_manager_service'; +import { LockAcquisitionError } from '../distributed_lock_manager/lock_manager_client'; + +const PLUGIN_STARTUP_LOCK_ID = 'observability_ai_assistant:startup_migrations'; + +// This function populates the `semantic_text` field for knowledge base entries during the plugin's startup process. +// It ensures all missing fields are updated in batches and uses a distributed lock to prevent conflicts in distributed environments. +// If the knowledge base index does not support the `semantic_text` field, it is re-indexed. +export async function populateMissingSemanticTextFieldMigration({ + core, + logger, + config, +}: { + core: CoreSetup; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + const [coreStart] = await core.getStartServices(); + const esClient = coreStart.elasticsearch.client; + + const lmService = new LockManagerService(core, logger); + await lmService + .withLock(PLUGIN_STARTUP_LOCK_ID, async () => { + const hasKbIndex = await esClient.asInternalUser.indices.exists({ + index: resourceNames.aliases.kb, + }); + + if (!hasKbIndex) { + logger.debug('Knowledge base index does not exist. Aborting updating index assets'); + return; + } + + const isKbSemanticTextCompatible = await isKnowledgeBaseSemanticTextCompatible({ + logger, + esClient, + }); + + if (!isKbSemanticTextCompatible) { + await reIndexKnowledgeBaseWithLock({ core, logger, esClient }); + } + + await pRetry( + async () => populateMissingSemanticTextFieldRecursively({ esClient, logger, config }), + { retries: 5, minTimeout: 10_000 } + ); + }) + .catch((error) => { + if (!(error instanceof LockAcquisitionError)) { + throw error; + } + }); +} + +// Ensures that every doc has populated the `semantic_text` field. +// It retrieves entries without the field, updates them in batches, and continues until no entries remain. +async function populateMissingSemanticTextFieldRecursively({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + logger.debug( + 'Checking for remaining entries without semantic_text field that need to be migrated' + ); + + const response = await esClient.asInternalUser.search({ + size: 100, + track_total_hits: true, + index: [resourceNames.aliases.kb], + query: { + bool: { + must_not: { + exists: { + field: 'semantic_text', + }, + }, + }, + }, + _source: { + excludes: ['ml.tokens'], + }, + }); + + if (response.hits.hits.length === 0) { + logger.debug('No remaining entries to migrate'); + return; + } + + await waitForKbModel({ esClient, logger, config }); + + const indicesWithOutdatedEntries = uniq(response.hits.hits.map((hit) => hit._index)); + logger.debug( + `Found ${response.hits.hits.length} entries without semantic_text field in "${indicesWithOutdatedEntries}". Updating now...` + ); + + // Limit the number of concurrent requests to avoid overloading the cluster + const limiter = pLimit(20); + const promises = response.hits.hits.map((hit) => { + return limiter(() => { + if (!hit._source || !hit._id) { + return; + } + + return esClient.asInternalUser.update({ + refresh: 'wait_for', + index: resourceNames.aliases.kb, + id: hit._id, + doc: { + ...hit._source, + semantic_text: hit._source.text ?? 'No text', + }, + }); + }); + }); + + await Promise.all(promises); + logger.debug(`Updated ${promises.length} entries`); + + await sleep(100); + await populateMissingSemanticTextFieldRecursively({ esClient, logger, config }); +} + +async function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// Checks if the knowledge base index supports `semantic_text` +// If the index was created before version 8.11, it requires re-indexing to support the `semantic_text` field. +async function isKnowledgeBaseSemanticTextCompatible({ + logger, + esClient, +}: { + logger: Logger; + esClient: { asInternalUser: ElasticsearchClient }; +}): Promise { + const indexSettingsResponse = await esClient.asInternalUser.indices.getSettings({ + index: resourceNames.aliases.kb, + }); + + const results = Object.entries(indexSettingsResponse); + if (results.length === 0) { + logger.debug('No knowledge base indices found. Skipping re-indexing.'); + return true; + } + + const [indexName, { settings }] = results[0]; + const createdVersion = parseInt(settings?.index?.version?.created ?? '', 10); + + // Check if the index was created before version 8.11 + const versionThreshold = 8110000; // Version 8.11.0 + if (createdVersion >= versionThreshold) { + logger.debug( + `Knowledge base index "${indexName}" was created in version ${createdVersion}, and does not require re-indexing. Semantic text field is already supported. Aborting` + ); + return true; + } + + logger.info( + `Knowledge base index was created in ${createdVersion} and must be re-indexed in order to support semantic_text field. Re-indexing now...` + ); + + return false; +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts deleted file mode 100644 index 4f83e61a67a4d..0000000000000 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import pLimit from 'p-limit'; -import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server'; -import type { CoreSetup, CoreStart, Logger } from '@kbn/core/server'; -import pRetry from 'p-retry'; -import { KnowledgeBaseEntry } from '../../../common'; -import { resourceNames } from '..'; -import { getElserModelStatus } from '../inference_endpoint'; -import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; -import { ObservabilityAIAssistantConfig } from '../../config'; -import { reIndexKnowledgeBase } from '../knowledge_base_service/reindex_knowledge_base'; - -const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id'; -const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration'; - -// This task will re-index all knowledge base entries without `semantic_text` field -// to ensure the field is populated with the correct embeddings. -// After the migration we will no longer need to use the `ml.tokens` field. -export async function registerAndScheduleKbSemanticTextMigrationTask({ - taskManager, - logger, - core, - config, - indexAssetsUpdatedPromise, -}: { - taskManager: TaskManagerSetupContract; - logger: Logger; - core: CoreSetup; - config: ObservabilityAIAssistantConfig; - indexAssetsUpdatedPromise: Promise; -}) { - const [coreStart, pluginsStart] = await core.getStartServices(); - - // register task - registerKbSemanticTextMigrationTask({ taskManager, logger, coreStart, config }); - - // wait for index assets to be updated - await indexAssetsUpdatedPromise; - - // schedule task - await scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger }); -} - -function registerKbSemanticTextMigrationTask({ - taskManager, - logger, - coreStart, - config, -}: { - taskManager: TaskManagerSetupContract; - logger: Logger; - coreStart: CoreStart; - config: ObservabilityAIAssistantConfig; -}) { - try { - logger.debug(`Register task "${TASK_TYPE}"`); - taskManager.registerTaskDefinitions({ - [TASK_TYPE]: { - title: 'Add support for semantic_text in Knowledge Base', - description: `This task will reindex the knowledge base and populate the semantic_text fields for all entries without it.`, - timeout: '1h', - maxAttempts: 5, - createTaskRunner() { - return { - async run() { - logger.debug(`Run task: "${TASK_TYPE}"`); - const esClient = coreStart.elasticsearch.client; - - const hasKbIndex = await esClient.asInternalUser.indices.exists({ - index: resourceNames.aliases.kb, - }); - - if (!hasKbIndex) { - logger.debug('Knowledge base index does not exist. Skipping migration.'); - return; - } - - if (config.disableKbSemanticTextMigration) { - logger.info( - 'Semantic text migration is disabled via config "xpack.observabilityAIAssistant.disableKbSemanticTextMigration=true". Skipping migration.' - ); - return; - } - - await reIndexKnowledgeBaseAndPopulateSemanticTextField({ esClient, logger, config }); - }, - }; - }, - }, - }); - } catch (error) { - logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`); - } -} - -export async function scheduleKbSemanticTextMigrationTask({ - taskManager, - logger, - runSoon = false, -}: { - taskManager: ObservabilityAIAssistantPluginStartDependencies['taskManager']; - logger: Logger; - runSoon?: boolean; -}) { - logger.debug('Schedule migration task'); - await taskManager.ensureScheduled({ - id: TASK_ID, - taskType: TASK_TYPE, - scope: ['aiAssistant'], - params: {}, - state: {}, - }); - - if (runSoon) { - logger.debug('Run migration task soon'); - await taskManager.runSoon(TASK_ID); - } -} - -export async function reIndexKnowledgeBaseAndPopulateSemanticTextField({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - logger.debug('Starting migration...'); - - try { - await reIndexKnowledgeBase({ logger, esClient }); - await populateSemanticTextFieldRecursively({ esClient, logger, config }); - } catch (e) { - logger.error(`Migration failed: ${e.message}`); - } - - logger.debug('Migration succeeded'); -} - -async function populateSemanticTextFieldRecursively({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - logger.debug('Populating semantic_text field for entries without it'); - - const response = await esClient.asInternalUser.search({ - size: 100, - track_total_hits: true, - index: [resourceNames.aliases.kb], - query: { - bool: { - must_not: { - exists: { - field: 'semantic_text', - }, - }, - }, - }, - _source: { - excludes: ['ml.tokens'], - }, - }); - - if (response.hits.hits.length === 0) { - logger.debug('No remaining entries to migrate'); - return; - } - - logger.debug(`Found ${response.hits.hits.length} entries to migrate`); - - await waitForModel({ esClient, logger, config }); - - // Limit the number of concurrent requests to avoid overloading the cluster - const limiter = pLimit(10); - const promises = response.hits.hits.map((hit) => { - return limiter(() => { - if (!hit._source || !hit._id) { - return; - } - - return esClient.asInternalUser.update({ - refresh: 'wait_for', - index: resourceNames.aliases.kb, - id: hit._id, - body: { - doc: { - ...hit._source, - semantic_text: hit._source.text, - }, - }, - }); - }); - }); - - await Promise.all(promises); - - logger.debug(`Populated ${promises.length} entries`); - await populateSemanticTextFieldRecursively({ esClient, logger, config }); -} - -async function waitForModel({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - return pRetry( - async () => { - const { ready } = await getElserModelStatus({ esClient, logger, config }); - if (!ready) { - logger.debug('Elser model is not yet ready. Retrying...'); - throw new Error('Elser model is not yet ready'); - } - }, - { retries: 30, factor: 2, maxTimeout: 30_000 } - ); -} diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts b/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts index d49fda2556f69..c4ea8213f583d 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts @@ -27,6 +27,9 @@ export const REMOVED_TYPES: string[] = [ 'cleanup_failed_action_executions', 'reports:monitor', + + // deprecated in https://github.com/elastic/kibana/pull/216916 + 'obs-ai-assistant:knowledge-base-migration', ]; /** diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts index 29b326e34f847..c3f4e4607ca71 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/complete/functions/recall.spec.ts @@ -15,55 +15,10 @@ import { addSampleDocsToCustomIndex, setupKnowledgeBase, } from '../../utils/knowledge_base'; +import { animalSampleDocs, technicalSampleDocs } from '../../utils/sample_docs'; const customSearchConnectorIndex = 'animals_kb'; -const sampleDocsForInternalKb = [ - { - id: 'technical_db_outage_slow_queries', - title: 'Database Outage: Slow Query Execution', - text: 'At 03:15 AM UTC, the production database experienced a significant outage, leading to slow query execution and increased response times across multiple services. A surge in database load was detected, with 90% of queries exceeding 2 seconds. A detailed log analysis pointed to locking issues within the transaction queue and inefficient index usage.', - }, - { - id: 'technical_api_gateway_timeouts', - title: 'Service Timeout: API Gateway Bottleneck', - text: 'At 10:45 AM UTC, the API Gateway encountered a timeout issue, causing a 500 error for all incoming requests. Detailed traces indicated a significant bottleneck at the gateway level, where requests stalled while waiting for upstream service responses. The upstream service was overwhelmed due to a sudden spike in inbound traffic and failed to release resources promptly.', - }, - { - id: 'technical_cache_misses_thirdparty_api', - title: 'Cache Misses and Increased Latency: Third-Party API Failure', - text: 'At 04:30 PM UTC, a dramatic increase in cache misses and latency was observed. The failure of a third-party API prevented critical data from being cached, leading to unnecessary re-fetching of resources from external sources. This caused significant delays in response times, with up to 10-second delays in some key services.', - }, -]; - -const sampleDocsForCustomIndex = [ - { - id: 'animal_elephants_social_structure', - title: 'Elephants and Their Social Structure', - text: 'Elephants are highly social animals that live in matriarchal herds led by the oldest female. These animals communicate through low-frequency sounds, called infrasound, that travel long distances. They are known for their intelligence, strong memory, and deep emotional bonds with each other.', - }, - { - id: 'animal_cheetah_life_speed', - title: 'The Life of a Cheetah', - text: 'Cheetahs are the fastest land animals, capable of reaching speeds up to 60 miles per hour in short bursts. They rely on their speed to catch prey, such as gazelles. Unlike other big cats, cheetahs cannot roar, but they make distinctive chirping sounds, especially when communicating with their cubs.', - }, - { - id: 'animal_whale_migration_patterns', - title: 'Whales and Their Migration Patterns', - text: 'Whales are known for their long migration patterns, traveling thousands of miles between feeding and breeding grounds.', - }, - { - id: 'animal_giraffe_habitat_feeding', - title: 'Giraffes: Habitat and Feeding Habits', - text: 'Giraffes are the tallest land animals, with long necks that help them reach leaves high up in trees. They live in savannas and grasslands, where they feed on leaves, twigs, and fruits from acacia trees.', - }, - { - id: 'animal_penguin_antarctic_adaptations', - title: 'Penguins and Their Antarctic Adaptations', - text: 'Penguins are flightless birds that have adapted to life in the cold Antarctic environment. They have a thick layer of blubber to keep warm, and their wings have evolved into flippers for swimming in the icy waters.', - }, -]; - export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); const es = getService('es'); @@ -71,12 +26,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('recall', function () { before(async () => { await setupKnowledgeBase(getService); - await addSampleDocsToInternalKb(getService, sampleDocsForInternalKb); - await addSampleDocsToCustomIndex( - getService, - sampleDocsForCustomIndex, - customSearchConnectorIndex - ); + await addSampleDocsToInternalKb(getService, technicalSampleDocs); + await addSampleDocsToCustomIndex(getService, animalSampleDocs, customSearchConnectorIndex); }); after(async () => { diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index d0054856f633a..be304d350f999 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -12,15 +12,19 @@ import { LockId, ensureTemplatesAndIndexCreated, LockManager, - withLock, LockDocument, LOCKS_CONCRETE_INDEX_NAME, + createLocksWriteIndex, + withLock, } from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/lock_manager_client'; +import nock from 'nock'; import { Client } from '@elastic/elasticsearch'; import { times } from 'lodash'; import { ToolingLog } from '@kbn/tooling-log'; +import pRetry from 'p-retry'; import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; import { getLoggerMock } from '../utils/logger'; +import { dateAsTimestamp, durationAsMs, sleep } from '../utils/time'; export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { const es = getService('es'); @@ -28,195 +32,418 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon const logger = getLoggerMock(log); describe('LockManager', function () { - this.tags(['failsOnMKI']); - before(async () => { - await clearAllLocks(es); - await ensureTemplatesAndIndexCreated(es); - }); + describe('Manual locking API', function () { + this.tags(['failsOnMKI']); + before(async () => { + await ensureTemplatesAndIndexCreated(es); + await createLocksWriteIndex(es); + await clearAllLocks(es); + }); - describe('Basic lock operations', () => { - let lockManager: LockManager; - const LOCK_ID = 'basic_lock_operations'; + describe('Basic lock operations', () => { + let lockManager: LockManager; + const LOCK_ID = 'basic_lock_operations'; - beforeEach(async () => { - lockManager = new LockManager(LOCK_ID, es, logger); - }); + beforeEach(async () => { + lockManager = new LockManager(LOCK_ID, es, logger); + }); - afterEach(async () => { - await lockManager.release(); - }); + afterEach(async () => { + await lockManager.release(); + }); - it('acquires the lock when not held', async () => { - const acquired = await lockManager.acquire(); - expect(acquired).to.be(true); + it('acquires the lock when not held', async () => { + const acquired = await lockManager.acquire(); + expect(acquired).to.be(true); - const lock = await getLockById(es, LOCK_ID); - expect(true).to.be(true); + const lock = await getLockById(es, LOCK_ID); + expect(true).to.be(true); - expect(lock).not.to.be(undefined); - }); + expect(lock).not.to.be(undefined); + }); - it('stores and retrieves metadata', async () => { - const metadata = { foo: 'bar' }; - await lockManager.acquire({ metadata }); - const lock = await getLockById(es, LOCK_ID); - expect(lock?.metadata).to.eql(metadata); - }); + it('stores and retrieves metadata', async () => { + const metadata = { foo: 'bar' }; + await lockManager.acquire({ metadata }); + const lock = await getLockById(es, LOCK_ID); + expect(lock?.metadata).to.eql(metadata); + }); - it('releases the lock', async () => { - const acquired = await lockManager.acquire(); - expect(acquired).to.be(true); + it('releases the lock', async () => { + const acquired = await lockManager.acquire(); + expect(acquired).to.be(true); - await lockManager.release(); + await lockManager.release(); - const lock = await getLockById(es, LOCK_ID); - expect(lock).to.be(undefined); - }); + const lock = await getLockById(es, LOCK_ID); + expect(lock).to.be(undefined); + }); - it('it sets expiresAt according to the ttl', async () => { - await lockManager.acquire({ ttl: 8 * 60 * 1000 }); - const lock = await getLockById(es, LOCK_ID); - const ttl = new Date(lock!.expiresAt).getTime() - new Date(lock!.createdAt).getTime(); - expect(prettyMilliseconds(ttl)).to.be('8m'); - }); + it('it sets expiresAt according to the ttl', async () => { + await lockManager.acquire({ ttl: durationAsMs(8, 'minutes') }); + const lock = await getLockById(es, LOCK_ID); + const ttl = dateAsTimestamp(lock!.expiresAt) - dateAsTimestamp(lock!.createdAt); + expect(prettyMilliseconds(ttl)).to.be('8m'); + }); - it('does not throw when releasing a non-existent lock', async () => { - await lockManager.release(); - await lockManager.release(); - const lock = await getLockById(es, LOCK_ID); - expect(lock).to.be(undefined); + it('does not throw when releasing a non-existent lock', async () => { + await lockManager.release(); + await lockManager.release(); + const lock = await getLockById(es, LOCK_ID); + expect(lock).to.be(undefined); + }); }); - }); - describe('get', () => { - let lockManager: LockManager; - const LOCK_ID = 'my_lock_with_get'; + describe('when encountering network error the ES client retries the request', () => { + let lockManager: LockManager; + const LOCK_ID = 'es_client_retries_lock'; + let retryCounter = 0; - beforeEach(async () => { - lockManager = new LockManager(LOCK_ID, es, logger); - }); + beforeEach(async () => { + retryCounter = 0; - afterEach(async () => { - await lockManager.release(); - }); + lockManager = new LockManager(LOCK_ID, es, logger); + }); - it('does not return expired locks', async () => { - await lockManager.acquire({ ttl: 500 }); - await sleep(1000); - const lock = await lockManager.get(); - expect(lock).to.be(undefined); + afterEach(async () => { + nock.cleanAll(); + await lockManager.release(); + }); - const lockRaw = await getLockById(es, LOCK_ID); - expect(lockRaw).to.not.be(undefined); - }); - }); + after(async () => { + nock.restore(); + }); - describe('Two LockManagers with different lockId', () => { - let manager1: LockManager; - let manager2: LockManager; + function addElasticsearchMock({ numberOfMocks }: { numberOfMocks: number }) { + nock(/localhost:9220/, { allowUnmocked: true }) + .filteringRequestBody(() => '*') + .post(`/${LOCKS_CONCRETE_INDEX_NAME}/_update/${LOCK_ID}`) + .times(numberOfMocks) + .reply((uri, requestBody, cb) => { + log.debug(`Returning mock error for ${uri}`); + retryCounter++; + setTimeout(() => { + cb(null, [503, 'Service Unavailable']); + }, 100); + }); + } - beforeEach(async () => { - manager1 = new LockManager('my_lock_id' as LockId, es, logger); - manager2 = new LockManager('my_other_lock_id', es, logger); - }); + it('eventually succeeds', async () => { + addElasticsearchMock({ numberOfMocks: 3 }); - afterEach(async () => { - await manager1.release(); - await manager2.release(); - }); + const acquired = await lockManager.acquire(); - it('does not interfere between separate locks', async () => { - const acquired1 = await manager1.acquire(); - const acquired2 = await manager2.acquire(); - expect(acquired1).to.be(true); - expect(acquired2).to.be(true); - }); - }); + expect(acquired).to.be(true); + expect(retryCounter).to.be(3); + }); - describe('Two LockManagers with identical lockId', () => { - let manager1: LockManager; - let manager2: LockManager; + it('eventually fails', async () => { + addElasticsearchMock({ numberOfMocks: 4 }); - const LOCK_ID = 'my_lock'; + const acquired = await lockManager.acquire(); - beforeEach(async () => { - manager1 = new LockManager(LOCK_ID, es, logger); - manager2 = new LockManager(LOCK_ID, es, logger); + expect(acquired).to.be(false); + expect(retryCounter).to.be(4); + }); }); - afterEach(async () => { - await manager1.release(); - await manager2.release(); + describe('get', () => { + let lockManager: LockManager; + const LOCK_ID = 'my_lock_with_get'; + + beforeEach(async () => { + lockManager = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + await lockManager.release(); + }); + + it('does not return expired locks', async () => { + await lockManager.acquire({ ttl: 500 }); + await sleep(1000); + const lock = await lockManager.get(); + expect(lock).to.be(undefined); + + const lockRaw = await getLockById(es, LOCK_ID); + expect(lockRaw).to.not.be(undefined); + }); }); - it('does not acquire the lock if already held', async () => { - const acquired1 = await manager1.acquire({ metadata: { attempt: 'one' } }); - expect(acquired1).to.be(true); + describe('Two LockManagers with different lockId', () => { + let manager1: LockManager; + let manager2: LockManager; + + beforeEach(async () => { + manager1 = new LockManager('my_lock_id' as LockId, es, logger); + manager2 = new LockManager('my_other_lock_id', es, logger); + }); - const acquired2 = await manager2.acquire({ metadata: { attempt: 'two' } }); - expect(acquired2).to.be(false); + afterEach(async () => { + await manager1.release(); + await manager2.release(); + }); - const lock = await getLockById(es, LOCK_ID); - expect(lock?.metadata).to.eql({ attempt: 'one' }); + it('does not interfere between separate locks', async () => { + const acquired1 = await manager1.acquire(); + const acquired2 = await manager2.acquire(); + expect(acquired1).to.be(true); + expect(acquired2).to.be(true); + }); }); - it('allows re-acquisition after expiration', async () => { - // Acquire with a very short TTL. - const acquired = await manager1.acquire({ ttl: 500, metadata: { attempt: 'one' } }); - expect(acquired).to.be(true); + describe('Two LockManagers with identical lockId', () => { + let manager1: LockManager; + let manager2: LockManager; - await sleep(1000); // wait for lock to expire + const LOCK_ID = 'my_lock'; - const reacquired = await manager2.acquire({ metadata: { attempt: 'two' } }); - expect(reacquired).to.be(true); + beforeEach(async () => { + manager1 = new LockManager(LOCK_ID, es, logger); + manager2 = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + await manager1.release(); + await manager2.release(); + }); + + it('does not acquire the lock if already held', async () => { + const acquired1 = await manager1.acquire({ metadata: { attempt: 'one' } }); + expect(acquired1).to.be(true); + + const acquired2 = await manager2.acquire({ metadata: { attempt: 'two' } }); + expect(acquired2).to.be(false); + + const lock = await getLockById(es, LOCK_ID); + expect(lock?.metadata).to.eql({ attempt: 'one' }); + }); + + it('allows re-acquisition after expiration', async () => { + // Acquire with a very short TTL. + const acquired = await manager1.acquire({ ttl: 500, metadata: { attempt: 'one' } }); + expect(acquired).to.be(true); + + await sleep(1000); // wait for lock to expire + + const reacquired = await manager2.acquire({ metadata: { attempt: 'two' } }); + expect(reacquired).to.be(true); + }); }); - }); - describe('acquireWithRetry', () => { - let blockingManager: LockManager; - let waitingManager: LockManager; + describe('when waiting for lock to be available using pRetry', () => { + let blockingManager: LockManager; + let waitingManager: LockManager; + + const RETRY_LOCK_ID = 'my_lock_with_retry'; - const RETRY_LOCK_ID = 'my_lock_with_retry'; + beforeEach(async () => { + blockingManager = new LockManager(RETRY_LOCK_ID, es, logger); + waitingManager = new LockManager(RETRY_LOCK_ID, es, logger); + await blockingManager.release(); + await waitingManager.release(); + }); + + afterEach(async () => { + await blockingManager.release(); + await waitingManager.release(); + }); + + it('eventually acquires the lock when it becomes available', async () => { + const acquired = await blockingManager.acquire(); + expect(acquired).to.be(true); + + const waitPromise = pRetry(async () => { + const hasLock = await waitingManager.acquire(); + if (!hasLock) { + throw new Error(`Lock "${RETRY_LOCK_ID}" not available yet`); + } + return hasLock; + }); + await sleep(100); + await blockingManager.release(); + + const waitResult = await waitPromise; + expect(waitResult).to.be(true); + }); + + it('throws an error when the retry times out', async () => { + await blockingManager.acquire(); - beforeEach(async () => { - blockingManager = new LockManager(RETRY_LOCK_ID, es, logger); - waitingManager = new LockManager(RETRY_LOCK_ID, es, logger); - await blockingManager.release(); - await waitingManager.release(); + let error: Error | undefined; + try { + await pRetry( + async () => { + const hasLock = await waitingManager.acquire(); + if (!hasLock) { + throw new Error(`Lock "${RETRY_LOCK_ID}" not available yet`); + } + return hasLock; + }, + { minTimeout: 100, maxTimeout: 100, retries: 2 } + ); + } catch (err) { + error = err; + } + expect(error?.message).to.contain('Lock "my_lock_with_retry" not available yet'); + await blockingManager.release(); + }); }); - afterEach(async () => { - await blockingManager.release(); - await waitingManager.release(); + describe('extendTtl', () => { + let lockManager: LockManager; + const ttl = 1000; + + const LOCK_ID = 'my_lock_extend_ttl'; + + beforeEach(async () => { + lockManager = new LockManager(LOCK_ID, es, logger); + await lockManager.acquire({ ttl }); + }); + + afterEach(async () => { + await lockManager.release(); + }); + + it('has initial `expiresAt` value', async () => { + const lock = (await getLockById(es, LOCK_ID))!; + expect(dateAsTimestamp(lock.expiresAt)).to.be(dateAsTimestamp(lock.createdAt) + ttl); + }); + + it('update `expiresAt` to be greater than before', async () => { + const lockBeforeExtending = (await getLockById(es, LOCK_ID))!; + const res = await lockManager.extendTtl(2000); + expect(res).to.be(true); + + const lockAfterExtension = (await getLockById(es, LOCK_ID))!; + expect(dateAsTimestamp(lockAfterExtension.expiresAt)).to.be.greaterThan( + dateAsTimestamp(lockBeforeExtending.expiresAt) + ); + }); + + it('does not extend lock if already released', async () => { + await lockManager.release(); + const res = await lockManager.extendTtl(2000); + expect(res).to.be(false); + }); }); - it('eventually acquires the lock when it becomes available', async () => { - const acquired = await blockingManager.acquire(); - expect(acquired).to.be(true); + describe('Concurrency and race conditions', () => { + const LOCK_ID = 'my_lock_with_concurrency'; + + it('should allow only one lock acquisition among many concurrent attempts', async () => { + const lockManagers = await Promise.all( + times(20).map(() => new LockManager(LOCK_ID, es, logger)) + ); + + const acquireAttempts = await Promise.all(lockManagers.map((lm) => lm.acquire())); + const releaseAttempts = await Promise.all(lockManagers.map((lm) => lm.release())); + + expect(acquireAttempts.filter((v) => v === true)).to.have.length(1); + expect(releaseAttempts.filter((v) => v === true)).to.have.length(1); + }); + + it('should handle concurrent release and acquisition without race conditions', async () => { + const initialManager = new LockManager(LOCK_ID, es, logger); + const acquired = await initialManager.acquire(); + expect(acquired).to.be(true); - const waitPromise = waitingManager.acquireWithRetry({ minTimeout: 50, maxTimeout: 50 }); - await sleep(100); - await blockingManager.release(); + const attempts = await Promise.all( + times(20).map(async () => { + const releaseResult = new LockManager(LOCK_ID, es, logger).release(); + const acquireResult = new LockManager(LOCK_ID, es, logger).acquire(); - const waitResult = await waitPromise; - expect(waitResult).to.be(true); + const [release, acquire] = await Promise.all([releaseResult, acquireResult]); + return { release, acquire }; + }) + ); + + expect(attempts.filter((r) => r.acquire === true)).to.have.length(0); + expect(attempts.filter((r) => r.release === true)).to.have.length(0); + + // Finally, confirm that the lock still exists + const lock = await getLockById(es, LOCK_ID); + expect(lock).not.to.be(undefined); + + // cleanup + await initialManager.release(); + }); }); - it('throws an Error when the wait times out', async () => { - await blockingManager.acquire(); + describe('Token fencing', () => { + let manager1: LockManager; + let manager2: LockManager; - let error: Error | undefined; - try { - await waitingManager.acquireWithRetry({ minTimeout: 100, maxRetryTime: 100, retries: 2 }); - } catch (err) { - error = err; - } - expect(error?.message).to.contain('Lock "my_lock_with_retry" not available yet'); - await blockingManager.release(); + const LOCK_ID = 'my_lock_with_token_fencing'; + + beforeEach(async () => { + manager1 = new LockManager(LOCK_ID, es, logger); + manager2 = new LockManager(LOCK_ID, es, logger); + }); + + afterEach(async () => { + await manager1.release(); + await manager2.release(); + }); + + it('should not release the lock if the token does not match', async () => { + const acquired = await manager1.acquire(); + expect(acquired).to.be(true); + + // Simulate an interfering update that changes the token. + // (We do this by issuing an update directly to Elasticsearch.) + const newToken = uuid(); + await es.update({ + index: LOCKS_CONCRETE_INDEX_NAME, + id: LOCK_ID, + doc: { token: newToken }, + refresh: true, + }); + log.debug(`Updated lock token to: ${newToken}`); + + // Now manager1 still holds its old token. + // Its call to release() should find no document with its token and return false. + const releaseResult = await manager1.release(); + expect(releaseResult).to.be(false); + + // Verify that the lock still exists and now carries the interfering token. + const lock = await getLockById(es, LOCK_ID); + expect(lock).not.to.be(undefined); + expect(lock!.token).to.be(newToken); + + // cleanup + await clearAllLocks(es); + }); + + it('should use a fresh token on subsequent acquisitions', async () => { + const acquired1 = await manager1.acquire(); + expect(acquired1).to.be(true); + + // Get the current token. + const firstLock = await getLockById(es, LOCK_ID); + + // Release the lock. + const released = await manager1.release(); + expect(released).to.be(true); + + // Re-acquire the lock. + const acquired2 = await manager2.acquire(); + expect(acquired2).to.be(true); + + const secondLock = await getLockById(es, LOCK_ID); + expect(secondLock!.token).not.to.be(firstLock!.token); + }); }); }); - describe('withLock', () => { + describe('withLock API', function () { + this.tags(['failsOnMKI']); + before(async () => { + await ensureTemplatesAndIndexCreated(es); + await createLocksWriteIndex(es); + await clearAllLocks(es); + }); + const LOCK_ID = 'my_lock_with_lock'; describe('Successful execution and concurrent calls', () => { @@ -294,43 +521,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon await preAcquirer.release(); }); }); - }); - - describe('TTL extension', () => { - let lockManager: LockManager; - - const LOCK_ID = 'my_lock_with_ttl_extension'; - - beforeEach(async () => { - lockManager = new LockManager(LOCK_ID, es, logger); - }); - - describe('when the lock is manually handled', () => { - afterEach(async () => { - await lockManager.release(); - }); - - it('should extend the TTL when `extendTtl` is called', async () => { - // Acquire the lock with a very short TTL (e.g. 1 second). - const acquired = await lockManager.acquire({ ttl: 1000 }); - expect(acquired).to.be(true); - - const lockExpiryBefore = (await getLockById(es, LOCK_ID))!.expiresAt; - - // Extend the TTL - const extended = await lockManager.extendTtl(); - expect(extended).to.be(true); - - const lockExpiryAfter = (await getLockById(es, LOCK_ID))!.expiresAt; - - // Verify that the new expiration is later than before. - expect(new Date(lockExpiryAfter).getTime()).to.be.greaterThan( - new Date(lockExpiryBefore).getTime() - ); - }); - }); - describe('withLock', () => { + describe('Extending TTL', () => { let lockExpiryBefore: string | undefined; let lockExpiryAfter: string | undefined; let result: string | undefined; @@ -338,9 +530,9 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon before(async () => { // Use a very short TTL (1 second) so that without extension the lock would expire. // The withLock helper extends the TTL periodically. - result = await withLock({ lockId: LOCK_ID, esClient: es, logger, ttl: 100 }, async () => { + result = await withLock({ lockId: LOCK_ID, esClient: es, logger, ttl: 500 }, async () => { lockExpiryBefore = (await getLockById(es, LOCK_ID))?.expiresAt; - await sleep(500); // Simulate a long-running operation + await sleep(600); // Simulate a long-running operation lockExpiryAfter = (await getLockById(es, LOCK_ID))?.expiresAt; return 'done'; }); @@ -355,8 +547,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon expect(lockExpiryAfter).not.to.be(undefined); // Verify that the new expiration is later than before. - expect(new Date(lockExpiryAfter!).getTime()).to.be.greaterThan( - new Date(lockExpiryBefore!).getTime() + expect(dateAsTimestamp(lockExpiryAfter!)).to.be.greaterThan( + dateAsTimestamp(lockExpiryBefore!) ); }); @@ -367,121 +559,92 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon expect(lock).to.be(undefined); }); }); - }); - - describe('Concurrency and race conditions', () => { - const LOCK_ID = 'my_lock_with_concurrency'; - - it('should allow only one lock acquisition among many concurrent attempts', async () => { - const lockManagers = await Promise.all( - times(20).map(() => new LockManager(LOCK_ID, es, logger)) - ); - - const acquireAttempts = await Promise.all(lockManagers.map((lm) => lm.acquire())); - const releaseAttempts = await Promise.all(lockManagers.map((lm) => lm.release())); - expect(acquireAttempts.filter((v) => v === true)).to.have.length(1); - expect(releaseAttempts.filter((v) => v === true)).to.have.length(1); - }); - - it('should handle concurrent release and acquisition without race conditions', async () => { - const initialManager = new LockManager(LOCK_ID, es, logger); - const acquired = await initialManager.acquire(); - expect(acquired).to.be(true); - - const attempts = await Promise.all( - times(20).map(async () => { - const releaseResult = new LockManager(LOCK_ID, es, logger).release(); - const acquireResult = new LockManager(LOCK_ID, es, logger).acquire(); - - const [release, acquire] = await Promise.all([releaseResult, acquireResult]); - return { release, acquire }; - }) - ); + describe('when waiting for lock to be available using pRetry and it times out', () => { + const RETRY_LOCK_ID = 'my_lock_with_retry'; + let retries = 0; + let error: Error | undefined; + let lm: LockManager; - expect(attempts.filter((r) => r.acquire === true)).to.have.length(0); - expect(attempts.filter((r) => r.release === true)).to.have.length(0); + before(async () => { + lm = new LockManager(RETRY_LOCK_ID, es, logger); + const acquired = await lm.acquire(); + expect(acquired).to.be(true); - // Finally, confirm that the lock still exists - const lock = await getLockById(es, LOCK_ID); - expect(lock).not.to.be(undefined); + try { + await pRetry( + async () => { + retries++; + await withLock( + { lockId: RETRY_LOCK_ID, esClient: es, logger }, + async () => 'should not execute' + ); + }, + { minTimeout: 50, maxTimeout: 50, retries: 2 } + ); + } catch (err) { + error = err; + } + }); - // cleanup - await initialManager.release(); - }); - }); + after(async () => { + await lm.release(); + }); - describe('Token fencing', () => { - let manager1: LockManager; - let manager2: LockManager; + it('invokes withLock 3 times', async () => { + expect(retries).to.be(3); + }); - const LOCK_ID = 'my_lock_with_token_fencing'; + it('throws a LockAcquisitionError', () => { + expect(error?.name).to.be('LockAcquisitionError'); + }); - beforeEach(async () => { - manager1 = new LockManager(LOCK_ID, es, logger); - manager2 = new LockManager(LOCK_ID, es, logger); + it('throws a LockAcquisitionError with a message', () => { + expect(error?.message).to.contain(`Lock "${RETRY_LOCK_ID}" not acquired`); + }); }); - afterEach(async () => { - await manager1.release(); - await manager2.release(); - }); + describe('when waiting for lock to be available using pRetry and does not time out', () => { + const RETRY_LOCK_ID = 'my_lock_with_retry'; + let retries = 0; + let res: string | undefined; - it('should not release the lock if the token does not match', async () => { - const acquired = await manager1.acquire(); - expect(acquired).to.be(true); + before(async () => { + const lm = new LockManager(RETRY_LOCK_ID, es, logger); + const acquired = await lm.acquire(); + expect(acquired).to.be(true); - // Simulate an interfering update that changes the token. - // (We do this by issuing an update directly to Elasticsearch.) - const newToken = uuid(); - await es.update({ - index: LOCKS_CONCRETE_INDEX_NAME, - id: LOCK_ID, - doc: { token: newToken }, - refresh: true, + setTimeout(() => lm.release(), 100); + + await pRetry( + async () => { + retries++; + res = await withLock( + { lockId: RETRY_LOCK_ID, esClient: es, logger }, + async () => 'should execute' + ); + }, + { minTimeout: 50, maxTimeout: 50, retries: 5 } + ); }); - log.debug(`Updated lock token to: ${newToken}`); - // Now manager1 still holds its old token. - // Its call to release() should find no document with its token and return false. - const releaseResult = await manager1.release(); - expect(releaseResult).to.be(false); - - // Verify that the lock still exists and now carries the interfering token. - const lock = await getLockById(es, LOCK_ID); - expect(lock).not.to.be(undefined); - expect(lock!.token).to.be(newToken); - - // cleanup - await clearAllLocks(es); - }); - - it('should use a fresh token on subsequent acquisitions', async () => { - const acquired1 = await manager1.acquire(); - expect(acquired1).to.be(true); - - // Get the current token. - const firstLock = await getLockById(es, LOCK_ID); - - // Release the lock. - const released = await manager1.release(); - expect(released).to.be(true); + it('retries calling withLock multiple times', async () => { + expect(retries).to.be.greaterThan(1); + }); - // Re-acquire the lock. - const acquired2 = await manager2.acquire(); - expect(acquired2).to.be(true); + it('returns the result', () => { + expect(res).to.be('should execute'); + }); - const secondLock = await getLockById(es, LOCK_ID); - expect(secondLock!.token).not.to.be(firstLock!.token); + it('releases the lock', async () => { + const lock = await getLockById(es, RETRY_LOCK_ID); + expect(lock).to.be(undefined); + }); }); }); }); } -function sleep(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - function clearAllLocks(es: Client) { return es.deleteByQuery( { @@ -520,15 +683,10 @@ async function outputLocks(es: Client, log: ToolingLog, name?: string) { } async function getLockById(esClient: Client, lockId: LockId): Promise { - const res = await esClient.search( - { - index: LOCKS_CONCRETE_INDEX_NAME, - query: { - bool: { filter: [{ term: { _id: lockId } }] }, - }, - }, + const res = await esClient.get( + { index: LOCKS_CONCRETE_INDEX_NAME, id: lockId }, { ignore: [404] } ); - return res.hits.hits[0]?._source; + return res._source; } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts index ad0bbb46869da..d7f318d9244b7 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts @@ -28,9 +28,16 @@ export default function aiAssistantApiIntegrationTests({ loadTestFile(require.resolve('./public_complete/public_complete.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_setup.spec.ts')); loadTestFile( - require.resolve('./knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts') + require.resolve( + './knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts' + ) ); - loadTestFile(require.resolve('./knowledge_base/knowledge_base_reindex.spec.ts')); + loadTestFile( + require.resolve( + './knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts' + ) + ); + loadTestFile(require.resolve('./knowledge_base/knowledge_base_reindex_concurrency.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_status.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_user_instructions.spec.ts')); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts similarity index 94% rename from x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts rename to x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts index 5e71ab748b9d9..94c0fcccef5fa 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_and_populate_missing_semantic_text_fields.spec.ts @@ -63,7 +63,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon return res.hits.hits; } - describe('When there are knowledge base entries (from 8.15 or earlier) that does not contain semantic_text embeddings', function () { + describe('when the knowledge base index was created before 8.15', function () { // Intentionally skipped in all serverless environnments (local and MKI) // because the migration scenario being tested is not relevant to MKI and Serverless. this.tags(['skipServerless']); @@ -92,7 +92,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('after migrating', () => { before(async () => { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: + 'POST /internal/observability_ai_assistant/kb/migrations/populate_missing_semantic_text_field', }); expect(status).to.be(200); }); @@ -131,7 +132,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon it('returns entries correctly via API', async () => { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: + 'POST /internal/observability_ai_assistant/kb/migrations/populate_missing_semantic_text_field', }); expect(status).to.be(200); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts new file mode 100644 index 0000000000000..9051b00261d28 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_concurrency.spec.ts @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { times } from 'lodash'; +import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; +import { + deleteKnowledgeBaseModel, + setupKnowledgeBase, + deleteKbIndices, + addSampleDocsToInternalKb, +} from '../utils/knowledge_base'; +import { createOrUpdateIndexAssets } from '../utils/index_assets'; +import { animalSampleDocs } from '../utils/sample_docs'; + +export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { + const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); + const es = getService('es'); + + describe('POST /internal/observability_ai_assistant/kb/reindex', function () { + // Intentionally skipped in all serverless environnments (local and MKI) + // because the migration scenario being tested is not relevant to MKI and Serverless. + this.tags(['skipServerless']); + + before(async () => { + await deleteKbIndices(es); + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); + await setupKnowledgeBase(getService); + }); + + after(async () => { + await deleteKbIndices(es); + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); + await deleteKnowledgeBaseModel(getService); + }); + + describe('when running multiple re-index operations in parallel', () => { + let results: Array<{ + status: number; + errorMessage: string | undefined; + }>; + + before(async () => { + await addSampleDocsToInternalKb(getService, animalSampleDocs); + + results = await Promise.all(times(20).map(() => reIndexKnowledgeBase())); + }); + + it('makes 20 requests to the reindex endpoint', async () => { + expect(results).to.have.length(20); + }); + + it('only one request should succeed', async () => { + const successResults = results.filter((result) => result.status === 200); + expect(successResults).to.have.length(1); + }); + + it('should fail every request but 1', async () => { + const failures = results.filter((result) => result.status !== 200); + expect(failures).to.have.length(19); + }); + + it('throw a LockAcquisitionException for the failing requests', async () => { + const failures = results.filter((result) => result.status === 500); + const errorMessages = failures.every( + (result) => + result.errorMessage === 'Lock "observability_ai_assistant:kb_reindexing" not acquired' + ); + + expect(errorMessages).to.be(true); + }); + }); + + describe('when running multiple re-index operations in sequence', () => { + let results: Array<{ status: number; result: boolean; errorMessage: string | undefined }>; + + before(async () => { + results = []; + for (const _ of times(20)) { + results.push(await reIndexKnowledgeBase()); + } + }); + + it('makes 20 requests', async () => { + expect(results).to.have.length(20); + }); + + it('every re-index operation succeeds', async () => { + const successResults = results.filter((result) => result.status === 200); + expect(successResults).to.have.length(20); + expect(successResults.every((r) => r.result === true)).to.be(true); + }); + + it('no requests should fail', async () => { + const failures = results.filter((result) => result.status !== 200); + expect(failures).to.have.length(0); + }); + }); + }); + + async function reIndexKnowledgeBase() { + const res = await observabilityAIAssistantAPIClient.editor({ + endpoint: 'POST /internal/observability_ai_assistant/kb/reindex', + }); + + return { + status: res.status, + result: res.body.result, + errorMessage: 'message' in res.body ? (res.body.message as string) : undefined, + }; + } +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts similarity index 75% rename from x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts rename to x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts index d172da9970abb..e9afa6c29fe8f 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex_to_fix_sparse_vector_support.spec.ts @@ -11,7 +11,12 @@ import AdmZip from 'adm-zip'; import path from 'path'; import { AI_ASSISTANT_SNAPSHOT_REPO_PATH } from '../../../../default_configs/stateful.config.base'; import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; -import { deleteKnowledgeBaseModel, setupKnowledgeBase } from '../utils/knowledge_base'; +import { + deleteKbIndices, + deleteKnowledgeBaseModel, + setupKnowledgeBase, +} from '../utils/knowledge_base'; +import { createOrUpdateIndexAssets, restoreIndexAssets } from '../utils/index_assets'; export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); @@ -25,22 +30,17 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon this.tags(['skipServerless']); before(async () => { - const zipFilePath = `${AI_ASSISTANT_SNAPSHOT_REPO_PATH}.zip`; - log.debug(`Unzipping ${zipFilePath} to ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`); - new AdmZip(zipFilePath).extractAllTo(path.dirname(AI_ASSISTANT_SNAPSHOT_REPO_PATH), true); - + await unZipKbSnapshot(); await setupKnowledgeBase(getService); }); beforeEach(async () => { - await deleteKbIndex(); await restoreKbSnapshot(); - await createOrUpdateIndexAssets(); + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); }); after(async () => { - await deleteKbIndex(); - await createOrUpdateIndexAssets(); + await restoreIndexAssets(observabilityAIAssistantAPIClient, es); await deleteKnowledgeBaseModel(getService); }); @@ -68,14 +68,14 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon // @ts-expect-error expect(body.message).to.eql( - 'The knowledge base is currently being re-indexed. Please try again later' + 'The index ".kibana-observability-ai-assistant-kb" does not support semantic text and must be reindexed. This re-index operation has been scheduled and will be started automatically. Please try again later.' ); expect(status).to.be(503); }); it('can add new entries after re-indexing', async () => { - await runKbSemanticTextMigration(); + await reIndexKnowledgeBase(); await retry.try(async () => { const { status } = await createKnowledgeBaseEntry(); @@ -93,16 +93,15 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon return parseInt(settings?.index?.version?.created ?? '', 10); } - async function deleteKbIndex() { - log.debug('Deleting KB index'); - - await es.indices.delete( - { index: resourceNames.concreteIndexName.kb, ignore_unavailable: true }, - { ignore: [404] } - ); + async function unZipKbSnapshot() { + const zipFilePath = `${AI_ASSISTANT_SNAPSHOT_REPO_PATH}.zip`; + log.debug(`Unzipping ${zipFilePath} to ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`); + new AdmZip(zipFilePath).extractAllTo(path.dirname(AI_ASSISTANT_SNAPSHOT_REPO_PATH), true); } async function restoreKbSnapshot() { + await deleteKbIndices(es); + log.debug( `Restoring snapshot of ${resourceNames.concreteIndexName.kb} from ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}` ); @@ -128,16 +127,9 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon await es.snapshot.deleteRepository({ name: snapshotRepoName }); } - async function createOrUpdateIndexAssets() { - const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/index_assets', - }); - expect(status).to.be(200); - } - - async function runKbSemanticTextMigration() { + async function reIndexKnowledgeBase() { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + endpoint: 'POST /internal/observability_ai_assistant/kb/reindex', }); expect(status).to.be(200); } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts new file mode 100644 index 0000000000000..892b99079bc98 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/sample_docs.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const animalSampleDocs = [ + { + id: 'animal_elephants_social_structure', + title: 'Elephants and Their Social Structure', + text: 'Elephants are highly social animals that live in matriarchal herds led by the oldest female. These animals communicate through low-frequency sounds, called infrasound, that travel long distances. They are known for their intelligence, strong memory, and deep emotional bonds with each other.', + }, + { + id: 'animal_cheetah_life_speed', + title: 'The Life of a Cheetah', + text: 'Cheetahs are the fastest land animals, capable of reaching speeds up to 60 miles per hour in short bursts. They rely on their speed to catch prey, such as gazelles. Unlike other big cats, cheetahs cannot roar, but they make distinctive chirping sounds, especially when communicating with their cubs.', + }, + { + id: 'animal_whale_migration_patterns', + title: 'Whales and Their Migration Patterns', + text: 'Whales are known for their long migration patterns, traveling thousands of miles between feeding and breeding grounds.', + }, + { + id: 'animal_giraffe_habitat_feeding', + title: 'Giraffes: Habitat and Feeding Habits', + text: 'Giraffes are the tallest land animals, with long necks that help them reach leaves high up in trees. They live in savannas and grasslands, where they feed on leaves, twigs, and fruits from acacia trees.', + }, + { + id: 'animal_penguin_antarctic_adaptations', + title: 'Penguins and Their Antarctic Adaptations', + text: 'Penguins are flightless birds that have adapted to life in the cold Antarctic environment. They have a thick layer of blubber to keep warm, and their wings have evolved into flippers for swimming in the icy waters.', + }, +]; + +export const technicalSampleDocs = [ + { + id: 'technical_db_outage_slow_queries', + title: 'Database Outage: Slow Query Execution', + text: 'At 03:15 AM UTC, the production database experienced a significant outage, leading to slow query execution and increased response times across multiple services. A surge in database load was detected, with 90% of queries exceeding 2 seconds. A detailed log analysis pointed to locking issues within the transaction queue and inefficient index usage.', + }, + { + id: 'technical_api_gateway_timeouts', + title: 'Service Timeout: API Gateway Bottleneck', + text: 'At 10:45 AM UTC, the API Gateway encountered a timeout issue, causing a 500 error for all incoming requests. Detailed traces indicated a significant bottleneck at the gateway level, where requests stalled while waiting for upstream service responses. The upstream service was overwhelmed due to a sudden spike in inbound traffic and failed to release resources promptly.', + }, + { + id: 'technical_cache_misses_thirdparty_api', + title: 'Cache Misses and Increased Latency: Third-Party API Failure', + text: 'At 04:30 PM UTC, a dramatic increase in cache misses and latency was observed. The failure of a third-party API prevented critical data from being cached, leading to unnecessary re-fetching of resources from external sources. This caused significant delays in response times, with up to 10-second delays in some key services.', + }, +]; diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts new file mode 100644 index 0000000000000..d91b60d9f991a --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/time.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { duration, unitOfTime } from 'moment'; + +export function durationAsMs(value: number, unit: unitOfTime.DurationConstructor) { + return duration(value, unit).asMilliseconds(); +} + +export function dateAsTimestamp(value: string) { + return new Date(value).getTime(); +} + +export function sleep(value: number) { + return new Promise((resolve) => setTimeout(resolve, value)); +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index b5b3c7e652bf9..e8e6f5bf016ce 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -159,7 +159,6 @@ export default function ({ getService }: FtrProviderContext) { 'fleet:upgrade-agentless-deployments-task', 'fleet:upgrade_action:retry', 'logs-data-telemetry', - 'obs-ai-assistant:knowledge-base-migration', 'osquery:telemetry-configs', 'osquery:telemetry-packs', 'osquery:telemetry-saved-queries', From 2a2ea06b16e31bacdb026386d6bd9cdd56e0083a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Louv-Jansen?= Date: Thu, 24 Apr 2025 01:34:28 +0200 Subject: [PATCH 3/7] [LockManager] Ensure index template are created (#218901) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes: https://github.com/elastic/kibana/issues/218944 The index template for the Lock Manager was not created, causing index mappings and settings to be incorrect. Root cause: the function responsible for creating the index template (`ensureTemplatesAndIndexCreated`) was never invoked - only during automated testing 🤦 The mappings for the lock manager index (`.kibana_locks-000001`) should match this: ```ts { mappings: { dynamic: false, properties: { token: { type: 'keyword' }, metadata: { enabled: false }, createdAt: { type: 'date' }, expiresAt: { type: 'date' }, }, }, } ``` In this test we make sure that the LockManager library can recover and fix the mappings if the existing index has invalid mappings ``` DELETE .kibana_locks-000001 DELETE _index_template/.kibana_locks-index-template DELETE _component_template/.kibana_locks-component ``` correct mappings ``` PUT .kibana_locks-000001 ``` (Restart Kibana) ``` GET .kibana_locks-000001/_mapping ``` In this test we make sure that out of the box, the LockManager library creates an index with the correct mappings ``` DELETE .kibana_locks-000001 DELETE _index_template/.kibana_locks-index-template DELETE _component_template/.kibana_locks-component ``` (Restart Kibana) ``` GET .kibana_locks-000001/_mapping ``` Related: https://github.com/elastic/kibana/pull/216916 https://github.com/elastic/kibana/pull/216397 --------- Co-authored-by: Viduni Wickramarachchi (cherry picked from commit f684ea4071dfb5e6acc4cb057b46acbf915943d7) --- .../lock_manager_client.ts | 53 ++----- .../setup_lock_manager_index.ts | 97 +++++++++++++ .../distributed_lock_manager.spec.ts | 135 ++++++++++++++++-- 3 files changed, 231 insertions(+), 54 deletions(-) create mode 100644 x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts index 4d19684ee2f7c..95f69c9c02358 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts @@ -13,9 +13,7 @@ import prettyMilliseconds from 'pretty-ms'; import { once } from 'lodash'; import { duration } from 'moment'; import { ElasticsearchClient } from '@kbn/core/server'; - -export const LOCKS_INDEX_ALIAS = '.kibana_locks'; -export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`; +import { LOCKS_CONCRETE_INDEX_NAME, setuplockManagerIndex } from './setup_lock_manager_index'; export type LockId = string; export interface LockDocument { @@ -38,7 +36,12 @@ export interface AcquireOptions { ttl?: number; } -const createLocksWriteIndexOnce = once(createLocksWriteIndex); +// The index assets should only be set up once +// For testing purposes, we need to be able to set it up every time +let runSetupIndexAssetOnce = once(setuplockManagerIndex); +export function runSetupIndexAssetEveryTime() { + runSetupIndexAssetOnce = setuplockManagerIndex; +} export class LockManager { private token = uuid(); @@ -58,7 +61,8 @@ export class LockManager { ttl = duration(30, 'seconds').asMilliseconds(), }: AcquireOptions = {}): Promise { let response: Awaited>; - await createLocksWriteIndexOnce(this.esClient); + + await runSetupIndexAssetOnce(this.esClient, this.logger); this.token = uuid(); try { @@ -303,45 +307,6 @@ export async function withLock( } } -export async function ensureTemplatesAndIndexCreated(esClient: ElasticsearchClient): Promise { - const COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`; - const INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`; - const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`; - - await esClient.cluster.putComponentTemplate({ - name: COMPONENT_TEMPLATE_NAME, - template: { - mappings: { - dynamic: false, - properties: { - token: { type: 'keyword' }, - metadata: { enabled: false }, - createdAt: { type: 'date' }, - expiresAt: { type: 'date' }, - }, - }, - }, - }); - - await esClient.indices.putIndexTemplate({ - name: INDEX_TEMPLATE_NAME, - index_patterns: [INDEX_PATTERN], - composed_of: [COMPONENT_TEMPLATE_NAME], - priority: 500, - template: { - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - hidden: true, - }, - }, - }); -} - -export async function createLocksWriteIndex(esClient: ElasticsearchClient): Promise { - await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] }); -} - function isVersionConflictException(e: Error): boolean { return ( e instanceof errors.ResponseError && e.body?.error?.type === 'version_conflict_engine_exception' diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts new file mode 100644 index 0000000000000..e02ed86b7ef3a --- /dev/null +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { errors } from '@elastic/elasticsearch'; +import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { IndicesGetMappingResponse } from '@elastic/elasticsearch/lib/api/types'; + +const LOCKS_INDEX_ALIAS = '.kibana_locks'; +export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`; +export const LOCKS_COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`; +export const LOCKS_INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`; + +export async function removeLockIndexWithIncorrectMappings( + esClient: ElasticsearchClient, + logger: Logger +) { + let res: IndicesGetMappingResponse; + try { + res = await esClient.indices.getMapping({ index: LOCKS_CONCRETE_INDEX_NAME }); + } catch (error) { + const isNotFoundError = error instanceof errors.ResponseError && error.statusCode === 404; + if (!isNotFoundError) { + logger.error( + `Failed to get mapping for lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}` + ); + } + + return; + } + + const { mappings } = res[LOCKS_CONCRETE_INDEX_NAME]; + const hasIncorrectMappings = + mappings.properties?.token?.type !== 'keyword' || + mappings.properties?.expiresAt?.type !== 'date'; + + if (hasIncorrectMappings) { + logger.warn(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" has incorrect mappings.`); + try { + await esClient.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME }); + logger.info(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" removed successfully.`); + } catch (error) { + logger.error(`Failed to remove lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}`); + } + } +} + +export async function ensureTemplatesAndIndexCreated( + esClient: ElasticsearchClient, + logger: Logger +): Promise { + const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`; + + await esClient.cluster.putComponentTemplate({ + name: LOCKS_COMPONENT_TEMPLATE_NAME, + template: { + mappings: { + dynamic: false, + properties: { + token: { type: 'keyword' }, + metadata: { enabled: false }, + createdAt: { type: 'date' }, + expiresAt: { type: 'date' }, + }, + }, + }, + }); + logger.info( + `Component template ${LOCKS_COMPONENT_TEMPLATE_NAME} created or updated successfully.` + ); + + await esClient.indices.putIndexTemplate({ + name: LOCKS_INDEX_TEMPLATE_NAME, + index_patterns: [INDEX_PATTERN], + composed_of: [LOCKS_COMPONENT_TEMPLATE_NAME], + priority: 500, + template: { + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + hidden: true, + }, + }, + }); + logger.info(`Index template ${LOCKS_INDEX_TEMPLATE_NAME} created or updated successfully.`); + + await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] }); + logger.info(`Index ${LOCKS_CONCRETE_INDEX_NAME} created or updated successfully.`); +} + +export async function setuplockManagerIndex(esClient: ElasticsearchClient, logger: Logger) { + await removeLockIndexWithIncorrectMappings(esClient, logger); // TODO: should be removed in the future (after 9.1). See https://github.com/elastic/kibana/issues/218944 + await ensureTemplatesAndIndexCreated(esClient, logger); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index be304d350f999..0e4178717472a 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -10,18 +10,21 @@ import { v4 as uuid } from 'uuid'; import prettyMilliseconds from 'pretty-ms'; import { LockId, - ensureTemplatesAndIndexCreated, LockManager, LockDocument, - LOCKS_CONCRETE_INDEX_NAME, - createLocksWriteIndex, withLock, + runSetupIndexAssetEveryTime, } from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/lock_manager_client'; import nock from 'nock'; import { Client } from '@elastic/elasticsearch'; import { times } from 'lodash'; import { ToolingLog } from '@kbn/tooling-log'; import pRetry from 'p-retry'; +import { + LOCKS_COMPONENT_TEMPLATE_NAME, + LOCKS_CONCRETE_INDEX_NAME, + LOCKS_INDEX_TEMPLATE_NAME, +} from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/setup_lock_manager_index'; import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; import { getLoggerMock } from '../utils/logger'; import { dateAsTimestamp, durationAsMs, sleep } from '../utils/time'; @@ -32,6 +35,18 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon const logger = getLoggerMock(log); describe('LockManager', function () { + before(async () => { + // delete existing index mappings to ensure we start from a clean state + await deleteLockIndexAssets(es, log); + + // ensure that the index and templates are created + runSetupIndexAssetEveryTime(); + }); + + after(async () => { + await deleteLockIndexAssets(es, log); + }); + describe('Manual locking API', function () { this.tags(['failsOnMKI']); before(async () => { @@ -642,20 +657,108 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon }); }); }); + + describe('index assets', () => { + describe('when lock index is created with incorrect mappings', () => { + before(async () => { + await deleteLockIndexAssets(es, log); + await es.index({ + refresh: true, + index: LOCKS_CONCRETE_INDEX_NAME, + id: 'my_lock_with_incorrect_mappings', + document: { + token: 'my token', + expiresAt: new Date(Date.now() + 100000), + createdAt: new Date(), + metadata: { foo: 'bar' }, + }, + }); + }); + + it('should delete the index and re-create it', async () => { + const mappingsBefore = await getMappings(es); + log.debug(`Mappings before: ${JSON.stringify(mappingsBefore)}`); + expect(mappingsBefore.properties?.token.type).to.eql('text'); + + // Simulate a scenario where the index mappings are incorrect and a lock is added + // it should delete the index and re-create it with the correct mappings + await withLock({ esClient: es, lockId: uuid(), logger }, async () => {}); + + const mappingsAfter = await getMappings(es); + log.debug(`Mappings after: ${JSON.stringify(mappingsAfter)}`); + expect(mappingsAfter.properties?.token.type).to.be('keyword'); + }); + }); + + describe('when lock index is created with correct mappings', () => { + before(async () => { + await withLock({ esClient: es, lockId: uuid(), logger }, async () => {}); + + // wait for the index to be created + await es.indices.refresh({ index: LOCKS_CONCRETE_INDEX_NAME }); + }); + + it('should have the correct mappings for the lock index', async () => { + const mappings = await getMappings(es); + + const expectedMapping = { + dynamic: 'false', + properties: { + token: { type: 'keyword' }, + expiresAt: { type: 'date' }, + createdAt: { type: 'date' }, + metadata: { enabled: false, type: 'object' }, + }, + }; + + expect(mappings).to.eql(expectedMapping); + }); + + it('has the right number_of_replicas', async () => { + const settings = await getSettings(es); + expect(settings?.index?.auto_expand_replicas).to.eql('0-1'); + }); + + it('does not delete the index when adding a new lock', async () => { + const settingsBefore = await getSettings(es); + + await withLock({ esClient: es, lockId: uuid(), logger }, async () => {}); + + const settingsAfter = await getSettings(es); + expect(settingsAfter?.uuid).to.be(settingsBefore?.uuid); + }); + }); + }); }); } -function clearAllLocks(es: Client) { - return es.deleteByQuery( - { - index: LOCKS_CONCRETE_INDEX_NAME, - query: { match_all: {} }, - refresh: true, - }, +async function deleteLockIndexAssets(es: Client, log: ToolingLog) { + log.debug(`Deleting index assets`); + await es.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [404] }); + await es.indices.deleteIndexTemplate({ name: LOCKS_INDEX_TEMPLATE_NAME }, { ignore: [404] }); + await es.cluster.deleteComponentTemplate( + { name: LOCKS_COMPONENT_TEMPLATE_NAME }, { ignore: [404] } ); } +function clearAllLocks(es: Client, log: ToolingLog) { + try { + return es.deleteByQuery( + { + index: LOCKS_CONCRETE_INDEX_NAME, + query: { match_all: {} }, + refresh: true, + conflicts: 'proceed', + }, + { ignore: [404] } + ); + } catch (e) { + log.error(`Failed to clear locks: ${e.message}`); + log.debug(e); + } +} + async function getLocks(es: Client) { const res = await es.search({ index: LOCKS_CONCRETE_INDEX_NAME, @@ -690,3 +793,15 @@ async function getLockById(esClient: Client, lockId: LockId): Promise Date: Tue, 29 Apr 2025 18:42:45 +0200 Subject: [PATCH 4/7] [LockManager] Expose as package (#219220) Expose LockManager as package to make it easier to consume from other plugins cc @nchaulet --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Viduni Wickramarachchi (cherry picked from commit 8b8d569986f216185755aa7ac98a2a3bbeb84f76) --- .github/CODEOWNERS | 48 +++++++++++++++++ package.json | 1 + packages/kbn-lock-manager/README.md | 54 +++++++++++++++++++ packages/kbn-lock-manager/index.ts | 11 ++++ packages/kbn-lock-manager/jest.config.js | 14 +++++ packages/kbn-lock-manager/kibana.jsonc | 6 +++ packages/kbn-lock-manager/package.json | 6 +++ .../src}/lock_manager_client.ts | 14 ++--- .../src}/lock_manager_service.ts | 8 +-- .../src}/setup_lock_manager_index.ts | 31 ++++++++--- packages/kbn-lock-manager/tsconfig.json | 20 +++++++ tsconfig.base.json | 2 + .../service/knowledge_base_service/index.ts | 2 +- .../reindex_knowledge_base.ts | 2 +- ...e_missing_semantic_text_field_migration.ts | 3 +- .../observability_ai_assistant/tsconfig.json | 3 +- .../distributed_lock_manager.spec.ts | 50 ++++++++++++++--- x-pack/test/tsconfig.json | 1 + yarn.lock | 4 ++ 19 files changed, 251 insertions(+), 29 deletions(-) create mode 100644 packages/kbn-lock-manager/README.md create mode 100644 packages/kbn-lock-manager/index.ts create mode 100644 packages/kbn-lock-manager/jest.config.js create mode 100644 packages/kbn-lock-manager/kibana.jsonc create mode 100644 packages/kbn-lock-manager/package.json rename {x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager => packages/kbn-lock-manager/src}/lock_manager_client.ts (94%) rename {x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager => packages/kbn-lock-manager/src}/lock_manager_service.ts (74%) rename {x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager => packages/kbn-lock-manager/src}/setup_lock_manager_index.ts (73%) create mode 100644 packages/kbn-lock-manager/tsconfig.json diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d7f97d0646f7c..2776d01b4bc03 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -84,6 +84,7 @@ packages/kbn-ci-stats-performance-metrics @elastic/kibana-operations src/platform/packages/private/kbn-ci-stats-reporter @elastic/kibana-operations packages/kbn-ci-stats-shipper-cli @elastic/kibana-operations packages/kbn-cli-dev-mode @elastic/kibana-operations +<<<<<<< HEAD src/platform/packages/shared/cloud @elastic/kibana-core x-pack/platform/plugins/private/cloud_integrations/cloud_chat @elastic/kibana-core x-pack/platform/plugins/private/cloud_integrations/cloud_data_migration @elastic/kibana-management @@ -122,6 +123,53 @@ src/platform/packages/shared/kbn-content-management-utils @elastic/kibana-data-d x-pack/platform/packages/shared/kbn-content-packs-schema @elastic/streams-program-team examples/controls_example @elastic/kibana-presentation src/platform/plugins/shared/controls @elastic/kibana-presentation +======= +packages/kbn-dependency-ownership @elastic/kibana-security +packages/kbn-dependency-usage @elastic/kibana-security +packages/kbn-docs-utils @elastic/kibana-operations +packages/kbn-eslint-config @elastic/kibana-operations +packages/kbn-eslint-plugin-disable @elastic/kibana-operations +packages/kbn-eslint-plugin-eslint @elastic/kibana-operations +packages/kbn-eslint-plugin-eui-a11y @elastic/obs-ux-infra_services-team +packages/kbn-eslint-plugin-i18n @elastic/obs-knowledge-team @elastic/kibana-operations +packages/kbn-eslint-plugin-imports @elastic/kibana-operations +packages/kbn-eslint-plugin-telemetry @elastic/obs-knowledge-team +packages/kbn-failed-test-reporter-cli @elastic/kibana-operations @elastic/appex-qa +packages/kbn-find-used-node-modules @elastic/kibana-operations +packages/kbn-generate @elastic/kibana-operations +packages/kbn-generate-console-definitions @elastic/kibana-management +packages/kbn-import-locator @elastic/kibana-operations +packages/kbn-json-ast @elastic/kibana-operations +packages/kbn-kibana-manifest-schema @elastic/kibana-operations +packages/kbn-lint-packages-cli @elastic/kibana-operations +packages/kbn-lint-ts-projects-cli @elastic/kibana-operations +packages/kbn-lock-manager @elastic/obs-ai-assistant +packages/kbn-managed-vscode-config @elastic/kibana-operations +packages/kbn-managed-vscode-config-cli @elastic/kibana-operations +packages/kbn-manifest @elastic/kibana-core +packages/kbn-mock-idp-plugin @elastic/kibana-security +packages/kbn-optimizer @elastic/kibana-operations +packages/kbn-peggy-loader @elastic/kibana-operations +packages/kbn-performance-testing-dataset-extractor @elastic/kibana-performance-testing +packages/kbn-picomatcher @elastic/kibana-operations +packages/kbn-plugin-check @elastic/appex-sharedux +packages/kbn-plugin-generator @elastic/kibana-operations +packages/kbn-plugin-helpers @elastic/kibana-operations +packages/kbn-relocate @elastic/kibana-core +packages/kbn-repo-file-maps @elastic/kibana-operations +packages/kbn-repo-linter @elastic/kibana-operations +packages/kbn-repo-source-classifier @elastic/kibana-operations +packages/kbn-repo-source-classifier-cli @elastic/kibana-operations +packages/kbn-set-map @elastic/kibana-operations +packages/kbn-sort-package-json @elastic/kibana-operations +packages/kbn-styled-components-mapping-cli @elastic/kibana-operations @elastic/eui-team +packages/kbn-ts-projects @elastic/kibana-operations +packages/kbn-ts-type-check-cli @elastic/kibana-operations +packages/kbn-validate-next-docs-cli @elastic/kibana-operations +packages/kbn-web-worker-stub @elastic/kibana-operations +packages/kbn-whereis-pkg-cli @elastic/kibana-operations +packages/kbn-yarn-lock-validator @elastic/kibana-operations +>>>>>>> 8b8d569986f ([LockManager] Expose as package (#219220)) src/core @elastic/kibana-core src/core/packages/analytics/browser @elastic/kibana-core src/core/packages/analytics/browser-internal @elastic/kibana-core diff --git a/package.json b/package.json index f7adfc3033906..363b58841a117 100644 --- a/package.json +++ b/package.json @@ -632,6 +632,7 @@ "@kbn/llm-tasks-plugin": "link:x-pack/platform/plugins/shared/ai_infra/llm_tasks", "@kbn/locator-examples-plugin": "link:examples/locator_examples", "@kbn/locator-explorer-plugin": "link:examples/locator_explorer", + "@kbn/lock-manager": "link:packages/kbn-lock-manager", "@kbn/logging": "link:src/platform/packages/shared/kbn-logging", "@kbn/logging-mocks": "link:src/platform/packages/shared/kbn-logging-mocks", "@kbn/logs-data-access-plugin": "link:x-pack/platform/plugins/shared/logs_data_access", diff --git a/packages/kbn-lock-manager/README.md b/packages/kbn-lock-manager/README.md new file mode 100644 index 0000000000000..7d6e5886ccc6d --- /dev/null +++ b/packages/kbn-lock-manager/README.md @@ -0,0 +1,54 @@ +# Kibana Lock Manager + +A simple, distributed lock manager built on top of Elasticsearch. +Ensures that only one process at a time can hold a named lock, with automatic lease renewal and token fencing for safe release. + +# API Documentation + +## `withLock(lockId, callback, options)` + +Acquires a lock and executes the provided callback. If the lock is already held by another process, the method will throw a `LockAcquisitionError` and the callback will not be executed. When the callback returns the lock is released. + +### Parameters + +- **`lockId`** (`string`): Unique identifier for the lock + +- **`callback`** (`() => Promise`): Asynchronous function to execute once the lock is acquired. This function will be executed only if the lock acquisition succeeds. + +- **`options`** (`object`, optional): Additional configuration options. + - **`metadata`** (`Record`, optional): Custom metadata to store with the lock. + +## Example + +```ts +import { LockManagerService, LockAcquisitionError } from '@kbn/lock-manager'; + + +async function reIndexWithLock() { + // Attempt to acquire "my_lock"; if successful, runs the callback. + const lmService = new LockManagerService(coreSetup, logger); + return lmService.withLock('my_lock', async () => { + // …perform your exclusive operation here… + }); +} + +reIndexWithLock().catch((err) => { + if (err instanceof LockAcquisitionError) { + logger.debug('Re-index already in progress, skipping.'); + return; + } + logger.error(`Failed to re-index: ${err.message}`); +}); +``` + +## How It Works +**Atomic Acquire** +Performs one atomic Elasticsearch update that creates a new lock or renews an existing one - so if multiple processes race for the same lock, only one succeeds. + +**TTL-Based Lease** +Each lock has a short, fixed lifespan (default 30s) and will automatically expire if not renewed. While the callback is executing, the lock will automatically extend the TTL to keep the lock active. This safeguards against deadlocks because if a Kibana node crashes after having obtained a lock it will automatically be released after 30 seconds. + +Note: If Kibana node crashes, another process could acquire the same lock and start that task again when the lock automatically expires. To prevent your operation from running multiple times, include an application-level check (for example, querying Elasticsearch or your own status flag) to verify the operation isn’t already in progress before proceeding. + +**Token Fencing** +Each lock operation carries a unique token. Only the process with the matching token can extend or release the lock, preventing stale holders from interfering. \ No newline at end of file diff --git a/packages/kbn-lock-manager/index.ts b/packages/kbn-lock-manager/index.ts new file mode 100644 index 0000000000000..1ce7a5cc9aa22 --- /dev/null +++ b/packages/kbn-lock-manager/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +export { LockAcquisitionError } from './src/lock_manager_client'; +export { LockManagerService } from './src/lock_manager_service'; diff --git a/packages/kbn-lock-manager/jest.config.js b/packages/kbn-lock-manager/jest.config.js new file mode 100644 index 0000000000000..8dabddd8b1618 --- /dev/null +++ b/packages/kbn-lock-manager/jest.config.js @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +module.exports = { + preset: '@kbn/test/jest_node', + rootDir: '../..', + roots: ['/packages/kbn-lock-manager'], +}; diff --git a/packages/kbn-lock-manager/kibana.jsonc b/packages/kbn-lock-manager/kibana.jsonc new file mode 100644 index 0000000000000..949af71daec55 --- /dev/null +++ b/packages/kbn-lock-manager/kibana.jsonc @@ -0,0 +1,6 @@ +{ + "type": "shared-server", + "id": "@kbn/lock-manager", + "owner": ["@elastic/obs-ai-assistant"], + "devOnly": false +} diff --git a/packages/kbn-lock-manager/package.json b/packages/kbn-lock-manager/package.json new file mode 100644 index 0000000000000..254b94959770a --- /dev/null +++ b/packages/kbn-lock-manager/package.json @@ -0,0 +1,6 @@ +{ + "name": "@kbn/lock-manager", + "private": true, + "version": "1.0.0", + "license": "Elastic License 2.0 OR AGPL-3.0-only OR SSPL-1.0" +} \ No newline at end of file diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts b/packages/kbn-lock-manager/src/lock_manager_client.ts similarity index 94% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts rename to packages/kbn-lock-manager/src/lock_manager_client.ts index 95f69c9c02358..ebc0a8f2c7e7f 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_client.ts +++ b/packages/kbn-lock-manager/src/lock_manager_client.ts @@ -1,8 +1,10 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". */ // eslint-disable-next-line max-classes-per-file @@ -13,7 +15,7 @@ import prettyMilliseconds from 'pretty-ms'; import { once } from 'lodash'; import { duration } from 'moment'; import { ElasticsearchClient } from '@kbn/core/server'; -import { LOCKS_CONCRETE_INDEX_NAME, setuplockManagerIndex } from './setup_lock_manager_index'; +import { LOCKS_CONCRETE_INDEX_NAME, setupLockManagerIndex } from './setup_lock_manager_index'; export type LockId = string; export interface LockDocument { @@ -38,9 +40,9 @@ export interface AcquireOptions { // The index assets should only be set up once // For testing purposes, we need to be able to set it up every time -let runSetupIndexAssetOnce = once(setuplockManagerIndex); +let runSetupIndexAssetOnce = once(setupLockManagerIndex); export function runSetupIndexAssetEveryTime() { - runSetupIndexAssetOnce = setuplockManagerIndex; + runSetupIndexAssetOnce = setupLockManagerIndex; } export class LockManager { diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts b/packages/kbn-lock-manager/src/lock_manager_service.ts similarity index 74% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts rename to packages/kbn-lock-manager/src/lock_manager_service.ts index da9cdfef5a811..b7c03a43fd47c 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/lock_manager_service.ts +++ b/packages/kbn-lock-manager/src/lock_manager_service.ts @@ -1,8 +1,10 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". */ import { CoreSetup, Logger } from '@kbn/core/server'; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts b/packages/kbn-lock-manager/src/setup_lock_manager_index.ts similarity index 73% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts rename to packages/kbn-lock-manager/src/setup_lock_manager_index.ts index e02ed86b7ef3a..626f5d9ec1546 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/distributed_lock_manager/setup_lock_manager_index.ts +++ b/packages/kbn-lock-manager/src/setup_lock_manager_index.ts @@ -1,8 +1,10 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". */ import { errors } from '@elastic/elasticsearch'; @@ -10,6 +12,7 @@ import { ElasticsearchClient, Logger } from '@kbn/core/server'; import { IndicesGetMappingResponse } from '@elastic/elasticsearch/lib/api/types'; const LOCKS_INDEX_ALIAS = '.kibana_locks'; +const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`; export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`; export const LOCKS_COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`; export const LOCKS_INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`; @@ -52,8 +55,6 @@ export async function ensureTemplatesAndIndexCreated( esClient: ElasticsearchClient, logger: Logger ): Promise { - const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`; - await esClient.cluster.putComponentTemplate({ name: LOCKS_COMPONENT_TEMPLATE_NAME, template: { @@ -87,11 +88,25 @@ export async function ensureTemplatesAndIndexCreated( }); logger.info(`Index template ${LOCKS_INDEX_TEMPLATE_NAME} created or updated successfully.`); - await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] }); - logger.info(`Index ${LOCKS_CONCRETE_INDEX_NAME} created or updated successfully.`); + try { + await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }); + logger.info(`Index ${LOCKS_CONCRETE_INDEX_NAME} created successfully.`); + } catch (error) { + const isIndexAlreadyExistsError = + error instanceof errors.ResponseError && + error.body.error.type === 'resource_already_exists_exception'; + + if (isIndexAlreadyExistsError) { + logger.debug(`Index ${LOCKS_CONCRETE_INDEX_NAME} already exists. Skipping creation.`); + return; + } + + logger.error(`Unable to create index ${LOCKS_CONCRETE_INDEX_NAME}: ${error.message}`); + throw error; + } } -export async function setuplockManagerIndex(esClient: ElasticsearchClient, logger: Logger) { +export async function setupLockManagerIndex(esClient: ElasticsearchClient, logger: Logger) { await removeLockIndexWithIncorrectMappings(esClient, logger); // TODO: should be removed in the future (after 9.1). See https://github.com/elastic/kibana/issues/218944 await ensureTemplatesAndIndexCreated(esClient, logger); } diff --git a/packages/kbn-lock-manager/tsconfig.json b/packages/kbn-lock-manager/tsconfig.json new file mode 100644 index 0000000000000..b854afaadbd5b --- /dev/null +++ b/packages/kbn-lock-manager/tsconfig.json @@ -0,0 +1,20 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "target/types", + "types": [ + "jest", + "node" + ] + }, + "include": [ + "**/*.ts" + ], + "exclude": [ + "target/**/*" + ], + "kbn_references": [ + "@kbn/logging", + "@kbn/core", + ] +} diff --git a/tsconfig.base.json b/tsconfig.base.json index 5257f2804863c..308138590dfba 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1186,6 +1186,8 @@ "@kbn/locator-examples-plugin/*": ["examples/locator_examples/*"], "@kbn/locator-explorer-plugin": ["examples/locator_explorer"], "@kbn/locator-explorer-plugin/*": ["examples/locator_explorer/*"], + "@kbn/lock-manager": ["packages/kbn-lock-manager"], + "@kbn/lock-manager/*": ["packages/kbn-lock-manager/*"], "@kbn/logging": ["src/platform/packages/shared/kbn-logging"], "@kbn/logging/*": ["src/platform/packages/shared/kbn-logging/*"], "@kbn/logging-mocks": ["src/platform/packages/shared/kbn-logging-mocks"], diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts index 324b7608086f9..d666ee7aec832 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts @@ -10,6 +10,7 @@ import type { CoreSetup, ElasticsearchClient, IUiSettingsClient } from '@kbn/cor import type { Logger } from '@kbn/logging'; import { orderBy } from 'lodash'; import { encode } from 'gpt-tokenizer'; +import { LockAcquisitionError } from '@kbn/lock-manager'; import { resourceNames } from '..'; import { Instruction, @@ -34,7 +35,6 @@ import { isSemanticTextUnsupportedError, reIndexKnowledgeBaseWithLock, } from './reindex_knowledge_base'; -import { LockAcquisitionError } from '../distributed_lock_manager/lock_manager_client'; interface Dependencies { core: CoreSetup; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts index a7c6b5d6391bd..776f123e70f34 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts @@ -9,9 +9,9 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import { CoreSetup } from '@kbn/core/server'; +import { LockManagerService } from '@kbn/lock-manager'; import { resourceNames } from '..'; import { createKbConcreteIndex } from '../startup_migrations/create_or_update_index_assets'; -import { LockManagerService } from '../distributed_lock_manager/lock_manager_service'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; export const KB_REINDEXING_LOCK_ID = 'observability_ai_assistant:kb_reindexing'; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts index 692a9bdd431ab..70bfbbe6a9c6a 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_field_migration.ts @@ -10,14 +10,13 @@ import pLimit from 'p-limit'; import type { CoreSetup, Logger } from '@kbn/core/server'; import { uniq } from 'lodash'; import pRetry from 'p-retry'; +import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; import { KnowledgeBaseEntry } from '../../../common'; import { resourceNames } from '..'; import { waitForKbModel } from '../inference_endpoint'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantConfig } from '../../config'; import { reIndexKnowledgeBaseWithLock } from '../knowledge_base_service/reindex_knowledge_base'; -import { LockManagerService } from '../distributed_lock_manager/lock_manager_service'; -import { LockAcquisitionError } from '../distributed_lock_manager/lock_manager_client'; const PLUGIN_STARTUP_LOCK_ID = 'observability_ai_assistant:startup_migrations'; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json b/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json index 63485425b8cd6..5a0faa995ffa3 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/tsconfig.json @@ -53,7 +53,8 @@ "@kbn/ai-assistant-icon", "@kbn/core-http-browser", "@kbn/sse-utils", - "@kbn/core-security-server" + "@kbn/core-security-server", + "@kbn/lock-manager" ], "exclude": ["target/**/*"] } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index 0e4178717472a..1899ddf75cec5 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -8,23 +8,25 @@ import expect from '@kbn/expect'; import { v4 as uuid } from 'uuid'; import prettyMilliseconds from 'pretty-ms'; +import nock from 'nock'; +import { Client } from '@elastic/elasticsearch'; +import { times } from 'lodash'; +import { ToolingLog } from '@kbn/tooling-log'; +import pRetry from 'p-retry'; import { LockId, LockManager, LockDocument, withLock, runSetupIndexAssetEveryTime, -} from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/lock_manager_client'; -import nock from 'nock'; -import { Client } from '@elastic/elasticsearch'; -import { times } from 'lodash'; -import { ToolingLog } from '@kbn/tooling-log'; -import pRetry from 'p-retry'; +} from '@kbn/lock-manager/src/lock_manager_client'; import { LOCKS_COMPONENT_TEMPLATE_NAME, LOCKS_CONCRETE_INDEX_NAME, LOCKS_INDEX_TEMPLATE_NAME, -} from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/setup_lock_manager_index'; + setupLockManagerIndex, +} from '@kbn/lock-manager/src/setup_lock_manager_index'; + import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; import { getLoggerMock } from '../utils/logger'; import { dateAsTimestamp, durationAsMs, sleep } from '../utils/time'; @@ -728,6 +730,40 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon expect(settingsAfter?.uuid).to.be(settingsBefore?.uuid); }); }); + + describe('when setting up index assets', () => { + beforeEach(async () => { + await deleteLockIndexAssets(es, log); + }); + + it('can run in parallel', async () => { + try { + await Promise.all([ + setupLockManagerIndex(es, logger), + setupLockManagerIndex(es, logger), + setupLockManagerIndex(es, logger), + ]); + } catch (error) { + expect().fail(`Parallel setup should not throw but got error: ${error.message}`); + } + + const indexExists = await es.indices.exists({ index: LOCKS_CONCRETE_INDEX_NAME }); + expect(indexExists).to.be(true); + }); + + it('can run in sequence', async () => { + try { + await setupLockManagerIndex(es, logger); + await setupLockManagerIndex(es, logger); + await setupLockManagerIndex(es, logger); + } catch (error) { + expect().fail(`Sequential setup should not throw but got error: ${error.message}`); + } + + const indexExists = await es.indices.exists({ index: LOCKS_CONCRETE_INDEX_NAME }); + expect(indexExists).to.be(true); + }); + }); }); }); } diff --git a/x-pack/test/tsconfig.json b/x-pack/test/tsconfig.json index 5914007b94a11..326752fb8d140 100644 --- a/x-pack/test/tsconfig.json +++ b/x-pack/test/tsconfig.json @@ -199,5 +199,6 @@ "@kbn/aiops-change-point-detection", "@kbn/es-errors", "@kbn/content-packs-schema", + "@kbn/lock-manager", ] } diff --git a/yarn.lock b/yarn.lock index 135c573e63c11..dc6e00b54ea93 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6006,6 +6006,10 @@ version "0.0.0" uid "" +"@kbn/lock-manager@link:packages/kbn-lock-manager": + version "0.0.0" + uid "" + "@kbn/logging-mocks@link:src/platform/packages/shared/kbn-logging-mocks": version "0.0.0" uid "" From 8f511e665a622b041ea19bc919c902e4740154f1 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Wed, 30 Apr 2025 22:02:50 +0000 Subject: [PATCH 5/7] [CI] Auto-commit changed files from 'node scripts/generate codeowners' --- .github/CODEOWNERS | 49 +--------------------------------------------- 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2776d01b4bc03..73a0347de7cd8 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -84,7 +84,6 @@ packages/kbn-ci-stats-performance-metrics @elastic/kibana-operations src/platform/packages/private/kbn-ci-stats-reporter @elastic/kibana-operations packages/kbn-ci-stats-shipper-cli @elastic/kibana-operations packages/kbn-cli-dev-mode @elastic/kibana-operations -<<<<<<< HEAD src/platform/packages/shared/cloud @elastic/kibana-core x-pack/platform/plugins/private/cloud_integrations/cloud_chat @elastic/kibana-core x-pack/platform/plugins/private/cloud_integrations/cloud_data_migration @elastic/kibana-management @@ -123,53 +122,6 @@ src/platform/packages/shared/kbn-content-management-utils @elastic/kibana-data-d x-pack/platform/packages/shared/kbn-content-packs-schema @elastic/streams-program-team examples/controls_example @elastic/kibana-presentation src/platform/plugins/shared/controls @elastic/kibana-presentation -======= -packages/kbn-dependency-ownership @elastic/kibana-security -packages/kbn-dependency-usage @elastic/kibana-security -packages/kbn-docs-utils @elastic/kibana-operations -packages/kbn-eslint-config @elastic/kibana-operations -packages/kbn-eslint-plugin-disable @elastic/kibana-operations -packages/kbn-eslint-plugin-eslint @elastic/kibana-operations -packages/kbn-eslint-plugin-eui-a11y @elastic/obs-ux-infra_services-team -packages/kbn-eslint-plugin-i18n @elastic/obs-knowledge-team @elastic/kibana-operations -packages/kbn-eslint-plugin-imports @elastic/kibana-operations -packages/kbn-eslint-plugin-telemetry @elastic/obs-knowledge-team -packages/kbn-failed-test-reporter-cli @elastic/kibana-operations @elastic/appex-qa -packages/kbn-find-used-node-modules @elastic/kibana-operations -packages/kbn-generate @elastic/kibana-operations -packages/kbn-generate-console-definitions @elastic/kibana-management -packages/kbn-import-locator @elastic/kibana-operations -packages/kbn-json-ast @elastic/kibana-operations -packages/kbn-kibana-manifest-schema @elastic/kibana-operations -packages/kbn-lint-packages-cli @elastic/kibana-operations -packages/kbn-lint-ts-projects-cli @elastic/kibana-operations -packages/kbn-lock-manager @elastic/obs-ai-assistant -packages/kbn-managed-vscode-config @elastic/kibana-operations -packages/kbn-managed-vscode-config-cli @elastic/kibana-operations -packages/kbn-manifest @elastic/kibana-core -packages/kbn-mock-idp-plugin @elastic/kibana-security -packages/kbn-optimizer @elastic/kibana-operations -packages/kbn-peggy-loader @elastic/kibana-operations -packages/kbn-performance-testing-dataset-extractor @elastic/kibana-performance-testing -packages/kbn-picomatcher @elastic/kibana-operations -packages/kbn-plugin-check @elastic/appex-sharedux -packages/kbn-plugin-generator @elastic/kibana-operations -packages/kbn-plugin-helpers @elastic/kibana-operations -packages/kbn-relocate @elastic/kibana-core -packages/kbn-repo-file-maps @elastic/kibana-operations -packages/kbn-repo-linter @elastic/kibana-operations -packages/kbn-repo-source-classifier @elastic/kibana-operations -packages/kbn-repo-source-classifier-cli @elastic/kibana-operations -packages/kbn-set-map @elastic/kibana-operations -packages/kbn-sort-package-json @elastic/kibana-operations -packages/kbn-styled-components-mapping-cli @elastic/kibana-operations @elastic/eui-team -packages/kbn-ts-projects @elastic/kibana-operations -packages/kbn-ts-type-check-cli @elastic/kibana-operations -packages/kbn-validate-next-docs-cli @elastic/kibana-operations -packages/kbn-web-worker-stub @elastic/kibana-operations -packages/kbn-whereis-pkg-cli @elastic/kibana-operations -packages/kbn-yarn-lock-validator @elastic/kibana-operations ->>>>>>> 8b8d569986f ([LockManager] Expose as package (#219220)) src/core @elastic/kibana-core src/core/packages/analytics/browser @elastic/kibana-core src/core/packages/analytics/browser-internal @elastic/kibana-core @@ -644,6 +596,7 @@ x-pack/solutions/security/plugins/lists @elastic/security-detection-engine x-pack/platform/plugins/shared/ai_infra/llm_tasks @elastic/appex-ai-infra examples/locator_examples @elastic/appex-sharedux examples/locator_explorer @elastic/appex-sharedux +packages/kbn-lock-manager @elastic/obs-ai-assistant src/platform/packages/shared/kbn-logging @elastic/kibana-core src/platform/packages/shared/kbn-logging-mocks @elastic/kibana-core x-pack/platform/plugins/shared/logs_data_access @elastic/obs-ux-logs-team From 39334864c7b85a713b6aadfe5b10f2f1e196fdd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Louv-Jansen?= Date: Thu, 1 May 2025 09:58:05 +0200 Subject: [PATCH 6/7] Fix test --- .../distributed_lock_manager.spec.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index 1899ddf75cec5..6e41e58d47f12 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -37,6 +37,8 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon const logger = getLoggerMock(log); describe('LockManager', function () { + // see details: https://github.com/elastic/kibana/issues/219091 + this.tags(['failsOnMKI']); before(async () => { // delete existing index mappings to ensure we start from a clean state await deleteLockIndexAssets(es, log); @@ -52,9 +54,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('Manual locking API', function () { this.tags(['failsOnMKI']); before(async () => { - await ensureTemplatesAndIndexCreated(es); - await createLocksWriteIndex(es); - await clearAllLocks(es); + await clearAllLocks(es, log); }); describe('Basic lock operations', () => { @@ -429,7 +429,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon expect(lock!.token).to.be(newToken); // cleanup - await clearAllLocks(es); + await clearAllLocks(es, log); }); it('should use a fresh token on subsequent acquisitions', async () => { @@ -456,9 +456,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('withLock API', function () { this.tags(['failsOnMKI']); before(async () => { - await ensureTemplatesAndIndexCreated(es); - await createLocksWriteIndex(es); - await clearAllLocks(es); + await clearAllLocks(es, log); }); const LOCK_ID = 'my_lock_with_lock'; @@ -840,4 +838,4 @@ async function getSettings(es: Client) { const res = await es.indices.getSettings({ index: LOCKS_CONCRETE_INDEX_NAME }); const { settings } = res[LOCKS_CONCRETE_INDEX_NAME]; return settings; -} +} \ No newline at end of file From 9cf2d3c368a04c98af15be0cf5964c3e76d5fb25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Louv-Jansen?= Date: Thu, 1 May 2025 09:58:44 +0200 Subject: [PATCH 7/7] Newline fix --- .../distributed_lock_manager/distributed_lock_manager.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts index 6e41e58d47f12..c0a511d4eb3eb 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/distributed_lock_manager/distributed_lock_manager.spec.ts @@ -838,4 +838,4 @@ async function getSettings(es: Client) { const res = await es.indices.getSettings({ index: LOCKS_CONCRETE_INDEX_NAME }); const { settings } = res[LOCKS_CONCRETE_INDEX_NAME]; return settings; -} \ No newline at end of file +}