Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const mockProcessTaskOutput = jest.fn().mockResolvedValue({});
const mockInitialize = jest.fn().mockResolvedValue(undefined);

class TestMigrationTaskRunner extends SiemMigrationTaskRunner {
protected taskConcurrency = 10;
protected TaskRunnerClass = SiemMigrationTaskRunner;
protected EvaluatorClass = undefined;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import type {
import { ActionsClientChat } from './util/actions_client_chat';
import type { SiemMigrationTelemetryClient } from './siem_migrations_telemetry_client';

/** Number of concurrent item translations in the pool */
const TASK_CONCURRENCY = 10 as const;
/** Number of items loaded in memory to be translated in the pool */
const TASK_BATCH_SIZE = 100 as const;
/** The timeout of each individual agent invocation in minutes */
Expand All @@ -51,7 +49,7 @@ const EXECUTOR_SLEEP = {

/** This limit should never be reached, it's a safety net to prevent infinite loops.
* It represents the max number of consecutive rate limit recovery & failure attempts.
* This can only happen when the API can not process TASK_CONCURRENCY translations at a time,
* This can only happen when the API can not process all concurrenct translations ( based on taskConcurrency ) at a time,
* even after the executor sleep is increased on every attempt.
**/
const EXECUTOR_RECOVER_MAX_ATTEMPTS = 3 as const;
Expand All @@ -78,6 +76,8 @@ export abstract class SiemMigrationTaskRunner<
private abort: ReturnType<typeof abortSignalToPromise>;
private executorSleepMultiplier: number = EXECUTOR_SLEEP.initialValueSeconds;
public isWaiting: boolean = false;
/** Number of concurrent items to process. Each item triggers one instance of graph */
protected abstract readonly taskConcurrency: number;

constructor(
public readonly migrationId: string,
Expand Down Expand Up @@ -124,7 +124,7 @@ export abstract class SiemMigrationTaskRunner<
}

const migrateItemTask = this.createMigrateItemTask(invocationConfig);
this.logger.debug(`Started translations. Concurrency is: ${TASK_CONCURRENCY}`);
this.logger.debug(`Started translations. Concurrency is: ${this.taskConcurrency}`);

try {
do {
Expand All @@ -139,7 +139,7 @@ export abstract class SiemMigrationTaskRunner<
this.logger.debug(`Start processing batch of ${migrationItems.length} items`);

const { errors } = await initPromisePool<Stored<I>, void, Error>({
concurrency: TASK_CONCURRENCY,
concurrency: this.taskConcurrency,
abortSignal: this.abortController.signal,
items: migrationItems,
executor: async (migrationItem) => {
Expand Down Expand Up @@ -207,6 +207,9 @@ export abstract class SiemMigrationTaskRunner<
const config: RunnableConfig<C> = {
timeout: AGENT_INVOKE_TIMEOUT_MIN * 60 * 1000, // milliseconds timeout
...invocationConfig,
metadata: {
migrationId: this.migrationId,
},
signal: this.abortController.signal,
};

Expand Down Expand Up @@ -251,6 +254,7 @@ export abstract class SiemMigrationTaskRunner<
await this.executorSleep(); // Random sleep, increased every time we hit the rate limit.
return await invoke();
} catch (error) {
this.logger.debug(`Error during migration item translation: ${error.toString()}`);
if (!this.isRateLimitError(error) || recoverAttemptsLeft === 0) {
throw error;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@
* 2.0.
*/

import type { RetryPolicy } from '@langchain/langgraph';

/* The "dummy" index pattern to use during the query translation, it is replaced at the end by the actual index pattern.
We have detected that
- using the placeholder "[indexPattern]": makes the LLM get confused because it is not the syntax it expects
- using the wildcard "*": makes it harder for the LLM to understand that it is the index pattern
*/
export const TRANSLATION_INDEX_PATTERN = 'logs*';

export const RETRY_POLICY: RetryPolicy = {
initialInterval: 1000,
maxAttempts: 8,
jitter: true,
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { getParseOriginalDashboardNode } from './nodes/parse_original_dashboard'
import { getCreateDescriptionsNode } from './nodes/create_descriptions';
import { getTranslatePanelNode } from './nodes/translate_panel/translate_panel';
import { getAggregateDashboardNode } from './nodes/aggregate_dashboard';
import { RETRY_POLICY } from './constants';

export function getDashboardMigrationAgent(params: MigrateDashboardGraphParams) {
const parseOriginalDashboardNode = getParseOriginalDashboardNode();
Expand All @@ -25,8 +26,12 @@ export function getDashboardMigrationAgent(params: MigrateDashboardGraphParams)
)
// Nodes
.addNode('parseOriginalDashboard', parseOriginalDashboardNode)
.addNode('createDescriptions', createDescriptionsNode)
.addNode('translatePanel', translatePanel.node, { subgraphs: [translatePanel.subgraph] })
.addNode('createDescriptions', createDescriptionsNode, {
retryPolicy: RETRY_POLICY,
})
.addNode('translatePanel', translatePanel.node, {
subgraphs: [translatePanel.subgraph],
})
.addNode('aggregateDashboard', aggregateDashboardNode)
// Edges
.addEdge(START, 'parseOriginalDashboard')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export type TranslatePanelNode = ((
subgraph?: ReturnType<typeof getTranslatePanelGraph>;
};

/** Number of panels to be processed concurrently per dashboard */
const DEFAULT_PANELS_CONCURRENCY = 4;

export interface TranslatePanel {
node: TranslatePanelNode;
conditionalEdge: (state: MigrateDashboardState) => Send[];
Expand All @@ -41,7 +44,9 @@ export const getTranslatePanelNode = (params: TranslatePanelGraphParams): Transl
}

// Invoke the subgraph to translate the panel
const output = await translatePanelSubGraph.invoke(nodeParams);
const output = await translatePanelSubGraph.invoke(nodeParams, {
maxConcurrency: DEFAULT_PANELS_CONCURRENCY,
});

if (!output.elastic_panel) {
throw new Error('No panel visualization generated');
Expand All @@ -54,8 +59,8 @@ export const getTranslatePanelNode = (params: TranslatePanelGraphParams): Transl
comments: output.comments,
};
} catch (err) {
params.logger.error(`Error translating panel: ${err}`);
const message = `Error translating panel: ${err.toString()}`;
params.logger.error(message);
translatedPanel = {
index,
title: nodeParams.parsed_panel.title,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { translateDashboardPanelState } from './state';
import type { TranslatePanelGraphParams, TranslateDashboardPanelState } from './types';
import { migrateDashboardConfigSchema } from '../../state';
import { getSelectIndexPatternNode } from './nodes/select_index_pattern';
import { RETRY_POLICY } from '../../constants';
import { getExtractColumnsFromEsqlQueryNode } from './nodes/extract_columns';

export function getTranslatePanelGraph(params: TranslatePanelGraphParams) {
Expand All @@ -34,12 +35,22 @@ export function getTranslatePanelGraph(params: TranslatePanelGraphParams) {
migrateDashboardConfigSchema
)
// Nodes
.addNode('inlineQuery', inlineQueryNode)
.addNode('translateQuery', translateQueryNode)
.addNode('inlineQuery', inlineQueryNode, {
retryPolicy: RETRY_POLICY,
})
.addNode('translateQuery', translateQueryNode, {
retryPolicy: RETRY_POLICY,
})
.addNode('validation', validationNode)
.addNode('fixQueryErrors', fixQueryErrorsNode)
.addNode('ecsMapping', ecsMappingNode)
.addNode('extractColumnsFromEsql', extractColumnsFromEsqlNode)
.addNode('fixQueryErrors', fixQueryErrorsNode, {
retryPolicy: RETRY_POLICY,
})
.addNode('ecsMapping', ecsMappingNode, {
retryPolicy: RETRY_POLICY,
})
.addNode('extractColumnsFromEsql', extractColumnsFromEsqlNode, {
retryPolicy: RETRY_POLICY,
})
.addNode('selectIndexPattern', selectIndexPatternNode)
.addNode('translationResult', translationResultNode)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class DashboardMigrationTaskRunner extends SiemMigrationTaskRunner<
DashboardMigrationTaskOutput
> {
private retriever: DashboardMigrationsRetriever;
protected readonly taskConcurrency = 3;

constructor(
public readonly migrationId: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class RuleMigrationTaskRunner extends SiemMigrationTaskRunner<
RuleMigrationTaskOutput
> {
private retriever: RuleMigrationsRetriever;
protected readonly taskConcurrency = 10;

constructor(
public readonly migrationId: string,
Expand Down