diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.test.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.test.ts index 3f5b85841194b..4e5e7cae13982 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.test.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.test.ts @@ -46,6 +46,7 @@ const mockProcessTaskOutput = jest.fn().mockResolvedValue({}); const mockInitialize = jest.fn().mockResolvedValue(undefined); class TestMigrationTaskRunner extends SiemMigrationTaskRunner { + protected taskConcurrency = 10; protected TaskRunnerClass = SiemMigrationTaskRunner; protected EvaluatorClass = undefined; diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.ts index e1d14f7662bf7..a768d73a11244 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/common/task/siem_migrations_task_runner.ts @@ -23,8 +23,6 @@ import type { import { ActionsClientChat } from './util/actions_client_chat'; import type { SiemMigrationTelemetryClient } from './siem_migrations_telemetry_client'; -/** Number of concurrent item translations in the pool */ -const TASK_CONCURRENCY = 10 as const; /** Number of items loaded in memory to be translated in the pool */ const TASK_BATCH_SIZE = 100 as const; /** The timeout of each individual agent invocation in minutes */ @@ -51,7 +49,7 @@ const EXECUTOR_SLEEP = { /** This limit should never be reached, it's a safety net to prevent infinite loops. * It represents the max number of consecutive rate limit recovery & failure attempts. - * This can only happen when the API can not process TASK_CONCURRENCY translations at a time, + * This can only happen when the API can not process all concurrenct translations ( based on taskConcurrency ) at a time, * even after the executor sleep is increased on every attempt. **/ const EXECUTOR_RECOVER_MAX_ATTEMPTS = 3 as const; @@ -78,6 +76,8 @@ export abstract class SiemMigrationTaskRunner< private abort: ReturnType; private executorSleepMultiplier: number = EXECUTOR_SLEEP.initialValueSeconds; public isWaiting: boolean = false; + /** Number of concurrent items to process. Each item triggers one instance of graph */ + protected abstract readonly taskConcurrency: number; constructor( public readonly migrationId: string, @@ -124,7 +124,7 @@ export abstract class SiemMigrationTaskRunner< } const migrateItemTask = this.createMigrateItemTask(invocationConfig); - this.logger.debug(`Started translations. Concurrency is: ${TASK_CONCURRENCY}`); + this.logger.debug(`Started translations. Concurrency is: ${this.taskConcurrency}`); try { do { @@ -139,7 +139,7 @@ export abstract class SiemMigrationTaskRunner< this.logger.debug(`Start processing batch of ${migrationItems.length} items`); const { errors } = await initPromisePool, void, Error>({ - concurrency: TASK_CONCURRENCY, + concurrency: this.taskConcurrency, abortSignal: this.abortController.signal, items: migrationItems, executor: async (migrationItem) => { @@ -207,6 +207,9 @@ export abstract class SiemMigrationTaskRunner< const config: RunnableConfig = { timeout: AGENT_INVOKE_TIMEOUT_MIN * 60 * 1000, // milliseconds timeout ...invocationConfig, + metadata: { + migrationId: this.migrationId, + }, signal: this.abortController.signal, }; @@ -251,6 +254,7 @@ export abstract class SiemMigrationTaskRunner< await this.executorSleep(); // Random sleep, increased every time we hit the rate limit. return await invoke(); } catch (error) { + this.logger.debug(`Error during migration item translation: ${error.toString()}`); if (!this.isRateLimitError(error) || recoverAttemptsLeft === 0) { throw error; } diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/constants.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/constants.ts index eb214125274dc..1170935d7a21e 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/constants.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/constants.ts @@ -5,9 +5,17 @@ * 2.0. */ +import type { RetryPolicy } from '@langchain/langgraph'; + /* The "dummy" index pattern to use during the query translation, it is replaced at the end by the actual index pattern. We have detected that - using the placeholder "[indexPattern]": makes the LLM get confused because it is not the syntax it expects - using the wildcard "*": makes it harder for the LLM to understand that it is the index pattern */ export const TRANSLATION_INDEX_PATTERN = 'logs*'; + +export const RETRY_POLICY: RetryPolicy = { + initialInterval: 1000, + maxAttempts: 8, + jitter: true, +}; diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/graph.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/graph.ts index be34123ddcbc3..388a28153cd4d 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/graph.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/graph.ts @@ -12,6 +12,7 @@ import { getParseOriginalDashboardNode } from './nodes/parse_original_dashboard' import { getCreateDescriptionsNode } from './nodes/create_descriptions'; import { getTranslatePanelNode } from './nodes/translate_panel/translate_panel'; import { getAggregateDashboardNode } from './nodes/aggregate_dashboard'; +import { RETRY_POLICY } from './constants'; export function getDashboardMigrationAgent(params: MigrateDashboardGraphParams) { const parseOriginalDashboardNode = getParseOriginalDashboardNode(); @@ -25,8 +26,12 @@ export function getDashboardMigrationAgent(params: MigrateDashboardGraphParams) ) // Nodes .addNode('parseOriginalDashboard', parseOriginalDashboardNode) - .addNode('createDescriptions', createDescriptionsNode) - .addNode('translatePanel', translatePanel.node, { subgraphs: [translatePanel.subgraph] }) + .addNode('createDescriptions', createDescriptionsNode, { + retryPolicy: RETRY_POLICY, + }) + .addNode('translatePanel', translatePanel.node, { + subgraphs: [translatePanel.subgraph], + }) .addNode('aggregateDashboard', aggregateDashboardNode) // Edges .addEdge(START, 'parseOriginalDashboard') diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/nodes/translate_panel/translate_panel.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/nodes/translate_panel/translate_panel.ts index b630912625cb4..0b68885d55246 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/nodes/translate_panel/translate_panel.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/nodes/translate_panel/translate_panel.ts @@ -22,6 +22,9 @@ export type TranslatePanelNode = (( subgraph?: ReturnType; }; +/** Number of panels to be processed concurrently per dashboard */ +const DEFAULT_PANELS_CONCURRENCY = 4; + export interface TranslatePanel { node: TranslatePanelNode; conditionalEdge: (state: MigrateDashboardState) => Send[]; @@ -41,7 +44,9 @@ export const getTranslatePanelNode = (params: TranslatePanelGraphParams): Transl } // Invoke the subgraph to translate the panel - const output = await translatePanelSubGraph.invoke(nodeParams); + const output = await translatePanelSubGraph.invoke(nodeParams, { + maxConcurrency: DEFAULT_PANELS_CONCURRENCY, + }); if (!output.elastic_panel) { throw new Error('No panel visualization generated'); @@ -54,8 +59,8 @@ export const getTranslatePanelNode = (params: TranslatePanelGraphParams): Transl comments: output.comments, }; } catch (err) { - params.logger.error(`Error translating panel: ${err}`); const message = `Error translating panel: ${err.toString()}`; + params.logger.error(message); translatedPanel = { index, title: nodeParams.parsed_panel.title, diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/sub_graphs/translate_panel/graph.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/sub_graphs/translate_panel/graph.ts index 7a48d50096b2e..04a809f8913a9 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/sub_graphs/translate_panel/graph.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/agent/sub_graphs/translate_panel/graph.ts @@ -17,6 +17,7 @@ import { translateDashboardPanelState } from './state'; import type { TranslatePanelGraphParams, TranslateDashboardPanelState } from './types'; import { migrateDashboardConfigSchema } from '../../state'; import { getSelectIndexPatternNode } from './nodes/select_index_pattern'; +import { RETRY_POLICY } from '../../constants'; import { getExtractColumnsFromEsqlQueryNode } from './nodes/extract_columns'; export function getTranslatePanelGraph(params: TranslatePanelGraphParams) { @@ -34,12 +35,22 @@ export function getTranslatePanelGraph(params: TranslatePanelGraphParams) { migrateDashboardConfigSchema ) // Nodes - .addNode('inlineQuery', inlineQueryNode) - .addNode('translateQuery', translateQueryNode) + .addNode('inlineQuery', inlineQueryNode, { + retryPolicy: RETRY_POLICY, + }) + .addNode('translateQuery', translateQueryNode, { + retryPolicy: RETRY_POLICY, + }) .addNode('validation', validationNode) - .addNode('fixQueryErrors', fixQueryErrorsNode) - .addNode('ecsMapping', ecsMappingNode) - .addNode('extractColumnsFromEsql', extractColumnsFromEsqlNode) + .addNode('fixQueryErrors', fixQueryErrorsNode, { + retryPolicy: RETRY_POLICY, + }) + .addNode('ecsMapping', ecsMappingNode, { + retryPolicy: RETRY_POLICY, + }) + .addNode('extractColumnsFromEsql', extractColumnsFromEsqlNode, { + retryPolicy: RETRY_POLICY, + }) .addNode('selectIndexPattern', selectIndexPatternNode) .addNode('translationResult', translationResultNode) diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/dashboard_migrations_task_runner.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/dashboard_migrations_task_runner.ts index 616be19be8d04..246d864473429 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/dashboard_migrations_task_runner.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/dashboards/task/dashboard_migrations_task_runner.ts @@ -37,6 +37,7 @@ export class DashboardMigrationTaskRunner extends SiemMigrationTaskRunner< DashboardMigrationTaskOutput > { private retriever: DashboardMigrationsRetriever; + protected readonly taskConcurrency = 3; constructor( public readonly migrationId: string, diff --git a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/rules/task/rule_migrations_task_runner.ts b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/rules/task/rule_migrations_task_runner.ts index 2565f5fce3512..8271aefb9ad24 100644 --- a/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/rules/task/rule_migrations_task_runner.ts +++ b/x-pack/solutions/security/plugins/security_solution/server/lib/siem_migrations/rules/task/rule_migrations_task_runner.ts @@ -33,6 +33,7 @@ export class RuleMigrationTaskRunner extends SiemMigrationTaskRunner< RuleMigrationTaskOutput > { private retriever: RuleMigrationsRetriever; + protected readonly taskConcurrency = 10; constructor( public readonly migrationId: string,