diff --git a/package.json b/package.json index 4ae63c62f869a..da593458d42bd 100644 --- a/package.json +++ b/package.json @@ -1299,6 +1299,7 @@ "papaparse": "^5.5.3", "pbf": "3.2.1", "pdfmake": "^0.2.15", + "piscina": "^3.2.0", "polished": "^4.3.1", "pretty-ms": "6.0.0", "prop-types": "^15.8.1", @@ -1922,7 +1923,6 @@ "peggy": "^4.2.0", "picomatch": "^4.0.2", "pirates": "^4.0.7", - "piscina": "^3.2.0", "pixelmatch": "^5.3.0", "playwright": "1.53.1", "pngjs": "^7.0.0", diff --git a/x-pack/platform/packages/shared/ai-infra/inference-common/index.ts b/x-pack/platform/packages/shared/ai-infra/inference-common/index.ts index 9aa685c9f4e3c..9472d2d703e5e 100644 --- a/x-pack/platform/packages/shared/ai-infra/inference-common/index.ts +++ b/x-pack/platform/packages/shared/ai-infra/inference-common/index.ts @@ -70,6 +70,7 @@ export { type DeanonymizationOutput, type DeanonymizedMessage, type AnonymizationSettings, + type AnonymizationRegexWorkerTaskPayload, } from './src/chat_complete'; export type { BoundInferenceClient, InferenceClient } from './src/inference_client'; diff --git a/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/index.ts b/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/index.ts index 8ddcc5ea53d60..bd5764ee29795 100644 --- a/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/index.ts +++ b/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/index.ts @@ -16,4 +16,5 @@ export type { RegexAnonymizationRule, NamedEntityRecognitionRule, AnonymizationSettings, + AnonymizationRegexWorkerTaskPayload, } from './types'; diff --git a/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/types.ts b/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/types.ts index 38181e9df9700..a5992018526f0 100644 --- a/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/types.ts +++ b/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/anonymization/types.ts @@ -63,3 +63,7 @@ export interface DeanonymizationOutput { } export type DeanonymizedMessage = Message & { deanonymizations: Deanonymization[] }; +export interface AnonymizationRegexWorkerTaskPayload { + rule: RegexAnonymizationRule; + records: Array>; +} diff --git a/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/index.ts b/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/index.ts index 6d25da01f9a8c..674019e0c0dce 100644 --- a/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/index.ts +++ b/x-pack/platform/packages/shared/ai-infra/inference-common/src/chat_complete/index.ts @@ -79,4 +79,5 @@ export type { RegexAnonymizationRule, NamedEntityRecognitionRule, AnonymizationSettings, + AnonymizationRegexWorkerTaskPayload, } from './anonymization'; diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.test.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.test.ts index 9b27caf3f5d26..ccba05adaab91 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.test.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.test.ts @@ -6,6 +6,7 @@ */ import { MlInferenceResponseResult } from '@elastic/elasticsearch/lib/api/types'; +import { loggerMock, type MockedLogger } from '@kbn/logging-mocks'; import { anonymizeMessages } from './anonymize_messages'; import { AnonymizationRule, @@ -16,16 +17,23 @@ import { } from '@kbn/inference-common'; import { messageToAnonymizationRecords } from './message_to_anonymization_records'; import { getEntityMask } from './get_entity_mask'; - +import { RegexWorkerService } from './regex_worker_service'; +import { AnonymizationWorkerConfig } from '../../config'; const mockEsClient = { ml: { inferTrainedModel: jest.fn(), }, } as any; - +const testConfig = { + enabled: false, +} as AnonymizationWorkerConfig; describe('anonymizeMessages', () => { + let logger: MockedLogger; + let regexWorker: RegexWorkerService; beforeEach(() => { jest.resetAllMocks(); + logger = loggerMock.create(); + regexWorker = new RegexWorkerService(testConfig, logger); }); const setupMockResponse = (entities: MlInferenceResponseResult[]) => { @@ -88,6 +96,7 @@ describe('anonymizeMessages', () => { const result = await anonymizeMessages({ messages, anonymizationRules: [nerRule], + regexWorker, esClient: mockEsClient, }); @@ -126,6 +135,7 @@ describe('anonymizeMessages', () => { anonymizeMessages({ messages, anonymizationRules: [nerRule], + regexWorker, esClient: mockEsClient, }) ).resolves.toBeDefined(); @@ -137,6 +147,7 @@ describe('anonymizeMessages', () => { const result = await anonymizeMessages({ messages, anonymizationRules: [disabledRule], + regexWorker, esClient: mockEsClient, }); @@ -154,6 +165,7 @@ describe('anonymizeMessages', () => { const result = await anonymizeMessages({ messages, anonymizationRules: [disabledRule], + regexWorker, esClient: mockEsClient, }); @@ -181,6 +193,7 @@ describe('anonymizeMessages', () => { const result = await anonymizeMessages({ messages, anonymizationRules: [regexRule], + regexWorker, esClient: mockEsClient, }); @@ -251,6 +264,7 @@ describe('anonymizeMessages', () => { const result = await anonymizeMessages({ messages, + regexWorker, anonymizationRules: [nerRule], esClient: mockEsClient, }); @@ -295,6 +309,7 @@ describe('anonymizeMessages', () => { system: systemPrompt, messages: [], anonymizationRules: [nerRule], + regexWorker, esClient: mockEsClient, }); expect(result.system).toBe( @@ -349,6 +364,7 @@ describe('anonymizeMessages', () => { }, ], anonymizationRules: [nerRule], // nerRule allows only PER + regexWorker, esClient: mockEsClient, }); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.ts index 1270413245471..ab717e62c014e 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_messages.ts @@ -11,16 +11,19 @@ import { merge } from 'lodash'; import { anonymizeRecords } from './anonymize_records'; import { messageFromAnonymizationRecords } from './message_from_anonymization_records'; import { messageToAnonymizationRecords } from './message_to_anonymization_records'; +import { RegexWorkerService } from './regex_worker_service'; export async function anonymizeMessages({ system, messages, anonymizationRules, + regexWorker, esClient, }: { system?: string | undefined; messages: Message[]; anonymizationRules: AnonymizationRule[]; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; }): Promise { const rules = anonymizationRules.filter((rule) => rule.enabled); @@ -41,6 +44,7 @@ export async function anonymizeMessages({ const { records, anonymizations } = await anonymizeRecords({ input: toAnonymize, anonymizationRules: rules, + regexWorker, esClient, }); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.test.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.test.ts index 0db032fc07644..42debaece5470 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.test.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.test.ts @@ -8,7 +8,9 @@ import { anonymizeRecords } from './anonymize_records'; import { AnonymizationRule } from '@kbn/inference-common'; import { MlInferenceResponseResult } from '@elastic/elasticsearch/lib/api/types'; - +import { loggerMock, type MockedLogger } from '@kbn/logging-mocks'; +import { RegexWorkerService } from './regex_worker_service'; +import { AnonymizationWorkerConfig } from '../../config'; const mockEsClient = { ml: { inferTrainedModel: jest.fn(), @@ -20,27 +22,36 @@ const setupMockResponse = (entitiesPerDoc: MlInferenceResponseResult[]) => { inference_results: entitiesPerDoc, }); }; +const nerRule: AnonymizationRule = { + type: 'NER', + enabled: true, + modelId: 'model-1', +}; +const nerRule2: AnonymizationRule = { + type: 'NER', + enabled: true, + modelId: 'model-2', +}; +const regexRule: AnonymizationRule = { + type: 'RegExp', + enabled: true, + entityClass: 'EMAIL', + pattern: '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}', +}; + +const testConfig = { + enabled: false, +} as AnonymizationWorkerConfig; describe('anonymizeRecords', () => { - const nerRule: AnonymizationRule = { - type: 'NER', - enabled: true, - modelId: 'model-1', - }; - const nerRule2: AnonymizationRule = { - type: 'NER', - enabled: true, - modelId: 'model-2', - }; - const regexRule: AnonymizationRule = { - type: 'RegExp', - enabled: true, - entityClass: 'EMAIL', - pattern: '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}', - }; + let logger: MockedLogger; + + let regexWorker: RegexWorkerService; beforeEach(() => { jest.resetAllMocks(); + logger = loggerMock.create(); + regexWorker = new RegexWorkerService(testConfig, logger); }); it('masks values using regex rule', async () => { @@ -49,6 +60,7 @@ describe('anonymizeRecords', () => { const { records, anonymizations } = await anonymizeRecords({ input, anonymizationRules: [regexRule], + regexWorker, esClient: mockEsClient, }); @@ -63,6 +75,7 @@ describe('anonymizeRecords', () => { await anonymizeRecords({ input: [{ content: shortText }], anonymizationRules: [nerRule], + regexWorker, esClient: mockEsClient, }); @@ -80,6 +93,7 @@ describe('anonymizeRecords', () => { const { records } = await anonymizeRecords({ input: [{ content: longText }], anonymizationRules: [nerRule], + regexWorker, esClient: mockEsClient, }); @@ -134,6 +148,7 @@ describe('anonymizeRecords', () => { const { records, anonymizations } = await anonymizeRecords({ input, anonymizationRules: [nerRule, nerRule2], + regexWorker, esClient: mockEsClient, }); @@ -156,6 +171,7 @@ describe('anonymizeRecords', () => { const result = await anonymizeRecords({ input, anonymizationRules: [regexRule], + regexWorker, esClient: mockEsClient, }); @@ -190,6 +206,7 @@ describe('anonymizeRecords', () => { const result = await anonymizeRecords({ input, anonymizationRules: [nerRule], + regexWorker, esClient: mockEsClient, }); @@ -242,6 +259,7 @@ describe('anonymizeRecords', () => { const result = await anonymizeRecords({ input, anonymizationRules: [nerRule, nerRule2], + regexWorker, esClient: mockEsClient, }); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.ts index 6c57f881c86e3..cb302c739d172 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/anonymize_records.ts @@ -11,24 +11,29 @@ import { partition } from 'lodash'; import { AnonymizationState } from './types'; import { executeRegexRule } from './execute_regex_rule'; import { executeNerRule } from './execute_ner_rule'; +import { RegexWorkerService } from './regex_worker_service'; export async function anonymizeRecords>({ input, anonymizationRules, + regexWorker, esClient, }: { input: T[]; anonymizationRules: AnonymizationRule[]; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; }): Promise; export async function anonymizeRecords({ input, anonymizationRules, + regexWorker, esClient, }: { input: Array>; anonymizationRules: AnonymizationRule[]; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; }): Promise { let state: AnonymizationState = { @@ -42,9 +47,10 @@ export async function anonymizeRecords({ ); for (const rule of regexRules) { - state = executeRegexRule({ + state = await executeRegexRule({ rule, state, + regexWorker, }); } diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule.ts index baab29d39a58c..1290af98cdf0c 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule.ts @@ -5,41 +5,26 @@ * 2.0. */ -import { Anonymization, RegexAnonymizationRule } from '@kbn/inference-common'; +import { RegexAnonymizationRule } from '@kbn/inference-common'; import { AnonymizationState } from './types'; -import { getEntityMask } from './get_entity_mask'; +import { RegexWorkerService } from './regex_worker_service'; /** * Executes a regex anonymization rule, by iterating over the matches, * and replacing each occurrence with a masked value. */ -export function executeRegexRule({ +export async function executeRegexRule({ state, rule, + regexWorker, }: { state: AnonymizationState; rule: RegexAnonymizationRule; -}): AnonymizationState { - const regex = new RegExp(rule.pattern, 'g'); - - const anonymizations: Anonymization[] = state.anonymizations.concat(); - - const nextRecords = state.records.map((record) => { - const newRecord: Record = {}; - for (const [key, value] of Object.entries(record)) { - newRecord[key] = value.replace(regex, (match) => { - const mask = getEntityMask({ value: match, class_name: rule.entityClass }); - - anonymizations.push({ - entity: { value: match, class_name: rule.entityClass, mask }, - rule: { type: rule.type }, - }); - - return mask; - }); - } - return newRecord; + regexWorker: RegexWorkerService; +}): Promise { + const { records, anonymizations } = await regexWorker.run({ + rule, + records: state.records, }); - - return { records: nextRecords, anonymizations }; + return { records, anonymizations: state.anonymizations.concat(anonymizations) }; } diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule_task.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule_task.ts new file mode 100644 index 0000000000000..12e1a473b5b14 --- /dev/null +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/execute_regex_rule_task.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { RegexAnonymizationRule } from '@kbn/inference-common'; +import { Anonymization } from '@kbn/inference-common'; +import { getEntityMask } from './get_entity_mask'; +import { AnonymizationState } from './types'; + +export function executeRegexRuleTask({ + rule, + records, +}: { + rule: RegexAnonymizationRule; + records: Array>; +}): AnonymizationState { + const regex = new RegExp(rule.pattern, 'g'); + const anonymizations: Anonymization[] = []; + const nextRecords = records.map((record: Record) => { + const newRecord: Record = {}; + for (const [key, value] of Object.entries(record)) { + newRecord[key] = value.replace(regex, (match) => { + const mask = getEntityMask({ value: match, class_name: rule.entityClass }); + + anonymizations.push({ + entity: { value: match, class_name: rule.entityClass, mask }, + rule: { type: rule.type }, + }); + + return mask; + }); + } + return newRecord; + }); + + return { records: nextRecords, anonymizations }; +} diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_service.test.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_service.test.ts new file mode 100644 index 0000000000000..20f785ef8c4bc --- /dev/null +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_service.test.ts @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { AnonymizationRule } from '@kbn/inference-common'; +import { loggerMock, type MockedLogger } from '@kbn/logging-mocks'; +import { RegexWorkerService } from './regex_worker_service'; +import { AnonymizationWorkerConfig } from '../../config'; + +const regexEmailRule: AnonymizationRule = { + type: 'RegExp', + enabled: true, + entityClass: 'EMAIL', + pattern: '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}', +}; +const backTrackingRule: AnonymizationRule = { + type: 'RegExp', + enabled: true, + entityClass: 'TEST', + pattern: '(a+)+$', +}; +const taskPayload = { + records: [{ email: 'jorge21@gmail.com' }], + rule: regexEmailRule, +}; +function createTestConfig( + overrides: Partial = {} +): AnonymizationWorkerConfig { + return { + enabled: true, + minThreads: 1, + maxThreads: 3, + idleTimeout: { asMilliseconds: () => 30000 }, + taskTimeout: { asMilliseconds: () => 15000 }, + ...overrides, + } as AnonymizationWorkerConfig; +} + +describe('RegexWorkerService', () => { + let logger: MockedLogger; + + beforeEach(() => { + jest.resetAllMocks(); + logger = loggerMock.create(); + }); + + it('anonymizes through the worker', async () => { + const regexWorker = new RegexWorkerService(createTestConfig(), logger); + const result = await regexWorker.run(taskPayload); + const worker = (regexWorker as any).worker; + expect(worker).toBeDefined(); + expect(worker.completed).toBe(1); + expect(result.records[0].email).not.toContain('jorge21@gmail.com'); + expect(result.anonymizations.length).toBe(1); + + // worker completed 2 tasks + await regexWorker.run(taskPayload); + expect(worker.completed).toBe(2); + }); + it('times out task if greater than taskTimeout time', async () => { + const regexWorker = new RegexWorkerService( + createTestConfig({ taskTimeout: { asMilliseconds: () => 1 } } as any), + logger + ); + const longA = 'a'.repeat(10_000) + 'b'; + await expect( + regexWorker.run({ + records: [{ content: longA }], + rule: backTrackingRule, + }) + ).rejects.toThrow('Regex anonymization task timed out'); + }); + it('runs task synchronously when worker is disabled', async () => { + const regexWorker = new RegexWorkerService(createTestConfig({ enabled: false }), logger); + const result = await regexWorker.run(taskPayload); + const worker = (regexWorker as any).worker; + expect(worker).toBeUndefined(); + expect(result.records[0].email).not.toContain('jorge21@gmail.com'); + expect(result.anonymizations.length).toBe(1); + }); +}); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_service.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_service.ts new file mode 100644 index 0000000000000..323a43e711c59 --- /dev/null +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_service.ts @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import Piscina from 'piscina'; +import type { Logger } from '@kbn/logging'; +import type { AnonymizationRegexWorkerTaskPayload } from '@kbn/inference-common'; +import type { AnonymizationWorkerConfig } from '../../config'; +import { AnonymizationState } from './types'; +import { executeRegexRuleTask } from './execute_regex_rule_task'; + +function runTaskSync(payload: AnonymizationRegexWorkerTaskPayload): AnonymizationState { + return executeRegexRuleTask(payload); +} + +export class RegexWorkerService { + private readonly enabled: boolean; + private worker?: Piscina; + private readonly config: AnonymizationWorkerConfig; + + constructor(config: AnonymizationWorkerConfig, private readonly logger: Logger) { + this.config = config; + this.enabled = config.enabled; + + if (this.enabled) { + this.logger.debug( + `Initializing regex worker pool (min=${this.config.minThreads} | max=${ + this.config.maxThreads + } | idle=${this.config.idleTimeout.asMilliseconds()}ms)` + ); + + this.worker = new Piscina({ + filename: require.resolve('./regex_worker_wrapper.js'), + minThreads: this.config.minThreads, + maxThreads: this.config.maxThreads, + idleTimeout: this.config.idleTimeout.asMilliseconds(), + }); + } + } + + /** + * Execute a task in a worker. Falls back to synchronous execution when the + * worker is disabled + */ + async run(payload: AnonymizationRegexWorkerTaskPayload): Promise { + if (!this.enabled) { + return runTaskSync(payload); + } + if (!this.worker) { + throw new Error('Regex worker pool was not initialized'); + } + + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), this.config.taskTimeout.asMilliseconds()); + + try { + return await this.worker.run(payload, { signal: controller.signal }); + } catch (err) { + if (err?.name === 'AbortError') { + if (this.worker.threads.length > 0) { + // Attempt to terminate stuck threads + await Promise.all( + this.worker.threads.map(async (thread) => { + try { + await thread.terminate(); + } catch (e) { + // Ignore termination errors + } + }) + ); + } + throw new Error('Regex anonymization task timed out'); + } + throw err; + } finally { + clearTimeout(timer); + } + } + + async stop(): Promise { + await this.worker?.destroy(); + } +} diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_task.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_task.ts new file mode 100644 index 0000000000000..ec9a6cca88f14 --- /dev/null +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_task.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { AnonymizationRegexWorkerTaskPayload } from '@kbn/inference-common'; +import { executeRegexRuleTask } from './execute_regex_rule_task'; + +// eslint-disable-next-line import/no-default-export +export default function ({ rule, records }: AnonymizationRegexWorkerTaskPayload) { + return executeRegexRuleTask({ rule, records }); +} diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_wrapper.js b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_wrapper.js new file mode 100644 index 0000000000000..845cede94a224 --- /dev/null +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/anonymization/regex_worker_wrapper.js @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +const { REPO_ROOT } = require('@kbn/repo-info'); +const { join } = require('path'); + +/* eslint-disable @kbn/imports/no_boundary_crossing */ +/* eslint-disable import/no-dynamic-require */ + +if (process.env.NODE_ENV !== 'production') { + require(join(REPO_ROOT, 'src', 'setup_node_env')); +} else { + require(join(REPO_ROOT, 'src', 'setup_node_env/dist')); +} + +module.exports = require('./regex_worker_task'); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/api.test.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/api.test.ts index f141422ff3ede..cbff6c1f6bb79 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/api.test.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/api.test.ts @@ -24,6 +24,7 @@ import { chunkEvent, } from '../test_utils'; import { createChatCompleteApi } from './api'; +import { createRegexWorkerServiceMock } from '../test_utils'; describe('createChatCompleteApi', () => { let request: ReturnType; @@ -32,6 +33,7 @@ describe('createChatCompleteApi', () => { let inferenceAdapter: ReturnType; let inferenceConnector: ReturnType; let inferenceExecutor: ReturnType; + let regexWorker: ReturnType; let chatComplete: ChatCompleteAPI; const mockEsClient = { @@ -43,13 +45,14 @@ describe('createChatCompleteApi', () => { request = httpServerMock.createKibanaRequest(); logger = loggerMock.create(); actions = actionsMock.createStart(); - + regexWorker = createRegexWorkerServiceMock(); chatComplete = createChatCompleteApi({ request, actions, logger, esClient: mockEsClient, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); inferenceAdapter = createInferenceConnectorAdapterMock(); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/api.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/api.ts index 517c55cef3d24..d77eb3f3faef1 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/api.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/api.ts @@ -15,6 +15,7 @@ export function createChatCompleteApi({ actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }: CreateChatCompleteApiOptions) { const callbackApi = createChatCompleteCallbackApi({ @@ -22,6 +23,7 @@ export function createChatCompleteApi({ actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }); diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/callback_api.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/callback_api.ts index e582c5e8d7b2a..dad692298f358 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/callback_api.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/callback_api.ts @@ -34,12 +34,14 @@ import { getRetryFilter } from '../../common/utils/error_retry_filter'; import { anonymizeMessages } from './anonymization/anonymize_messages'; import { deanonymizeMessage } from './anonymization/deanonymize_message'; import { addAnonymizationInstruction } from './anonymization/add_anonymization_instruction'; +import { RegexWorkerService } from './anonymization/regex_worker_service'; interface CreateChatCompleteApiOptions { request: KibanaRequest; actions: ActionsPluginStart; logger: Logger; anonymizationRulesPromise: Promise; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; } @@ -73,6 +75,7 @@ export function createChatCompleteCallbackApi({ actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }: CreateChatCompleteApiOptions) { return ( @@ -121,6 +124,7 @@ export function createChatCompleteCallbackApi({ system, messages, anonymizationRules, + regexWorker, esClient, }) ).pipe( diff --git a/x-pack/platform/plugins/shared/inference/server/chat_complete/types.ts b/x-pack/platform/plugins/shared/inference/server/chat_complete/types.ts index f87544292aca0..655aa2124f210 100644 --- a/x-pack/platform/plugins/shared/inference/server/chat_complete/types.ts +++ b/x-pack/platform/plugins/shared/inference/server/chat_complete/types.ts @@ -20,6 +20,7 @@ import { KibanaRequest } from '@kbn/core/server'; import { PluginStartContract as ActionsPluginsStart } from '@kbn/actions-plugin/server'; import { ElasticsearchClient } from '@kbn/core/server'; import type { InferenceExecutor } from './utils'; +import { RegexWorkerService } from './anonymization/regex_worker_service'; /** * Adapter in charge of communicating with a specific inference connector @@ -68,5 +69,6 @@ export interface CreateChatCompleteApiOptions { actions: ActionsPluginsStart; logger: Logger; anonymizationRulesPromise: Promise; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; } diff --git a/x-pack/platform/plugins/shared/inference/server/config.ts b/x-pack/platform/plugins/shared/inference/server/config.ts index 72cb031bfc38f..b179a0ce06b93 100644 --- a/x-pack/platform/plugins/shared/inference/server/config.ts +++ b/x-pack/platform/plugins/shared/inference/server/config.ts @@ -9,6 +9,17 @@ import { schema, type TypeOf } from '@kbn/config-schema'; export const configSchema = schema.object({ enabled: schema.boolean({ defaultValue: true }), + workers: schema.object({ + anonymization: schema.object({ + enabled: schema.boolean({ defaultValue: true }), + minThreads: schema.number({ defaultValue: 0, min: 0 }), + maxThreads: schema.number({ defaultValue: 3, min: 1 }), + idleTimeout: schema.duration({ defaultValue: '30s' }), + taskTimeout: schema.duration({ defaultValue: '15s' }), + }), + }), }); export type InferenceConfig = TypeOf; + +export type AnonymizationWorkerConfig = InferenceConfig['workers']['anonymization']; diff --git a/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.test.ts b/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.test.ts index 60315e1c254c7..4e582ced48efc 100644 --- a/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.test.ts +++ b/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.test.ts @@ -20,6 +20,7 @@ const getConnectorByIdMock = getConnectorById as unknown as jest.MockedFn; @@ -28,6 +29,7 @@ describe('createChatModel', () => { let logger: MockedLogger; let actions: ReturnType; let request: ReturnType; + let regexWorker: ReturnType; const mockEsClient = { ml: { inferTrainedModel: jest.fn(), @@ -38,6 +40,7 @@ describe('createChatModel', () => { logger = loggerMock.create(); actions = actionsMock.createStart(); request = httpServerMock.createKibanaRequest(); + regexWorker = createRegexWorkerServiceMock(); createClientMock.mockReturnValue({ chatComplete: jest.fn(), @@ -60,6 +63,7 @@ describe('createChatModel', () => { temperature: 0.3, }, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, esClient: mockEsClient, }); @@ -68,8 +72,9 @@ describe('createChatModel', () => { actions, request, logger, - anonymizationRulesPromise: Promise.resolve([]), esClient: mockEsClient, + anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); }); @@ -86,6 +91,7 @@ describe('createChatModel', () => { temperature: 0.3, }, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, esClient: mockEsClient, }); @@ -114,6 +120,7 @@ describe('createChatModel', () => { temperature: 0.3, }, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, esClient: mockEsClient, }); diff --git a/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.ts b/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.ts index e88d3d9096287..b5d69a9b7783b 100644 --- a/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.ts +++ b/x-pack/platform/plugins/shared/inference/server/inference_client/create_chat_model.ts @@ -13,6 +13,7 @@ import { ElasticsearchClient } from '@kbn/core/server'; import { AnonymizationRule } from '@kbn/inference-common'; import { getConnectorById } from '../util/get_connector_by_id'; import { createClient } from './create_client'; +import { RegexWorkerService } from '../chat_complete/anonymization/regex_worker_service'; export interface CreateChatModelOptions { request: KibanaRequest; @@ -21,6 +22,7 @@ export interface CreateChatModelOptions { logger: Logger; chatModelOptions: Omit; anonymizationRulesPromise: Promise; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; } @@ -31,12 +33,14 @@ export const createChatModel = async ({ logger, chatModelOptions, anonymizationRulesPromise, + regexWorker, esClient, }: CreateChatModelOptions): Promise => { const client = createClient({ actions, request, anonymizationRulesPromise, + regexWorker, esClient, logger, }); diff --git a/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.test.ts b/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.test.ts index 28f0aa5cdbfa9..911934163c8d3 100644 --- a/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.test.ts +++ b/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.test.ts @@ -14,6 +14,7 @@ jest.mock('./inference_client'); jest.mock('../../common/inference_client/bind_client'); import { createInferenceClient } from './inference_client'; import { bindClient } from '../../common/inference_client/bind_client'; +import { createRegexWorkerServiceMock } from '../test_utils'; const bindClientMock = bindClient as jest.MockedFn; const createInferenceClientMock = createInferenceClient as jest.MockedFn< @@ -28,11 +29,13 @@ describe('createClient', () => { let logger: MockedLogger; let actions: ReturnType; let request: ReturnType; + let regexWorker: ReturnType; beforeEach(() => { logger = loggerMock.create(); actions = actionsMock.createStart(); request = httpServerMock.createKibanaRequest(); + regexWorker = createRegexWorkerServiceMock(); }); afterEach(() => { @@ -51,6 +54,7 @@ describe('createClient', () => { logger, esClient: mockEsClient, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); expect(createInferenceClientMock).toHaveBeenCalledTimes(1); @@ -58,8 +62,9 @@ describe('createClient', () => { request, actions, logger: logger.get('client'), - anonymizationRulesPromise: Promise.resolve([]), esClient: mockEsClient, + anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); expect(bindClientMock).not.toHaveBeenCalled(); @@ -78,6 +83,7 @@ describe('createClient', () => { logger, esClient: mockEsClient, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); // type check on client.chatComplete @@ -105,6 +111,7 @@ describe('createClient', () => { }, esClient: mockEsClient, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); expect(createInferenceClientMock).toHaveBeenCalledTimes(1); @@ -112,8 +119,9 @@ describe('createClient', () => { request, actions, logger: logger.get('client'), - anonymizationRulesPromise: Promise.resolve([]), esClient: mockEsClient, + anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); expect(bindClientMock).toHaveBeenCalledTimes(1); @@ -138,6 +146,7 @@ describe('createClient', () => { }, esClient: mockEsClient, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, }); // type check on client.chatComplete diff --git a/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.ts b/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.ts index 962f2c8ef3f13..24eba0a57b177 100644 --- a/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.ts +++ b/x-pack/platform/plugins/shared/inference/server/inference_client/create_client.ts @@ -13,12 +13,14 @@ import { AnonymizationRule } from '@kbn/inference-common'; import { ElasticsearchClient } from '@kbn/core/server'; import { createInferenceClient } from './inference_client'; import { bindClient } from '../../common/inference_client/bind_client'; +import { RegexWorkerService } from '../chat_complete/anonymization/regex_worker_service'; interface CreateClientOptions { request: KibanaRequest; actions: ActionsPluginStart; logger: Logger; anonymizationRulesPromise: Promise; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; } @@ -31,12 +33,13 @@ export function createClient(options: BoundCreateClientOptions): BoundInferenceC export function createClient( options: CreateClientOptions | BoundCreateClientOptions ): BoundInferenceClient | InferenceClient { - const { actions, request, logger, anonymizationRulesPromise, esClient } = options; + const { actions, request, logger, anonymizationRulesPromise, esClient, regexWorker } = options; const client = createInferenceClient({ request, actions, logger: logger.get('client'), anonymizationRulesPromise, + regexWorker, esClient, }); if ('bindTo' in options) { diff --git a/x-pack/platform/plugins/shared/inference/server/inference_client/inference_client.ts b/x-pack/platform/plugins/shared/inference/server/inference_client/inference_client.ts index d23cd885df93b..1f7e4068e21e1 100644 --- a/x-pack/platform/plugins/shared/inference/server/inference_client/inference_client.ts +++ b/x-pack/platform/plugins/shared/inference/server/inference_client/inference_client.ts @@ -15,18 +15,21 @@ import { createChatCompleteApi } from '../chat_complete'; import { createOutputApi } from '../../common/output/create_output_api'; import { getConnectorById } from '../util/get_connector_by_id'; import { createPromptApi } from '../prompt'; +import { RegexWorkerService } from '../chat_complete/anonymization/regex_worker_service'; export function createInferenceClient({ request, actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }: { request: KibanaRequest; logger: Logger; actions: ActionsPluginStart; anonymizationRulesPromise: Promise; + regexWorker: RegexWorkerService; esClient: ElasticsearchClient; }): InferenceClient { const chatComplete = createChatCompleteApi({ @@ -34,11 +37,19 @@ export function createInferenceClient({ actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }); return { chatComplete, - prompt: createPromptApi({ request, actions, logger, anonymizationRulesPromise, esClient }), + prompt: createPromptApi({ + request, + actions, + logger, + anonymizationRulesPromise, + regexWorker, + esClient, + }), output: createOutputApi(chatComplete), getConnectorById: async (connectorId: string) => { const actionsClient = await actions.getActionsClientWithRequest(request); diff --git a/x-pack/platform/plugins/shared/inference/server/plugin.ts b/x-pack/platform/plugins/shared/inference/server/plugin.ts index 372a6e93b8d28..8fe415cebccfd 100644 --- a/x-pack/platform/plugins/shared/inference/server/plugin.ts +++ b/x-pack/platform/plugins/shared/inference/server/plugin.ts @@ -15,6 +15,7 @@ import { } from '@kbn/inference-common'; import type { KibanaRequest } from '@kbn/core-http-server'; import { createClient as createInferenceClient, createChatModel } from './inference_client'; +import { RegexWorkerService } from './chat_complete/anonymization/regex_worker_service'; import { registerRoutes } from './routes'; import type { InferenceConfig } from './config'; import { @@ -37,11 +38,12 @@ export class InferencePlugin > { private logger: Logger; - - private shutdownProcessor?: () => Promise; + private config: InferenceConfig; + private regexWorker?: RegexWorkerService; constructor(context: PluginInitializerContext) { this.logger = context.logger.get(); + this.config = context.config.get(); } setup( coreSetup: CoreSetup, @@ -61,6 +63,11 @@ export class InferencePlugin } start(core: CoreStart, pluginsStart: InferenceStartDependencies): InferenceServerStart { + this.regexWorker = new RegexWorkerService( + this.config.workers.anonymization, + this.logger.get('regex_worker') + ); + const createAnonymizationRulesPromise = async (request: KibanaRequest) => { const soClient = core.savedObjects.getScopedClient(request); const uiSettingsClient = core.uiSettings.asScopedToClient(soClient); @@ -79,6 +86,7 @@ export class InferencePlugin return createInferenceClient({ ...options, anonymizationRulesPromise: createAnonymizationRulesPromise(options.request), + regexWorker: this.regexWorker!, actions: pluginsStart.actions, logger: this.logger.get('client'), esClient: core.elasticsearch.client.asScoped(options.request).asCurrentUser, @@ -92,6 +100,7 @@ export class InferencePlugin chatModelOptions: options.chatModelOptions, actions: pluginsStart.actions, anonymizationRulesPromise: createAnonymizationRulesPromise(options.request), + regexWorker: this.regexWorker!, esClient: core.elasticsearch.client.asScoped(options.request).asCurrentUser, logger: this.logger, }); @@ -100,6 +109,6 @@ export class InferencePlugin } async stop() { - await this.shutdownProcessor?.(); + await this.regexWorker?.stop(); } } diff --git a/x-pack/platform/plugins/shared/inference/server/prompt/api.test.ts b/x-pack/platform/plugins/shared/inference/server/prompt/api.test.ts index 6b39aead7d264..628c725bd81b9 100644 --- a/x-pack/platform/plugins/shared/inference/server/prompt/api.test.ts +++ b/x-pack/platform/plugins/shared/inference/server/prompt/api.test.ts @@ -9,6 +9,7 @@ import { of, isObservable, firstValueFrom, toArray } from 'rxjs'; import { loggerMock, type MockedLogger } from '@kbn/logging-mocks'; import { httpServerMock } from '@kbn/core/server/mocks'; import { actionsMock } from '@kbn/actions-plugin/server/mocks'; +import { createRegexWorkerServiceMock } from '../test_utils'; import { MessageRole, type PromptAPI, @@ -67,6 +68,7 @@ describe('createPromptApi', () => { let actions: ReturnType; let promptApi: PromptAPI; let mockCallbackApi: jest.MockedFn; + let regexWorker: ReturnType; const mockInput = { query: 'world' }; @@ -74,6 +76,7 @@ describe('createPromptApi', () => { request = httpServerMock.createKibanaRequest(); logger = loggerMock.create(); actions = actionsMock.createStart(); + regexWorker = createRegexWorkerServiceMock(); mockCallbackApi = jest.fn(); mockCreateChatCompleteCallbackApi.mockReturnValue(mockCallbackApi); @@ -83,6 +86,7 @@ describe('createPromptApi', () => { actions, logger, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, esClient: mockEsClient, }); }); @@ -97,6 +101,7 @@ describe('createPromptApi', () => { actions, logger, anonymizationRulesPromise: Promise.resolve([]), + regexWorker, esClient: mockEsClient, }); }); diff --git a/x-pack/platform/plugins/shared/inference/server/prompt/api.ts b/x-pack/platform/plugins/shared/inference/server/prompt/api.ts index aa932c3ab68c6..23347ffe7dc65 100644 --- a/x-pack/platform/plugins/shared/inference/server/prompt/api.ts +++ b/x-pack/platform/plugins/shared/inference/server/prompt/api.ts @@ -22,6 +22,7 @@ export function createPromptApi({ actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }: CreateChatCompleteApiOptions) { const callbackApi = createChatCompleteCallbackApi({ @@ -29,6 +30,7 @@ export function createPromptApi({ actions, logger, anonymizationRulesPromise, + regexWorker, esClient, }); diff --git a/x-pack/platform/plugins/shared/inference/server/test_utils/index.ts b/x-pack/platform/plugins/shared/inference/server/test_utils/index.ts index 2eafe20bfdcaf..0933df58342b6 100644 --- a/x-pack/platform/plugins/shared/inference/server/test_utils/index.ts +++ b/x-pack/platform/plugins/shared/inference/server/test_utils/index.ts @@ -9,3 +9,4 @@ export { chunkEvent, tokensEvent, messageEvent } from './chat_complete_events'; export { createInferenceConnectorMock } from './inference_connector'; export { createInferenceConnectorAdapterMock } from './inference_connector_adapter'; export { createInferenceExecutorMock } from './inference_executor'; +export { createRegexWorkerServiceMock } from './regex_worker_service.mock'; diff --git a/x-pack/platform/plugins/shared/inference/server/test_utils/regex_worker_service.mock.ts b/x-pack/platform/plugins/shared/inference/server/test_utils/regex_worker_service.mock.ts new file mode 100644 index 0000000000000..fd0ac969c00d4 --- /dev/null +++ b/x-pack/platform/plugins/shared/inference/server/test_utils/regex_worker_service.mock.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { AnonymizationRegexWorkerTaskPayload } from '@kbn/inference-common'; +import type { AnonymizationState } from '../chat_complete/anonymization/types'; +import { RegexWorkerService } from '../chat_complete/anonymization/regex_worker_service'; + +export const createRegexWorkerServiceMock = () => { + const mock = { + run: jest.fn( + ({ records }: AnonymizationRegexWorkerTaskPayload): Promise => + Promise.resolve({ records, anonymizations: [] }) + ), + stop: jest.fn().mockResolvedValue(undefined), + }; + return mock as unknown as RegexWorkerService; +}; diff --git a/x-pack/platform/plugins/shared/inference/tsconfig.json b/x-pack/platform/plugins/shared/inference/tsconfig.json index 9747ffde8a0a3..f5148cac62e8d 100644 --- a/x-pack/platform/plugins/shared/inference/tsconfig.json +++ b/x-pack/platform/plugins/shared/inference/tsconfig.json @@ -18,6 +18,9 @@ ".storybook/**/*.js" ], "kbn_references": [ + { + "path": "../../../../../src/setup_node_env/tsconfig.json" + }, "@kbn/i18n", "@kbn/esql-ast", "@kbn/esql-validation-autocomplete", diff --git a/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/ai_assistant/anonymization/anonymization.spec.ts b/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/ai_assistant/anonymization/anonymization.spec.ts index 84fe3f6590074..133456d240eca 100644 --- a/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/ai_assistant/anonymization/anonymization.spec.ts +++ b/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/ai_assistant/anonymization/anonymization.spec.ts @@ -27,8 +27,7 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon const userText1 = 'My name is Claudia and my email is claudia@example.com'; const userText2 = 'my website is http://claudia.is'; // LLM proxy is not working on MKI - this.tags(['failsOnMKI']); - + this.tags(['skipCloud']); before(async () => { await clearConversations(es); proxy = await createLlmProxy(log);