Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ const saveKnowledgeBaseUserInstruction = createObservabilityAIAssistantServerRou

const { id, text, public: isPublic } = resources.params.body;
return client.addUserInstruction({
entry: { id, text, public: isPublic },
entry: { id, text, public: isPublic, title: `User instruction` },
});
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,16 +678,22 @@ export class ObservabilityAIAssistantClient {
}): Promise<void> => {
// for now we want to limit the number of user instructions to 1 per user
// if a user instruction already exists for the user, we get the id and update it
this.dependencies.logger.debug('Adding user instruction entry');

const existingId = await this.dependencies.knowledgeBaseService.getPersonalUserInstructionId({
isPublic: entry.public,
namespace: this.dependencies.namespace,
user: this.dependencies.user,
});

if (existingId) {
this.dependencies.logger.debug(
`Updating user instruction. id = "${existingId}", user = "${this.dependencies.user?.name}"`
);
entry.id = existingId;
this.dependencies.logger.debug(`Updating user instruction with id "${existingId}"`);
} else {
this.dependencies.logger.debug(
`Creating user instruction. id = "${entry.id}", user = "${this.dependencies.user?.name}"`
);
}

return this.dependencies.knowledgeBaseService.addEntry({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,9 @@ export class KnowledgeBaseService {
refresh: 'wait_for',
});

this.dependencies.logger.debug(`Entry added to knowledge base`);
this.dependencies.logger.debug(
`Entry added to knowledge base. title = "${doc.title}", user = "${user?.name}, namespace = "${namespace}"`
);
} catch (error) {
this.dependencies.logger.debug(`Failed to add entry to knowledge base ${error}`);
if (isInferenceEndpointMissingOrUnavailable(error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
*/

import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import pLimit from 'p-limit';
import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import type { CoreSetup, CoreStart, Logger } from '@kbn/core/server';
import pRetry from 'p-retry';
import { KnowledgeBaseEntry } from '../../../common';
import { resourceNames } from '..';
import { getElserModelStatus } from '../inference_endpoint';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
Expand Down Expand Up @@ -154,61 +152,30 @@ async function populateSemanticTextFieldRecursively({
logger: Logger;
config: ObservabilityAIAssistantConfig;
}) {
logger.debug('Populating semantic_text field for entries without it');

const response = await esClient.asInternalUser.search<KnowledgeBaseEntry>({
size: 100,
track_total_hits: true,
index: [resourceNames.aliases.kb],
query: {
bool: {
must_not: {
exists: {
field: 'semantic_text',
},
},
},
},
_source: {
excludes: ['ml.tokens'],
},
});

if (response.hits.hits.length === 0) {
logger.debug('No remaining entries to migrate');
return;
}

logger.debug(`Found ${response.hits.hits.length} entries to migrate`);

await waitForModel({ esClient, logger, config });
logger.debug('Initalizing semantic text migration for knowledge base entries...');

// Limit the number of concurrent requests to avoid overloading the cluster
const limiter = pLimit(10);
const promises = response.hits.hits.map((hit) => {
return limiter(() => {
if (!hit._source || !hit._id) {
return;
}

return esClient.asInternalUser.update({
refresh: 'wait_for',
index: resourceNames.aliases.kb,
id: hit._id,
body: {
doc: {
...hit._source,
semantic_text: hit._source.text,
await pRetry(
async () => {
await waitForModel({ esClient, logger, config });
await esClient.asInternalUser.updateByQuery({
index: [resourceNames.aliases.kb],
requests_per_second: 100,
refresh: true,
script: {
source: `ctx._source.semantic_text = ctx._source.text`,
lang: 'painless',
},
query: {
bool: {
filter: { exists: { field: 'text' } },
must_not: { exists: { field: 'semantic_text' } },
},
},
});
});
});

await Promise.all(promises);

logger.debug(`Populated ${promises.length} entries`);
await populateSemanticTextFieldRecursively({ esClient, logger, config });
},
{ retries: 10, minTimeout: 10_000 }
);
logger.debug('Semantic text migration completed successfully.');
}

async function waitForModel({
Expand Down