diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts index 9a6b3f89f7f69..b78dd82cc48e0 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/routes/knowledge_base/route.ts @@ -177,7 +177,7 @@ const saveKnowledgeBaseUserInstruction = createObservabilityAIAssistantServerRou const { id, text, public: isPublic } = resources.params.body; return client.addUserInstruction({ - entry: { id, text, public: isPublic }, + entry: { id, text, public: isPublic, title: `User instruction` }, }); }, }); diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts index 7d65a2c1857f6..6a9bb82c21587 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/client/index.ts @@ -767,7 +767,7 @@ export class ObservabilityAIAssistantClient { }): Promise => { // for now we want to limit the number of user instructions to 1 per user // if a user instruction already exists for the user, we get the id and update it - this.dependencies.logger.debug('Adding user instruction entry'); + const existingId = await this.dependencies.knowledgeBaseService.getPersonalUserInstructionId({ isPublic: entry.public, namespace: this.dependencies.namespace, @@ -775,8 +775,14 @@ export class ObservabilityAIAssistantClient { }); if (existingId) { + this.dependencies.logger.debug( + `Updating user instruction. 
id = "${existingId}", user = "${this.dependencies.user?.name}"` + ); entry.id = existingId; - this.dependencies.logger.debug(`Updating user instruction with id "${existingId}"`); + } else { + this.dependencies.logger.debug( + `Creating user instruction. id = "${entry.id}", user = "${this.dependencies.user?.name}"` + ); } return this.dependencies.knowledgeBaseService.addEntry({ diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts index 0552f925fb949..100e4782e4cbd 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/index.ts @@ -431,7 +431,9 @@ export class KnowledgeBaseService { refresh: 'wait_for', }); - this.dependencies.logger.debug(`Entry added to knowledge base`); + this.dependencies.logger.debug( + `Entry added to knowledge base. 
title = "${doc.title}", user = "${user?.name}", namespace = "${namespace}"` + ); } catch (error) { this.dependencies.logger.error(`Failed to add entry to knowledge base ${error}`); if (isInferenceEndpointMissingOrUnavailable(error)) { diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts index d9fca4130d735..58b7b98d1e31d 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/knowledge_base_service/reindex_knowledge_base.ts @@ -193,8 +193,5 @@ export async function isReIndexInProgress({ getActiveReindexingTaskId(esClient), ]); - logger.debug(`Lock: ${!!lock}`); - logger.debug(`ES re-indexing task: ${!!activeReindexingTask}`); - return lock !== undefined || activeReindexingTask !== undefined; } diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_fields.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_fields.ts index 119fbf83e72ea..b29d61f01a87f 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_fields.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/populate_missing_semantic_text_fields.ts @@ -6,16 +6,13 @@ */ import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import pLimit from 'p-limit'; import type { CoreSetup, Logger } from '@kbn/core/server'; -import { uniq } from 'lodash'; import { LockManagerService } from '@kbn/lock-manager'; -import { KnowledgeBaseEntry } from '../../../common'; +import 
pRetry from 'p-retry'; import { resourceNames } from '..'; import { waitForKbModel } from '../inference_endpoint'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; import { ObservabilityAIAssistantConfig } from '../../config'; -import { sleep } from '../util/sleep'; import { getInferenceIdFromWriteIndex } from '../knowledge_base_service/get_inference_id_from_write_index'; const POPULATE_MISSING_SEMANTIC_TEXT_FIELDS_LOCK_ID = 'populate_missing_semantic_text_fields'; @@ -32,13 +29,12 @@ export async function populateMissingSemanticTextFieldWithLock({ }) { const lmService = new LockManagerService(core, logger); await lmService.withLock(POPULATE_MISSING_SEMANTIC_TEXT_FIELDS_LOCK_ID, async () => - populateMissingSemanticTextFieldRecursively({ core, esClient, logger, config }) + populateMissingSemanticTextField({ core, esClient, logger, config }) ); } // Ensures that every doc has populated the `semantic_text` field. -// It retrieves entries without the field, updates them in batches, and continues until no entries remain. 
-async function populateMissingSemanticTextFieldRecursively({ +async function populateMissingSemanticTextField({ core, esClient, logger, @@ -49,64 +45,30 @@ async function populateMissingSemanticTextFieldRecursively({ logger: Logger; config: ObservabilityAIAssistantConfig; }) { - logger.debug( - 'Checking for remaining entries without semantic_text field that need to be migrated' - ); - - const response = await esClient.asInternalUser.search({ - size: 100, - track_total_hits: true, - index: [resourceNames.writeIndexAlias.kb], - query: { - bool: { - must_not: { - exists: { - field: 'semantic_text', - }, - }, - }, - }, - _source: { - excludes: ['ml.tokens'], - }, - }); + logger.debug('Initializing semantic text migration for knowledge base entries...'); - if (response.hits.hits.length === 0) { - logger.debug('No remaining entries to migrate'); - return; - } + await pRetry( + async () => { + const inferenceId = await getInferenceIdFromWriteIndex(esClient); + await waitForKbModel({ core, esClient, logger, config, inferenceId }); - const inferenceId = await getInferenceIdFromWriteIndex(esClient); - await waitForKbModel({ core, esClient, logger, config, inferenceId }); - - const indicesWithOutdatedEntries = uniq(response.hits.hits.map((hit) => hit._index)); - logger.debug( - `Found ${response.hits.hits.length} entries without semantic_text field in "${indicesWithOutdatedEntries}". Updating now...` - ); - - // Limit the number of concurrent requests to avoid overloading the cluster - const limiter = pLimit(20); - const promises = response.hits.hits.map((hit) => { - return limiter(() => { - if (!hit._source || !hit._id) { - return; - } - - return esClient.asInternalUser.update({ - refresh: 'wait_for', + await esClient.asInternalUser.updateByQuery({ index: resourceNames.writeIndexAlias.kb, - id: hit._id, - doc: { - ...hit._source, - semantic_text: hit._source.text ?? 
'No text', + requests_per_second: 100, + script: { + source: `ctx._source.semantic_text = ctx._source.text`, + lang: 'painless', + }, + query: { + bool: { + filter: { exists: { field: 'text' } }, + must_not: { exists: { field: 'semantic_text' } }, + }, }, }); - }); - }); - - await Promise.all(promises); - logger.debug(`Updated ${promises.length} entries`); + }, + { retries: 10, minTimeout: 10_000 } + ); - await sleep(100); - await populateMissingSemanticTextFieldRecursively({ core, esClient, logger, config }); + logger.debug('Semantic text migration completed successfully.'); } diff --git a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/run_startup_migrations.ts b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/run_startup_migrations.ts index 201ae87e4722d..0005827b80c20 100644 --- a/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/run_startup_migrations.ts +++ b/x-pack/platform/plugins/shared/observability_ai_assistant/server/service/startup_migrations/run_startup_migrations.ts @@ -7,7 +7,6 @@ import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { CoreSetup, Logger } from '@kbn/core/server'; -import pRetry from 'p-retry'; import { errors } from '@elastic/elasticsearch'; import { LockManagerService, isLockAcquisitionError } from '@kbn/lock-manager'; import { resourceNames } from '..'; @@ -58,21 +57,7 @@ export async function runStartupMigrations({ await reIndexKnowledgeBaseWithLock({ core, logger, esClient }); } - await pRetry( - async () => populateMissingSemanticTextFieldWithLock({ core, logger, config, esClient }), - { - retries: 5, - minTimeout: 10_000, - onFailedAttempt: async (error) => { - // if the error is a LockAcquisitionError the operation is already in progress and we should not retry - // for other errors we should retry - // throwing the error will cause pRetry to abort all retries - if 
(isLockAcquisitionError(error)) { - throw error; - } - }, - } - ); + await populateMissingSemanticTextFieldWithLock({ core, logger, config, esClient }); }) .catch((error) => { // we should propogate the error if it is not a LockAcquisitionError diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_8.16_upgrade_test.spec.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_8.16_upgrade_test.spec.ts index ccc827383f420..3e7185de96373 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_8.16_upgrade_test.spec.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/knowledge_base/knowledge_base_8.16_upgrade_test.spec.ts @@ -5,13 +5,20 @@ * 2.0. */ +import { v4 as uuidV4 } from 'uuid'; import expect from '@kbn/expect'; -import { KnowledgeBaseEntry } from '@kbn/observability-ai-assistant-plugin/common'; +import { + KnowledgeBaseEntry, + KnowledgeBaseEntryRole, +} from '@kbn/observability-ai-assistant-plugin/common'; import { sortBy } from 'lodash'; +import { resourceNames } from '@kbn/observability-ai-assistant-plugin/server/service'; +import { ElasticsearchClient } from '@kbn/core/server'; import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context'; import { getKnowledgeBaseEntriesFromEs, getKnowledgeBaseEntriesFromApi, + refreshKbIndex, } from '../utils/knowledge_base'; import { createOrUpdateIndexAssets, @@ -39,88 +46,114 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon // because the migration scenario being tested is not relevant to MKI and Serverless. 
this.tags(['skipServerless']); - before(async () => { - await teardownTinyElserModelAndInferenceEndpoint(getService); - await deleteIndexAssets(getService); - await restoreKbSnapshot({ - log, - es, - snapshotFolderName: 'snapshot_kb_8.16', - snapshotName: 'kb_snapshot_8.16', + describe('using a snapshot', () => { + before(async () => { + await teardownTinyElserModelAndInferenceEndpoint(getService); + await deleteIndexAssets(getService); + await restoreKbSnapshot({ + log, + es, + snapshotFolderName: 'snapshot_kb_8.16', + snapshotName: 'kb_snapshot_8.16', + }); + + await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); + await deployTinyElserAndSetupKb(getService); }); - await createOrUpdateIndexAssets(observabilityAIAssistantAPIClient); + after(async () => { + await teardownTinyElserModelAndInferenceEndpoint(getService); + await restoreIndexAssets(getService); + }); - await deployTinyElserAndSetupKb(getService); - }); + describe('before migrating', () => { + it('the docs do not have semantic_text embeddings', async () => { + const hits = await getKnowledgeBaseEntriesFromEs(es); + const hasSemanticTextEmbeddings = hits.some((hit) => hit._source?.semantic_text); - after(async () => { - await teardownTinyElserModelAndInferenceEndpoint(getService); - await restoreIndexAssets(getService); - }); + expect(hits.length).to.be(60); + expect(hasSemanticTextEmbeddings).to.be(false); + }); + }); + + describe('after migrating', () => { + before(async () => { + await runStartupMigrations(observabilityAIAssistantAPIClient); + }); + + it('the docs have semantic_text field', async () => { + await retry.try(async () => { + const hits = await getKnowledgeBaseEntriesFromEs(es); + const hasSemanticTextField = hits.every((hit) => hit._source?.semantic_text); - describe('before migrating', () => { - it('the docs do not have semantic_text embeddings', async () => { - const hits = await getKnowledgeBaseEntriesFromEs(es); - const hasSemanticTextEmbeddings = hits.some((hit) => 
hit._source?.semantic_text); + expect(hits.length).to.be(60); + expect(hasSemanticTextField).to.be(true); + }); + }); - expect(hits.length).to.be(60); - expect(hasSemanticTextEmbeddings).to.be(false); + it('the docs have embeddings', async () => { + await retry.try(async () => { + const hits = await getKnowledgeBaseEntriesFromEs(es); + + const everyEntryHasEmbeddings = hits.every( + (hit) => + // @ts-expect-error + Object.keys(hit._source?.semantic_text.inference.chunks[0].embeddings).length > 0 + ); + expect(hits.length).to.be(60); + expect(everyEntryHasEmbeddings).to.be(true); + }); + }); + + it('returns entries correctly via API', async () => { + const res = await getKnowledgeBaseEntriesFromApi({ observabilityAIAssistantAPIClient }); + expect(res.status).to.be(200); + + expect( + sortBy( + res.body.entries + .filter(omitLensEntry) + .map(({ title, text, type }) => ({ title, text, type })), + ({ title }) => title + ) + ).to.eql([ + { title: 'movie_quote', type: 'contextual', text: 'To infinity and beyond!' 
}, + { + title: 'user_color', + type: 'contextual', + text: "The user's favourite color is blue.", + }, + ]); + }); }); }); - describe('after migrating', () => { + describe('manually created entries', () => { before(async () => { + await restoreIndexAssets(getService); + await deployTinyElserAndSetupKb(getService); + + // index sample documents + await Promise.all([ + addEntryWithoutSemanticText({ es, title: 'user_color', text: 'Red' }), + addEntryWithoutSemanticText({ es, title: 'user_nickname', text: 'Peter Parker' }), + addEntryWithoutSemanticText({ es, title: 'empty_text_doc', text: '' }), // important to test for empty text: https://github.com/elastic/kibana/issues/220339 + ]); + await refreshKbIndex(es); await runStartupMigrations(observabilityAIAssistantAPIClient); }); - it('the docs have semantic_text field', async () => { - await retry.try(async () => { - const hits = await getKnowledgeBaseEntriesFromEs(es); - const hasSemanticTextField = hits.every((hit) => hit._source?.semantic_text); - - expect(hits.length).to.be(60); - expect(hasSemanticTextField).to.be(true); - }); + after(async () => { + await teardownTinyElserModelAndInferenceEndpoint(getService); }); - it('the docs have embeddings', async () => { + it('should migrate entries with `text` and ignore entries without `text`', async () => { await retry.try(async () => { const hits = await getKnowledgeBaseEntriesFromEs(es); - const hasEmbeddings = hits.every( - (hit) => - // @ts-expect-error - Object.keys(hit._source?.semantic_text.inference.chunks[0].embeddings).length > 0 - ); - expect(hits.length).to.be(60); - expect(hasEmbeddings).to.be(true); + expect(hits.filter((hit) => hit._source?.semantic_text)).to.have.length(2); + expect(hits.length).to.be(3); }); }); - - it('returns entries correctly via API', async () => { - const res = await getKnowledgeBaseEntriesFromApi({ observabilityAIAssistantAPIClient }); - expect(res.status).to.be(200); - - expect( - sortBy( - res.body.entries - 
.filter(omitLensEntry) - .map(({ title, text, type }) => ({ title, text, type })), - ({ title }) => title - ) - ).to.eql([ - { - title: 'movie_quote', - type: 'contextual', - text: 'To infinity and beyond!', - }, - { - title: 'user_color', - type: 'contextual', - text: "The user's favourite color is blue.", - }, - ]); - }); }); }); } @@ -128,3 +161,28 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon function omitLensEntry(entry?: KnowledgeBaseEntry) { return entry?.labels?.category !== 'lens'; } + +async function addEntryWithoutSemanticText({ + es, + title, + text, +}: { + es: ElasticsearchClient; + title: string; + text: string; +}) { + await es.index({ + index: resourceNames.writeIndexAlias.kb, + document: { + id: uuidV4(), + title, + text, + confidence: 'high', + is_correction: false, + type: 'contextual', + public: false, + role: KnowledgeBaseEntryRole.UserEntry, + '@timestamp': new Date().toISOString(), + }, + }); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/knowledge_base.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/knowledge_base.ts index 3e47f56e35464..66a59bc93bc57 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/knowledge_base.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/ai_assistant/utils/knowledge_base.ts @@ -87,8 +87,12 @@ export async function addSampleDocsToInternalKb( }, }); - // refresh the index to make sure the documents are searchable - await es.indices.refresh({ index: resourceNames.indexPatterns.kb }); + await refreshKbIndex(es); +} + +// refresh the index to make sure the documents are searchable +export function refreshKbIndex(es: Client) { + return es.indices.refresh({ index: resourceNames.indexPatterns.kb }); } export async function addSampleDocsToCustomIndex(