diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/config.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/config.ts index 2f36d27889c14..6669c991fe2bd 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/config.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/config.ts @@ -11,6 +11,7 @@ export const config = schema.object({ enabled: schema.boolean({ defaultValue: true }), scope: schema.maybe(schema.oneOf([schema.literal('observability'), schema.literal('search')])), enableKnowledgeBase: schema.boolean({ defaultValue: true }), + disableKbSemanticTextMigration: schema.boolean({ defaultValue: false }), }); export type ObservabilityAIAssistantConfig = TypeOf; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts index 7ff878bfa7a9d..fd2e8d0d4b45e 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/plugin.ts @@ -31,7 +31,8 @@ import { registerFunctions } from './functions'; import { recallRankingEvent } from './analytics/recall_ranking'; import { initLangtrace } from './service/client/instrumentation/init_langtrace'; import { aiAssistantCapabilities } from '../common/capabilities'; -import { registerMigrateKnowledgeBaseEntriesTask } from './service/task_manager_definitions/register_migrate_knowledge_base_entries_task'; +import { registerAndScheduleKbSemanticTextMigrationTask } from './service/task_manager_definitions/register_kb_semantic_text_migration_task'; +import { updateExistingIndexAssets } from './service/create_or_update_index_assets'; export class ObservabilityAIAssistantPlugin implements @@ -128,14 +129,22 @@ export class ObservabilityAIAssistantPlugin config: this.config, })); - registerMigrateKnowledgeBaseEntriesTask({ + // Update existing index assets (mappings, templates, etc). This will not create assets if they do not exist. + updateExistingIndexAssets({ logger: this.logger.get('index_assets'), core }).catch((e) => + this.logger.error(`Index assets could not be updated: ${e.message}`) + ); + + // register task to migrate knowledge base entries to include semantic_text field + registerAndScheduleKbSemanticTextMigrationTask({ core, taskManager: plugins.taskManager, - logger: this.logger, + logger: this.logger.get('kb_semantic_text_migration_task'), config: this.config, - }).catch((e) => { - this.logger.error(`Knowledge base migration was not successfully: ${e.message}`); - }); + }).catch((e) => + this.logger.error( + `Knowledge base semantic_text migration task could not be registered: ${e.message}` + ) + ); service.register(registerFunctions); diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/get_global_observability_ai_assistant_route_repository.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/get_global_observability_ai_assistant_route_repository.ts index 846a05797f975..ffdf9939ad7de 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/get_global_observability_ai_assistant_route_repository.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/get_global_observability_ai_assistant_route_repository.ts @@ -10,9 +10,11 @@ import { connectorRoutes } from './connectors/route'; import { conversationRoutes } from './conversations/route'; import { functionRoutes } from './functions/route'; import { knowledgeBaseRoutes } from './knowledge_base/route'; +import { topLevelRoutes } from './top_level/route'; export function getGlobalObservabilityAIAssistantServerRouteRepository() { return { + ...topLevelRoutes, ...chatRoutes, ...conversationRoutes, ...connectorRoutes, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts index 1597d43ce52b9..e9c7078c0062e 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts @@ -101,7 +101,7 @@ const resetKnowledgeBase = createObservabilityAIAssistantServerRoute({ }); const semanticTextMigrationKnowledgeBase = createObservabilityAIAssistantServerRoute({ - endpoint: 'POST /internal/observability_ai_assistant/kb/semantic_text_migration', + endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', security: { authz: { requiredPrivileges: ['ai_assistant'], @@ -114,7 +114,7 @@ const semanticTextMigrationKnowledgeBase = createObservabilityAIAssistantServerR throw notImplemented(); } - return client.migrateKnowledgeBaseToSemanticText(); + return client.reIndexKnowledgeBaseAndPopulateSemanticTextField(); }, }); diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts new file mode 100644 index 0000000000000..dc817f378263f --- /dev/null +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/top_level/route.ts @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { createOrUpdateIndexAssets } from '../../service/create_or_update_index_assets'; +import { createObservabilityAIAssistantServerRoute } from '../create_observability_ai_assistant_server_route'; + +const createOrUpdateIndexAssetsRoute = createObservabilityAIAssistantServerRoute({ + endpoint: 'POST /internal/observability_ai_assistant/index_assets', + security: { + authz: { + requiredPrivileges: ['ai_assistant'], + }, + }, + handler: async (resources): Promise => { + return createOrUpdateIndexAssets({ + logger: resources.logger, + core: resources.plugins.core.setup, + }); + }, +}); + +export const topLevelRoutes = { + ...createOrUpdateIndexAssetsRoute, +}; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts index 2d3a19444462b..bb2064a3dbb6b 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts @@ -72,9 +72,9 @@ import { extractTokenCount } from './operators/extract_token_count'; import { getGeneratedTitle } from './operators/get_generated_title'; import { instrumentAndCountTokens } from './operators/instrument_and_count_tokens'; import { - runSemanticTextKnowledgeBaseMigration, - scheduleSemanticTextMigration, -} from '../task_manager_definitions/register_migrate_knowledge_base_entries_task'; + reIndexKnowledgeBaseAndPopulateSemanticTextField, + scheduleKbSemanticTextMigrationTask, +} from '../task_manager_definitions/register_kb_semantic_text_migration_task'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantConfig } from '../../config'; import { getElserModelId } from '../knowledge_base_service/get_elser_model_id'; @@ -660,12 +660,11 @@ export class ObservabilityAIAssistantClient { core .getStartServices() - .then(([_, pluginsStart]) => { - logger.debug('Schedule semantic text migration task'); - return scheduleSemanticTextMigration(pluginsStart); - }) + .then(([_, pluginsStart]) => + scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger }) + ) .catch((error) => { - logger.error(`Failed to run semantic text migration task: ${error}`); + logger.error(`Failed to schedule semantic text migration task: ${error}`); }); return res; @@ -676,8 +675,8 @@ export class ObservabilityAIAssistantClient { return this.dependencies.knowledgeBaseService.reset(esClient); }; - migrateKnowledgeBaseToSemanticText = () => { - return runSemanticTextKnowledgeBaseMigration({ + reIndexKnowledgeBaseAndPopulateSemanticTextField = () => { + return reIndexKnowledgeBaseAndPopulateSemanticTextField({ esClient: this.dependencies.esClient, logger: this.dependencies.logger, config: this.dependencies.config, diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts similarity index 65% rename from x-pack/platform/plugins/shared/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts rename to x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts index b56628ce4b7ee..85a0e9ba8e42a 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/create_or_update_index_assets.ts @@ -6,13 +6,39 @@ */ import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server'; -import type { CoreSetup, Logger } from '@kbn/core/server'; +import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server'; import type { ObservabilityAIAssistantPluginStartDependencies } from '../types'; import { conversationComponentTemplate } from './conversation_component_template'; import { kbComponentTemplate } from './kb_component_template'; import { resourceNames } from '.'; -export async function setupConversationAndKbIndexAssets({ +export async function updateExistingIndexAssets({ + logger, + core, +}: { + logger: Logger; + core: CoreSetup; +}) { + const [coreStart] = await core.getStartServices(); + const { asInternalUser } = coreStart.elasticsearch.client; + + const hasKbIndex = await asInternalUser.indices.exists({ + index: resourceNames.aliases.kb, + }); + + const hasConversationIndex = await asInternalUser.indices.exists({ + index: resourceNames.aliases.conversations, + }); + + if (!hasKbIndex && !hasConversationIndex) { + logger.debug('Index assets do not exist. Aborting updating index assets'); + return; + } + + await createOrUpdateIndexAssets({ logger, core }); +} + +export async function createOrUpdateIndexAssets({ logger, core, }: { @@ -56,7 +82,7 @@ export async function setupConversationAndKbIndexAssets({ alias: conversationAliasName, pattern: `${conversationAliasName}*`, basePattern: `${conversationAliasName}*`, - name: `${conversationAliasName}-000001`, + name: resourceNames.concreteIndexName.conversations, template: resourceNames.indexTemplate.conversations, }, dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), @@ -86,20 +112,7 @@ export async function setupConversationAndKbIndexAssets({ }); // Knowledge base: write index - const kbAliasName = resourceNames.aliases.kb; - await createConcreteWriteIndex({ - esClient: asInternalUser, - logger, - totalFieldsLimit: 10000, - indexPatterns: { - alias: kbAliasName, - pattern: `${kbAliasName}*`, - basePattern: `${kbAliasName}*`, - name: `${kbAliasName}-000001`, - template: resourceNames.indexTemplate.kb, - }, - dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), - }); + await createKbConcreteIndex({ logger, esClient: coreStart.elasticsearch.client }); logger.info('Successfully set up index assets'); } catch (error) { @@ -107,3 +120,28 @@ export async function setupConversationAndKbIndexAssets({ logger.debug(error); } } + +export async function createKbConcreteIndex({ + logger, + esClient, +}: { + logger: Logger; + esClient: { + asInternalUser: ElasticsearchClient; + }; +}) { + const kbAliasName = resourceNames.aliases.kb; + return createConcreteWriteIndex({ + esClient: esClient.asInternalUser, + logger, + totalFieldsLimit: 10000, + indexPatterns: { + alias: kbAliasName, + pattern: `${kbAliasName}*`, + basePattern: `${kbAliasName}*`, + name: resourceNames.concreteIndexName.kb, + template: resourceNames.indexTemplate.kb, + }, + dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), + }); +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts index dcd79f5d57873..adc7ea2822747 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/index.ts @@ -17,7 +17,7 @@ import { ObservabilityAIAssistantClient } from './client'; import { KnowledgeBaseService } from './knowledge_base_service'; import type { RegistrationCallback, RespondFunctionResources } from './types'; import { ObservabilityAIAssistantConfig } from '../config'; -import { setupConversationAndKbIndexAssets } from './setup_conversation_and_kb_index_assets'; +import { createOrUpdateIndexAssets } from './create_or_update_index_assets'; function getResourceName(resource: string) { return `.kibana-observability-ai-assistant-${resource}`; @@ -40,11 +40,15 @@ export const resourceNames = { conversations: getResourceName('index-template-conversations'), kb: getResourceName('index-template-kb'), }, + concreteIndexName: { + conversations: getResourceName('conversations-000001'), + kb: getResourceName('kb-000001'), + }, }; const createIndexAssetsOnce = once( (logger: Logger, core: CoreSetup) => - pRetry(() => setupConversationAndKbIndexAssets({ logger, core })) + pRetry(() => createOrUpdateIndexAssets({ logger, core })) ); export class ObservabilityAIAssistantService { diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts index a73be984920c4..aea384b4c0aa6 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts @@ -28,6 +28,11 @@ import { import { recallFromSearchConnectors } from './recall_from_search_connectors'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantConfig } from '../../config'; +import { + isKnowledgeBaseIndexWriteBlocked, + isSemanticTextUnsupportedError, +} from './reindex_knowledge_base'; +import { scheduleKbSemanticTextMigrationTask } from '../task_manager_definitions/register_kb_semantic_text_migration_task'; interface Dependencies { core: CoreSetup; @@ -406,7 +411,9 @@ export class KnowledgeBaseService { } try { - await this.dependencies.esClient.asInternalUser.index({ + await this.dependencies.esClient.asInternalUser.index< + Omit & { namespace: string } + >({ index: resourceNames.aliases.kb, id, document: { @@ -418,10 +425,40 @@ export class KnowledgeBaseService { }, refresh: 'wait_for', }); + this.dependencies.logger.debug(`Entry added to knowledge base`); } catch (error) { + this.dependencies.logger.debug(`Failed to add entry to knowledge base ${error}`); if (isInferenceEndpointMissingOrUnavailable(error)) { throwKnowledgeBaseNotReady(error.body); } + + if (isSemanticTextUnsupportedError(error)) { + this.dependencies.core + .getStartServices() + .then(([_, pluginsStart]) => { + return scheduleKbSemanticTextMigrationTask({ + taskManager: pluginsStart.taskManager, + logger: this.dependencies.logger, + runSoon: true, + }); + }) + .catch((e) => { + this.dependencies.logger.error( + `Failed to schedule knowledge base semantic text migration task: ${e}` + ); + }); + + throw serverUnavailable( + 'The knowledge base is currently being re-indexed. Please try again later' + ); + } + + if (isKnowledgeBaseIndexWriteBlocked(error)) { + throw new Error( + `Writes to the knowledge base are currently blocked due to an Elasticsearch write index block. This is most likely due to an ongoing re-indexing operation. Please try again later. Error: ${error.message}` + ); + } + throw error; } }; diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts new file mode 100644 index 0000000000000..7b65576a1e6da --- /dev/null +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { errors as EsErrors } from '@elastic/elasticsearch'; +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { resourceNames } from '..'; +import { createKbConcreteIndex } from '../create_or_update_index_assets'; + +export async function reIndexKnowledgeBase({ + logger, + esClient, +}: { + logger: Logger; + esClient: { + asInternalUser: ElasticsearchClient; + }; +}): Promise { + logger.debug('Initiating knowledge base re-indexing...'); + + try { + const originalIndex = resourceNames.concreteIndexName.kb; + const tempIndex = `${resourceNames.aliases.kb}-000002`; + + const indexSettingsResponse = await esClient.asInternalUser.indices.getSettings({ + index: originalIndex, + }); + + const indexSettings = indexSettingsResponse[originalIndex].settings; + const createdVersion = parseInt(indexSettings?.index?.version?.created ?? '', 10); + + // Check if the index was created before version 8.11 + const versionThreshold = 8110000; // Version 8.11.0 + if (createdVersion >= versionThreshold) { + logger.warn( + `Knowledge base index "${originalIndex}" was created in version ${createdVersion}, and does not require re-indexing. Semantic text field is already supported. Aborting` + ); + return; + } + + logger.info( + `Knowledge base index was created in ${createdVersion} and must be re-indexed in order to support semantic_text field. Re-indexing now...` + ); + + // Create temporary index + logger.debug(`Creating temporary index "${tempIndex}"...`); + await esClient.asInternalUser.indices.delete({ index: tempIndex }, { ignore: [404] }); + await esClient.asInternalUser.indices.create({ index: tempIndex }); + + // Perform reindex to temporary index + logger.debug(`Re-indexing knowledge base to temporary index "${tempIndex}"...`); + await esClient.asInternalUser.reindex({ + body: { + source: { index: originalIndex }, + dest: { index: tempIndex }, + }, + refresh: true, + wait_for_completion: true, + }); + + // Delete and re-create original index + logger.debug(`Deleting original index "${originalIndex}" and re-creating it...`); + await esClient.asInternalUser.indices.delete({ index: originalIndex }); + await createKbConcreteIndex({ logger, esClient }); + + // Perform reindex back to original index + logger.debug(`Re-indexing knowledge base back to original index "${originalIndex}"...`); + await esClient.asInternalUser.reindex({ + body: { + source: { index: tempIndex }, + dest: { index: originalIndex }, + }, + refresh: true, + wait_for_completion: true, + }); + + // Delete temporary index + logger.debug(`Deleting temporary index "${tempIndex}"...`); + await esClient.asInternalUser.indices.delete({ index: tempIndex }); + + logger.info( + 'Re-indexing knowledge base completed successfully. Semantic text field is now supported.' + ); + } catch (error) { + throw new Error(`Failed to reindex knowledge base: ${error.message}`); + } +} + +export function isKnowledgeBaseIndexWriteBlocked(error: any) { + return ( + error instanceof EsErrors.ResponseError && + error.message.includes( + `cluster_block_exception: index [${resourceNames.concreteIndexName.kb}] blocked` + ) + ); +} + +export function isSemanticTextUnsupportedError(error: Error) { + const semanticTextUnsupportedError = + 'The [sparse_vector] field type is not supported on indices created on versions 8.0 to 8.10'; + + const isSemanticTextUnspported = + error instanceof EsErrors.ResponseError && + (error.message.includes(semanticTextUnsupportedError) || + // @ts-expect-error + error.meta?.body?.error?.caused_by?.reason.includes(semanticTextUnsupportedError)); + + return isSemanticTextUnspported; +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts new file mode 100644 index 0000000000000..29dd1418b2818 --- /dev/null +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_kb_semantic_text_migration_task.ts @@ -0,0 +1,228 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import pLimit from 'p-limit'; +import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server'; +import type { CoreSetup, CoreStart, Logger } from '@kbn/core/server'; +import pRetry from 'p-retry'; +import { KnowledgeBaseEntry } from '../../../common'; +import { resourceNames } from '..'; +import { getElserModelStatus } from '../inference_endpoint'; +import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { ObservabilityAIAssistantConfig } from '../../config'; +import { reIndexKnowledgeBase } from '../knowledge_base_service/reindex_knowledge_base'; + +const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id'; +const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration'; + +// This task will re-index all knowledge base entries without `semantic_text` field +// to ensure the field is populated with the correct embeddings. +// After the migration we will no longer need to use the `ml.tokens` field. +export async function registerAndScheduleKbSemanticTextMigrationTask({ + taskManager, + logger, + core, + config, +}: { + taskManager: TaskManagerSetupContract; + logger: Logger; + core: CoreSetup; + config: ObservabilityAIAssistantConfig; +}) { + const [coreStart, pluginsStart] = await core.getStartServices(); + + // register task + registerKbSemanticTextMigrationTask({ taskManager, logger, coreStart, config }); + + // schedule task + await scheduleKbSemanticTextMigrationTask({ taskManager: pluginsStart.taskManager, logger }); +} + +function registerKbSemanticTextMigrationTask({ + taskManager, + logger, + coreStart, + config, +}: { + taskManager: TaskManagerSetupContract; + logger: Logger; + coreStart: CoreStart; + config: ObservabilityAIAssistantConfig; +}) { + try { + logger.debug(`Register task "${TASK_TYPE}"`); + taskManager.registerTaskDefinitions({ + [TASK_TYPE]: { + title: 'Add support for semantic_text in Knowledge Base', + description: `This task will reindex the knowledge base and populate the semantic_text fields for all entries without it.`, + timeout: '1h', + maxAttempts: 5, + createTaskRunner() { + return { + async run() { + logger.debug(`Run task: "${TASK_TYPE}"`); + const esClient = coreStart.elasticsearch.client; + + const hasKbIndex = await esClient.asInternalUser.indices.exists({ + index: resourceNames.aliases.kb, + }); + + if (!hasKbIndex) { + logger.debug('Knowledge base index does not exist. Skipping migration.'); + return; + } + + if (config.disableKbSemanticTextMigration) { + logger.info( + 'Semantic text migration is disabled via config "xpack.observabilityAIAssistant.disableKbSemanticTextMigration=true". Skipping migration.' + ); + return; + } + + await reIndexKnowledgeBaseAndPopulateSemanticTextField({ esClient, logger, config }); + }, + }; + }, + }, + }); + } catch (error) { + logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`); + } +} + +export async function scheduleKbSemanticTextMigrationTask({ + taskManager, + logger, + runSoon = false, +}: { + taskManager: ObservabilityAIAssistantPluginStartDependencies['taskManager']; + logger: Logger; + runSoon?: boolean; +}) { + logger.debug('Schedule migration task'); + await taskManager.ensureScheduled({ + id: TASK_ID, + taskType: TASK_TYPE, + scope: ['aiAssistant'], + params: {}, + state: {}, + }); + + if (runSoon) { + logger.debug('Run migration task soon'); + await taskManager.runSoon(TASK_ID); + } +} + +export async function reIndexKnowledgeBaseAndPopulateSemanticTextField({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + logger.debug('Starting migration...'); + + try { + await reIndexKnowledgeBase({ logger, esClient }); + await populateSemanticTextFieldRecursively({ esClient, logger, config }); + } catch (e) { + logger.error(`Migration failed: ${e.message}`); + } + + logger.debug('Migration succeeded'); +} + +async function populateSemanticTextFieldRecursively({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + logger.debug('Populating semantic_text field for entries without it'); + + const response = await esClient.asInternalUser.search({ + size: 100, + track_total_hits: true, + index: [resourceNames.aliases.kb], + query: { + bool: { + must_not: { + exists: { + field: 'semantic_text', + }, + }, + }, + }, + _source: { + excludes: ['ml.tokens'], + }, + }); + + if (response.hits.hits.length === 0) { + logger.debug('No remaining entries to migrate'); + return; + } + + logger.debug(`Found ${response.hits.hits.length} entries to migrate`); + + await waitForModel({ esClient, logger, config }); + + // Limit the number of concurrent requests to avoid overloading the cluster + const limiter = pLimit(10); + const promises = response.hits.hits.map((hit) => { + return limiter(() => { + if (!hit._source || !hit._id) { + return; + } + + return esClient.asInternalUser.update({ + refresh: 'wait_for', + index: resourceNames.aliases.kb, + id: hit._id, + body: { + doc: { + ...hit._source, + semantic_text: hit._source.text, + }, + }, + }); + }); + }); + + await Promise.all(promises); + + logger.debug(`Populated ${promises.length} entries`); + await populateSemanticTextFieldRecursively({ esClient, logger, config }); +} + +async function waitForModel({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + return pRetry( + async () => { + const { ready } = await getElserModelStatus({ esClient, logger, config }); + if (!ready) { + logger.debug('Elser model is not yet ready. Retrying...'); + throw new Error('Elser model is not yet ready'); + } + }, + { retries: 30, factor: 2, maxTimeout: 30_000 } + ); +} diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts deleted file mode 100644 index b75074dc7ea54..0000000000000 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import pLimit from 'p-limit'; -import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server'; -import type { CoreSetup, Logger } from '@kbn/core/server'; -import pRetry from 'p-retry'; -import { KnowledgeBaseEntry } from '../../../common'; -import { resourceNames } from '..'; -import { getElserModelStatus } from '../inference_endpoint'; -import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; -import { ObservabilityAIAssistantConfig } from '../../config'; -import { setupConversationAndKbIndexAssets } from '../setup_conversation_and_kb_index_assets'; - -const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id'; -const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration'; - -// This task will re-index all knowledge base entries without `semantic_text` field -// to ensure the field is populated with the correct embeddings. -// After the migration we will no longer need to use the `ml.tokens` field. -export async function registerMigrateKnowledgeBaseEntriesTask({ - taskManager, - logger, - core, - config, -}: { - taskManager: TaskManagerSetupContract; - logger: Logger; - core: CoreSetup; - config: ObservabilityAIAssistantConfig; -}) { - const [coreStart, pluginsStart] = await core.getStartServices(); - - try { - logger.debug(`Register task "${TASK_TYPE}"`); - taskManager.registerTaskDefinitions({ - [TASK_TYPE]: { - title: 'Migrate AI Assistant Knowledge Base', - description: `Migrates AI Assistant knowledge base entries`, - timeout: '1h', - maxAttempts: 5, - createTaskRunner() { - return { - async run() { - logger.debug(`Run task: "${TASK_TYPE}"`); - const esClient = coreStart.elasticsearch.client; - - const hasKbIndex = await esClient.asInternalUser.indices.exists({ - index: resourceNames.aliases.kb, - }); - - if (!hasKbIndex) { - logger.debug( - 'Knowledge base index does not exist. Skipping semantic text migration.' - ); - return; - } - - // update fields and mappings - await setupConversationAndKbIndexAssets({ logger, core }); - - // run migration - await runSemanticTextKnowledgeBaseMigration({ esClient, logger, config }); - }, - }; - }, - }, - }); - } catch (error) { - logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`); - } - - try { - logger.debug(`Scheduled task: "${TASK_TYPE}"`); - await scheduleSemanticTextMigration(pluginsStart); - } catch (error) { - logger.error(`Failed to schedule task "${TASK_TYPE}". Error: ${error}`); - } -} - -export function scheduleSemanticTextMigration( - pluginsStart: ObservabilityAIAssistantPluginStartDependencies -) { - return pluginsStart.taskManager.ensureScheduled({ - id: TASK_ID, - taskType: TASK_TYPE, - scope: ['aiAssistant'], - params: {}, - state: {}, - }); -} - -export async function runSemanticTextKnowledgeBaseMigration({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - logger.debug('Knowledge base migration: Running migration'); - - try { - const response = await esClient.asInternalUser.search({ - size: 100, - track_total_hits: true, - index: [resourceNames.aliases.kb], - query: { - bool: { - must_not: { - exists: { - field: 'semantic_text', - }, - }, - }, - }, - _source: { - excludes: ['ml.tokens'], - }, - }); - - if (response.hits.hits.length === 0) { - logger.debug('Knowledge base migration: No remaining entries to migrate'); - return; - } - - logger.debug(`Knowledge base migration: Found ${response.hits.hits.length} entries to migrate`); - - await waitForModel({ esClient, logger, config }); - - // Limit the number of concurrent requests to avoid overloading the cluster - const limiter = pLimit(10); - const promises = response.hits.hits.map((hit) => { - return limiter(() => { - if (!hit._source || !hit._id) { - return; - } - - return esClient.asInternalUser.update({ - refresh: 'wait_for', - index: resourceNames.aliases.kb, - id: hit._id, - body: { - doc: { - ...hit._source, - semantic_text: hit._source.text, - }, - }, - }); - }); - }); - - await Promise.all(promises); - logger.debug(`Knowledge base migration: Migrated ${promises.length} entries`); - await runSemanticTextKnowledgeBaseMigration({ esClient, logger, config }); - } catch (e) { - logger.error(`Knowledge base migration failed: ${e.message}`); - } -} - -async function waitForModel({ - esClient, - logger, - config, -}: { - esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; - config: ObservabilityAIAssistantConfig; -}) { - return pRetry( - async () => { - const { ready } = await getElserModelStatus({ esClient, logger, config }); - if (!ready) { - logger.debug('Elser model is not yet ready. Retrying...'); - throw new Error('Elser model is not yet ready'); - } - }, - { retries: 30, factor: 2, maxTimeout: 30_000 } - ); -} diff --git a/x-pack/test/api_integration/config.ts b/x-pack/test/api_integration/config.ts index e43c76d42adfa..ff0f9ab4b8f18 100644 --- a/x-pack/test/api_integration/config.ts +++ b/x-pack/test/api_integration/config.ts @@ -38,7 +38,7 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi serverArgs: [ ...xPackFunctionalTestsConfig.get('esTestCluster.serverArgs'), 'node.attr.name=apiIntegrationTestNode', - 'path.repo=/tmp/repo,/tmp/repo_1,/tmp/repo_2,/tmp/cloud-snapshots/', + `path.repo=/tmp/repo,/tmp/repo_1,/tmp/repo_2,/tmp/cloud-snapshots/`, ], }, }; diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/.gitignore b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/.gitignore new file mode 100644 index 0000000000000..d555c9d94945b --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/.gitignore @@ -0,0 +1,2 @@ +# unzipped snapshot folder +knowledge_base/snapshot_kb_8.10/ \ No newline at end of file diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts index 3f756ecd11247..1d3d41ddb4400 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/index.ts @@ -20,7 +20,10 @@ export default function aiAssistantApiIntegrationTests({ loadTestFile(require.resolve('./complete/functions/summarize.spec.ts')); loadTestFile(require.resolve('./public_complete/public_complete.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_setup.spec.ts')); - loadTestFile(require.resolve('./knowledge_base/knowledge_base_migration.spec.ts')); + loadTestFile( + require.resolve('./knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts') + ); + loadTestFile(require.resolve('./knowledge_base/knowledge_base_reindex.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_status.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base.spec.ts')); loadTestFile(require.resolve('./knowledge_base/knowledge_base_user_instructions.spec.ts')); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_migration.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts similarity index 98% rename from x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_migration.spec.ts rename to x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts index b1d6f82345ca7..3390cd3ef35c4 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_migration.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_add_semantic_text_field_migration.spec.ts @@ -98,7 +98,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon describe('after migrating', () => { before(async () => { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/semantic_text_migration', + endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', }); expect(status).to.be(200); }); @@ -137,7 +137,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon it('returns entries correctly via API', async () => { const { status } = await observabilityAIAssistantAPIClient.editor({ - endpoint: 'POST /internal/observability_ai_assistant/kb/semantic_text_migration', + endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', }); expect(status).to.be(200); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts new file mode 100644 index 0000000000000..79c6b963a852e --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_reindex.spec.ts @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { resourceNames } from '@kbn/observability-ai-assistant-plugin/server/service'; +import AdmZip from 'adm-zip'; +import path from 'path'; +import { AI_ASSISTANT_SNAPSHOT_REPO_PATH } from '../../../../default_configs/stateful.config.base'; +import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; +import { + deleteKnowledgeBaseModel, + importTinyElserModel, + deleteInferenceEndpoint, + setupKnowledgeBase, + waitForKnowledgeBaseReady, +} from './helpers'; + +export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderContext) { + const observabilityAIAssistantAPIClient = getService('observabilityAIAssistantApi'); + const es = getService('es'); + const ml = getService('ml'); + const retry = getService('retry'); + const log = getService('log'); + + describe('when the knowledge base index was created before 8.11', function () { + // Intentionally skipped in all serverless environnments (local and MKI) + // because the migration scenario being tested is not relevant to MKI and Serverless. + this.tags(['skipServerless']); + + before(async () => { + const zipFilePath = `${AI_ASSISTANT_SNAPSHOT_REPO_PATH}.zip`; + log.debug(`Unzipping ${zipFilePath} to ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}`); + new AdmZip(zipFilePath).extractAllTo(path.dirname(AI_ASSISTANT_SNAPSHOT_REPO_PATH), true); + + await importTinyElserModel(ml); + await setupKnowledgeBase(observabilityAIAssistantAPIClient); + await waitForKnowledgeBaseReady({ observabilityAIAssistantAPIClient, log, retry }); + }); + + beforeEach(async () => { + await deleteKbIndex(); + await restoreKbSnapshot(); + await createOrUpdateIndexAssets(); + }); + + after(async () => { + await deleteKbIndex(); + await createOrUpdateIndexAssets(); + await deleteKnowledgeBaseModel(ml); + await deleteInferenceEndpoint({ es }); + }); + + it('has an index created version earlier than 8.11', async () => { + await retry.try(async () => { + expect(await getKbIndexCreatedVersion()).to.be.lessThan(8110000); + }); + }); + + function createKnowledgeBaseEntry() { + const knowledgeBaseEntry = { + id: 'my-doc-id-1', + title: 'My title', + text: 'My content', + }; + + return observabilityAIAssistantAPIClient.editor({ + endpoint: 'POST /internal/observability_ai_assistant/kb/entries/save', + params: { body: knowledgeBaseEntry }, + }); + } + + it('cannot add new entries to KB', async () => { + const { status, body } = await createKnowledgeBaseEntry(); + + // @ts-expect-error + expect(body.message).to.eql( + 'The knowledge base is currently being re-indexed. Please try again later' + ); + + expect(status).to.be(503); + }); + + it('can add new entries after re-indexing', async () => { + await runKbSemanticTextMigration(); + + await retry.try(async () => { + const { status } = await createKnowledgeBaseEntry(); + expect(status).to.be(200); + }); + }); + }); + + async function getKbIndexCreatedVersion() { + const indexSettings = await es.indices.getSettings({ + index: resourceNames.concreteIndexName.kb, + }); + + const { settings } = Object.values(indexSettings)[0]; + return parseInt(settings?.index?.version?.created ?? '', 10); + } + + async function deleteKbIndex() { + log.debug('Deleting KB index'); + + await es.indices.delete( + { index: resourceNames.concreteIndexName.kb, ignore_unavailable: true }, + { ignore: [404] } + ); + } + + async function restoreKbSnapshot() { + log.debug( + `Restoring snapshot of ${resourceNames.concreteIndexName.kb} from ${AI_ASSISTANT_SNAPSHOT_REPO_PATH}` + ); + const snapshotRepoName = 'snapshot-repo-8-10'; + const snapshotName = 'my_snapshot'; + await es.snapshot.createRepository({ + name: snapshotRepoName, + repository: { + type: 'fs', + settings: { location: AI_ASSISTANT_SNAPSHOT_REPO_PATH }, + }, + }); + + await es.snapshot.restore({ + repository: snapshotRepoName, + snapshot: snapshotName, + wait_for_completion: true, + body: { + indices: resourceNames.concreteIndexName.kb, + }, + }); + + await es.snapshot.deleteRepository({ name: snapshotRepoName }); + } + + async function createOrUpdateIndexAssets() { + const { status } = await observabilityAIAssistantAPIClient.editor({ + endpoint: 'POST /internal/observability_ai_assistant/index_assets', + }); + expect(status).to.be(200); + } + + async function runKbSemanticTextMigration() { + const { status } = await observabilityAIAssistantAPIClient.editor({ + endpoint: 'POST /internal/observability_ai_assistant/kb/migrations/kb_semantic_text', + }); + expect(status).to.be(200); + } +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/snapshot_kb_8.10.zip b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/snapshot_kb_8.10.zip new file mode 100644 index 0000000000000..0e65dd1848246 Binary files /dev/null and b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/snapshot_kb_8.10.zip differ diff --git a/x-pack/test/api_integration/deployment_agnostic/default_configs/stateful.config.base.ts b/x-pack/test/api_integration/deployment_agnostic/default_configs/stateful.config.base.ts index 1668c3d634671..5b65152aea809 100644 --- a/x-pack/test/api_integration/deployment_agnostic/default_configs/stateful.config.base.ts +++ b/x-pack/test/api_integration/deployment_agnostic/default_configs/stateful.config.base.ts @@ -34,6 +34,11 @@ interface CreateTestConfigOptions { suiteTags?: { include?: string[]; exclude?: string[] }; } +export const AI_ASSISTANT_SNAPSHOT_REPO_PATH = path.resolve( + REPO_ROOT, + 'x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/snapshot_kb_8.10' +); + export function createStatefulTestConfig( options: CreateTestConfigOptions ) { @@ -121,6 +126,7 @@ export function createStatefulTestConfig