Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ const saveKnowledgeBaseUserInstruction = createObservabilityAIAssistantServerRou

const { id, text, public: isPublic } = resources.params.body;
return client.addUserInstruction({
entry: { id, text, public: isPublic },
entry: { id, text, public: isPublic, title: `User instruction` },
});
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,16 +767,22 @@ export class ObservabilityAIAssistantClient {
}): Promise<void> => {
// for now we want to limit the number of user instructions to 1 per user
// if a user instruction already exists for the user, we get the id and update it
this.dependencies.logger.debug('Adding user instruction entry');

const existingId = await this.dependencies.knowledgeBaseService.getPersonalUserInstructionId({
isPublic: entry.public,
namespace: this.dependencies.namespace,
user: this.dependencies.user,
});

if (existingId) {
this.dependencies.logger.debug(
`Updating user instruction. id = "${existingId}", user = "${this.dependencies.user?.name}"`
);
entry.id = existingId;
this.dependencies.logger.debug(`Updating user instruction with id "${existingId}"`);
} else {
this.dependencies.logger.debug(
`Creating user instruction. id = "${entry.id}", user = "${this.dependencies.user?.name}"`
);
}

return this.dependencies.knowledgeBaseService.addEntry({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,9 @@ export class KnowledgeBaseService {
refresh: 'wait_for',
});

this.dependencies.logger.debug(`Entry added to knowledge base`);
this.dependencies.logger.debug(
`Entry added to knowledge base. title = "${doc.title}", user = "${user?.name}, namespace = "${namespace}"`
);
} catch (error) {
this.dependencies.logger.error(`Failed to add entry to knowledge base ${error}`);
if (isInferenceEndpointMissingOrUnavailable(error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,5 @@ export async function isReIndexInProgress({
getActiveReindexingTaskId(esClient),
]);

logger.debug(`Lock: ${!!lock}`);
logger.debug(`ES re-indexing task: ${!!activeReindexingTask}`);

return lock !== undefined || activeReindexingTask !== undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
*/

import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import pLimit from 'p-limit';
import type { CoreSetup, Logger } from '@kbn/core/server';
import { uniq } from 'lodash';
import { LockManagerService } from '@kbn/lock-manager';
import { KnowledgeBaseEntry } from '../../../common';
import pRetry from 'p-retry';
import { resourceNames } from '..';
import { waitForKbModel } from '../inference_endpoint';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config';
import { sleep } from '../util/sleep';
import { getInferenceIdFromWriteIndex } from '../knowledge_base_service/get_inference_id_from_write_index';

const POPULATE_MISSING_SEMANTIC_TEXT_FIELDS_LOCK_ID = 'populate_missing_semantic_text_fields';
Expand All @@ -32,13 +29,12 @@ export async function populateMissingSemanticTextFieldWithLock({
}) {
const lmService = new LockManagerService(core, logger);
await lmService.withLock(POPULATE_MISSING_SEMANTIC_TEXT_FIELDS_LOCK_ID, async () =>
populateMissingSemanticTextFieldRecursively({ core, esClient, logger, config })
populateMissingSemanticTextField({ core, esClient, logger, config })
);
}

// Ensures that every doc has populated the `semantic_text` field.
// It retrieves entries without the field, updates them in batches, and continues until no entries remain.
async function populateMissingSemanticTextFieldRecursively({
async function populateMissingSemanticTextField({
core,
esClient,
logger,
Expand All @@ -49,64 +45,30 @@ async function populateMissingSemanticTextFieldRecursively({
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
logger.debug(
'Checking for remaining entries without semantic_text field that need to be migrated'
);

const response = await esClient.asInternalUser.search<KnowledgeBaseEntry>({
size: 100,
track_total_hits: true,
index: [resourceNames.writeIndexAlias.kb],
query: {
bool: {
must_not: {
exists: {
field: 'semantic_text',
},
},
},
},
_source: {
excludes: ['ml.tokens'],
},
});
logger.debug('Initalizing semantic text migration for knowledge base entries...');

if (response.hits.hits.length === 0) {
logger.debug('No remaining entries to migrate');
return;
}
await pRetry(
async () => {
const inferenceId = await getInferenceIdFromWriteIndex(esClient);
await waitForKbModel({ core, esClient, logger, config, inferenceId });

const inferenceId = await getInferenceIdFromWriteIndex(esClient);
await waitForKbModel({ core, esClient, logger, config, inferenceId });

const indicesWithOutdatedEntries = uniq(response.hits.hits.map((hit) => hit._index));
logger.debug(
`Found ${response.hits.hits.length} entries without semantic_text field in "${indicesWithOutdatedEntries}". Updating now...`
);

// Limit the number of concurrent requests to avoid overloading the cluster
const limiter = pLimit(20);
const promises = response.hits.hits.map((hit) => {
return limiter(() => {
if (!hit._source || !hit._id) {
return;
}

return esClient.asInternalUser.update({
refresh: 'wait_for',
await esClient.asInternalUser.updateByQuery({
index: resourceNames.writeIndexAlias.kb,
id: hit._id,
doc: {
...hit._source,
semantic_text: hit._source.text ?? 'No text',
requests_per_second: 100,
script: {
source: `ctx._source.semantic_text = ctx._source.text`,
lang: 'painless',
},
query: {
bool: {
filter: { exists: { field: 'text' } },
must_not: { exists: { field: 'semantic_text' } },
},
},
});
});
});

await Promise.all(promises);
logger.debug(`Updated ${promises.length} entries`);
},
{ retries: 10, minTimeout: 10_000 }
);

await sleep(100);
await populateMissingSemanticTextFieldRecursively({ core, esClient, logger, config });
logger.debug('Semantic text migration completed successfully.');
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { CoreSetup, Logger } from '@kbn/core/server';
import pRetry from 'p-retry';
import { errors } from '@elastic/elasticsearch';
import { LockManagerService, isLockAcquisitionError } from '@kbn/lock-manager';
import { resourceNames } from '..';
Expand Down Expand Up @@ -58,21 +57,7 @@ export async function runStartupMigrations({
await reIndexKnowledgeBaseWithLock({ core, logger, esClient });
}

await pRetry(
async () => populateMissingSemanticTextFieldWithLock({ core, logger, config, esClient }),
{
retries: 5,
minTimeout: 10_000,
onFailedAttempt: async (error) => {
// if the error is a LockAcquisitionError the operation is already in progress and we should not retry
// for other errors we should retry
// throwing the error will cause pRetry to abort all retries
if (isLockAcquisitionError(error)) {
throw error;
}
},
}
);
await populateMissingSemanticTextFieldWithLock({ core, logger, config, esClient });
})
.catch((error) => {
// we should propogate the error if it is not a LockAcquisitionError
Expand Down
Loading