From 75d7b1043d85fd1c98d412dac22620d7855acd2a Mon Sep 17 00:00:00 2001 From: Ihor Panasiuk Date: Mon, 4 Aug 2025 11:09:17 +0200 Subject: [PATCH 1/6] Fix bug when updateWorkflowExecution replaced document instead of partially updaiting it --- .../repositories/step_execution_repository.ts | 24 +++++++++++++++++++ .../workflow_execution_repository.ts | 21 ++++++++++++++-- .../workflow_execution_runtime_manager.ts | 2 -- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts index 835bd636143bb..afae192b83b93 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts @@ -23,6 +23,12 @@ export class StepExecutionRepository { // return []; // } + /** + * Creates a new step execution document in Elasticsearch. + * + * @param stepExecution - A partial object representing the workflow step execution to be indexed. + * @returns A promise that resolves when the document has been successfully indexed. + */ public async createStepExecution(stepExecution: Partial): Promise { await this.esClient.index({ index: this.indexName, @@ -32,10 +38,28 @@ export class StepExecutionRepository { }); } + /** + * Updates a single workflow step execution in the repository. + * + * @param stepExecution - A partial object representing the workflow step execution to update. + * @returns A promise that resolves when the update operation is complete. + */ public updateStepExecution(stepExecution: Partial): Promise { return this.updateStepExecutions([stepExecution]); } + /** + * Updates multiple step executions in Elasticsearch. + * + * This method takes an array of partial `EsWorkflowStepExecution` objects, + * validates that each has an `id`, and performs a bulk update operation + * in Elasticsearch for all provided step executions. + * + * @param stepExecutions - An array of partial step execution objects to update. + * Each object must include an `id` property. + * @throws {Error} If any step execution does not have an `id`. + * @returns A promise that resolves when the bulk update operation completes. + */ public async updateStepExecutions( stepExecutions: Array> ): Promise { diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts index a20b0cd7c38dc..0968550a20379 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts @@ -15,6 +15,12 @@ export class WorkflowExecutionRepository { private indexName = WORKFLOWS_EXECUTIONS_INDEX; constructor(private esClient: ElasticsearchClient) {} + /** + * Creates a new workflow execution document in Elasticsearch. + * + * @param workflowExecution - A partial object representing the workflow execution to be created. + * @returns A promise that resolves when the workflow execution has been indexed. + */ public async createWorkflowExecution( workflowExecution: Partial ): Promise { @@ -26,6 +32,17 @@ export class WorkflowExecutionRepository { }); } + /** + * Partially updates an existing workflow execution in Elasticsearch. + * + * This method requires the `id` property to be present in the `workflowExecution` object. + * If the `id` is missing, an error is thrown. + * The update operation is performed using the Elasticsearch client, and the document is refreshed after the update. + * + * @param workflowExecution - A partial object representing the workflow execution to update. Must include the `id` property. + * @throws {Error} If the `id` property is not provided in the `workflowExecution` object. + * @returns A promise that resolves when the update operation is complete. + */ public async updateWorkflowExecution( workflowExecution: Partial ): Promise { @@ -33,11 +50,11 @@ export class WorkflowExecutionRepository { throw new Error('Workflow execution ID is required for update'); } - await this.esClient.index({ + await this.esClient.update>({ index: this.indexName, id: workflowExecution.id, refresh: true, - document: workflowExecution, + doc: workflowExecution, }); } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts b/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts index 45f742231d107..2c6e9e58caffb 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts @@ -220,7 +220,6 @@ export class WorkflowExecutionRuntimeManager { status: ExecutionStatus.RUNNING, startedAt: new Date().toISOString(), workflowId: this.workflowExecution.workflowId, - triggeredBy: this.workflowExecution.triggeredBy, }; await this.workflowExecutionRepository.updateWorkflowExecution(updatedWorkflowExecution); this.workflowExecution = { @@ -256,7 +255,6 @@ export class WorkflowExecutionRuntimeManager { id: this.workflowExecution.id, workflowId: this.workflowExecution.workflowId, startedAt: this.workflowExecution.startedAt, - triggeredBy: this.workflowExecution.triggeredBy, }; if (this.isWorkflowFinished()) { From 1aea7b1cb614f62d6d0e20d29e5213b7f6e15394 Mon Sep 17 00:00:00 2001 From: Ihor Panasiuk Date: Mon, 4 Aug 2025 11:44:55 +0200 Subject: [PATCH 2/6] Add validation for ID in create and update methods of Step and Workflow Execution Repositories --- .../step_execution_repository.test.ts | 89 +++++++++++++++++++ .../repositories/step_execution_repository.ts | 4 + .../workflow_execution_repository.test.ts | 63 +++++++++++++ .../workflow_execution_repository.ts | 4 + 4 files changed, 160 insertions(+) create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.test.ts create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.test.ts diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.test.ts new file mode 100644 index 0000000000000..d802c70a704e9 --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.test.ts @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { WORKFLOWS_STEP_EXECUTIONS_INDEX } from '../../common'; +import { StepExecutionRepository } from './step_execution_repository'; +import { ExecutionStatus } from '@kbn/workflows'; + +describe('StepExecutionRepository', () => { + let repository: StepExecutionRepository; + let esClient: { index: jest.Mock; update: jest.Mock; bulk: jest.Mock }; + + beforeEach(() => { + esClient = { + index: jest.fn(), + update: jest.fn(), + bulk: jest.fn(), + }; + repository = new StepExecutionRepository(esClient as any); + }); + + describe('createStepExecution', () => { + it('should create a step execution', async () => { + const stepExecution = { id: '1', stepId: 'test-step' }; + await repository.createStepExecution(stepExecution); + expect(esClient.index).toHaveBeenCalledWith({ + index: WORKFLOWS_STEP_EXECUTIONS_INDEX, + id: '1', + refresh: true, + document: stepExecution, + }); + }); + + it('should throw an error if ID is missing during create', async () => { + await expect(repository.createStepExecution({})).rejects.toThrow( + 'Step execution ID is required for creation' + ); + }); + }); + + describe('updateStepExecution', () => { + it('should update a step execution', async () => { + const stepExecution = { id: '1', status: ExecutionStatus.RUNNING }; + await repository.updateStepExecution(stepExecution); + expect(esClient.bulk).toHaveBeenCalledWith({ + index: WORKFLOWS_STEP_EXECUTIONS_INDEX, + refresh: true, + body: [{ update: { _id: '1' } }, { doc: { id: '1', status: ExecutionStatus.RUNNING } }], + }); + }); + + it('should throw an error if ID is missing during update', async () => { + await expect(repository.updateStepExecution({})).rejects.toThrow( + 'Step execution ID is required for update' + ); + }); + }); + + describe('updateStepExecutions', () => { + it('should update multiple step executions', async () => { + const stepExecutions = [ + { id: '1', status: ExecutionStatus.COMPLETED }, + { id: '2', status: ExecutionStatus.FAILED }, + ]; + await repository.updateStepExecutions(stepExecutions); + expect(esClient.bulk).toHaveBeenCalledWith({ + index: WORKFLOWS_STEP_EXECUTIONS_INDEX, + refresh: true, + body: [ + { update: { _id: '1' } }, + { doc: { id: '1', status: ExecutionStatus.COMPLETED } }, + { update: { _id: '2' } }, + { doc: { id: '2', status: ExecutionStatus.FAILED } }, + ], + }); + }); + + it('should throw an error if ID is missing during bulk update', async () => { + await expect(repository.updateStepExecutions([{}])).rejects.toThrow( + 'Step execution ID is required for update' + ); + }); + }); +}); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts index afae192b83b93..a46227ed4fd6d 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/step_execution_repository.ts @@ -30,6 +30,10 @@ export class StepExecutionRepository { * @returns A promise that resolves when the document has been successfully indexed. */ public async createStepExecution(stepExecution: Partial): Promise { + if (!stepExecution.id) { + throw new Error('Step execution ID is required for creation'); + } + await this.esClient.index({ index: this.indexName, id: stepExecution.id, diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.test.ts new file mode 100644 index 0000000000000..a27551ce4378a --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.test.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { ExecutionStatus } from '@kbn/workflows'; +import { WorkflowExecutionRepository } from './workflow_execution_repository'; +import { WORKFLOWS_EXECUTIONS_INDEX } from '../../common'; + +describe('WorkflowExecutionRepository', () => { + let repository: WorkflowExecutionRepository; + let esClient: { index: jest.Mock; update: jest.Mock }; + + beforeEach(() => { + esClient = { + index: jest.fn(), + update: jest.fn(), + }; + repository = new WorkflowExecutionRepository(esClient as any); + }); + + describe('createWorkflowExecution', () => { + it('should create a workflow execution', async () => { + const workflowExecution = { id: '1', workflowId: 'test-workflow' }; + await repository.createWorkflowExecution(workflowExecution); + expect(esClient.index).toHaveBeenCalledWith({ + index: WORKFLOWS_EXECUTIONS_INDEX, + id: '1', + refresh: true, + document: workflowExecution, + }); + }); + + it('should throw an error if ID is missing during creation', async () => { + await expect(repository.createWorkflowExecution({})).rejects.toThrow( + 'Workflow execution ID is required for creation' + ); + }); + }); + + describe('updateWorkflowExecution', () => { + it('should update a workflow execution', async () => { + const workflowExecution = { id: '1', status: ExecutionStatus.RUNNING }; + await repository.updateWorkflowExecution(workflowExecution); + expect(esClient.update).toHaveBeenCalledWith({ + index: WORKFLOWS_EXECUTIONS_INDEX, + id: '1', + refresh: true, + doc: workflowExecution, + }); + }); + + it('should throw an error if ID is missing during update', async () => { + await expect(repository.updateWorkflowExecution({})).rejects.toThrow( + 'Workflow execution ID is required for update' + ); + }); + }); +}); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts index 0968550a20379..7d39dae40ba19 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/repositories/workflow_execution_repository.ts @@ -24,6 +24,10 @@ export class WorkflowExecutionRepository { public async createWorkflowExecution( workflowExecution: Partial ): Promise { + if (!workflowExecution.id) { + throw new Error('Workflow execution ID is required for creation'); + } + await this.esClient.index({ index: this.indexName, id: workflowExecution.id, From aad5c904d443d8231c54b9071b3d523322f60d3d Mon Sep 17 00:00:00 2001 From: Ihor Panasiuk Date: Mon, 4 Aug 2025 15:28:04 +0200 Subject: [PATCH 3/6] introduce condition branch node and implement their executors --- .../kbn-workflows/types/execution/index.ts | 9 ++- .../types/execution/nodes/branching_nodes.ts | 18 ++++- .../shared/kbn-workflows/types/latest.ts | 11 ++- .../enter_condition_branch_node_impl.ts | 19 +++++ .../server/step/if_step/enter_if_node_impl.ts | 31 ++++--- .../exit_condition_branch_node_impl.ts | 33 ++++++++ .../server/step/if_step/index.ts | 2 + .../server/step/step_factory.ts | 11 ++- .../workflow_execution_runtime_manager.ts | 10 +++ .../build_execution_graph.ts | 81 ++++++++++++------- 10 files changed, 181 insertions(+), 44 deletions(-) create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts diff --git a/src/platform/packages/shared/kbn-workflows/types/execution/index.ts b/src/platform/packages/shared/kbn-workflows/types/execution/index.ts index c74b5da8dd57a..d12a6386371d6 100644 --- a/src/platform/packages/shared/kbn-workflows/types/execution/index.ts +++ b/src/platform/packages/shared/kbn-workflows/types/execution/index.ts @@ -7,5 +7,12 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -export type { EnterIfNode, ExitIfNode } from './nodes/branching_nodes'; +export type { + EnterIfNode, + ExitIfNode, + EnterConditionBranchNode, + EnterConditionBranchNodeSchema, + ExitConditionBranchNode, + ExitConditionBranchNodeSchema, +} from './nodes/branching_nodes'; export type { EnterForeachNode, ExitForeachNode } from './nodes/loop_nodes'; diff --git a/src/platform/packages/shared/kbn-workflows/types/execution/nodes/branching_nodes.ts b/src/platform/packages/shared/kbn-workflows/types/execution/nodes/branching_nodes.ts index 8cda87bd5ce72..8672607b74c92 100644 --- a/src/platform/packages/shared/kbn-workflows/types/execution/nodes/branching_nodes.ts +++ b/src/platform/packages/shared/kbn-workflows/types/execution/nodes/branching_nodes.ts @@ -13,16 +13,28 @@ import { IfStepSchema } from '../../../spec/schema'; export const EnterIfNodeSchema = z.object({ id: z.string(), type: z.literal('enter-if'), - trueNodeIds: z.array(z.string()), - falseNodeIds: z.array(z.string()), + exitNodeId: z.string(), configuration: IfStepSchema.omit({ steps: true, else: true, }), }); - export type EnterIfNode = z.infer; +export const EnterConditionBranchNodeSchema = z.object({ + id: z.string(), + type: z.literal('enter-condition-branch'), + condition: z.union([z.string(), z.undefined()]), +}); +export type EnterConditionBranchNode = z.infer; + +export const ExitConditionBranchNodeSchema = z.object({ + id: z.string(), + type: z.literal('exit-condition-branch'), + startNodeId: z.string(), +}); +export type ExitConditionBranchNode = z.infer; + export const ExitIfNodeSchema = z.object({ id: z.string(), type: z.literal('exit-if'), diff --git a/src/platform/packages/shared/kbn-workflows/types/latest.ts b/src/platform/packages/shared/kbn-workflows/types/latest.ts index ff0170c40b8e0..e57ec397f6502 100644 --- a/src/platform/packages/shared/kbn-workflows/types/latest.ts +++ b/src/platform/packages/shared/kbn-workflows/types/latest.ts @@ -43,4 +43,13 @@ export { CreateWorkflowCommandSchema, } from './v1'; -export type { EnterIfNode, ExitIfNode, EnterForeachNode, ExitForeachNode } from './execution'; +export type { + EnterIfNode, + ExitIfNode, + EnterConditionBranchNode, + EnterConditionBranchNodeSchema, + ExitConditionBranchNode, + ExitConditionBranchNodeSchema, + EnterForeachNode, + ExitForeachNode, +} from './execution'; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts new file mode 100644 index 0000000000000..81ce50ed497b9 --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { StepImplementation } from '../step_base'; +import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; + +export class EnterConditionBranchNodeImpl implements StepImplementation { + constructor(private workflowState: WorkflowExecutionRuntimeManager) {} + + public async run(): Promise { + this.workflowState.goToNextStep(); + } +} diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts index 209fdb22a39e6..1647757da010f 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -import { EnterIfNode } from '@kbn/workflows'; +import { EnterIfNode, EnterConditionBranchNode } from '@kbn/workflows'; import { StepImplementation } from '../step_base'; import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; @@ -16,20 +16,29 @@ export class EnterIfNodeImpl implements StepImplementation { public async run(): Promise { await this.workflowState.startStep(this.step.id); - const evaluatedConditionResult = this.step.configuration.condition; // must be real condition from step definition + const successors: any[] = this.workflowState.getNodeSuccessors(this.step.id); - let runningBranch: string[]; - let notRunningBranch: string[]; + const thenNode = successors?.find((node) => + Object.hasOwn(node, 'condition') + ) as EnterConditionBranchNode; + // multiple else-if could be implemented similarly to thenNode + const elseNode = successors?.find( + (node) => !Object.hasOwn(node, 'condition') + ) as EnterConditionBranchNode; + + const evaluatedConditionResult = + typeof thenNode.condition === 'boolean' + ? thenNode.condition + : thenNode.condition?.toLowerCase() === 'true'; // must be real condition from step definition) if (evaluatedConditionResult) { - runningBranch = this.step.trueNodeIds; - notRunningBranch = this.step.falseNodeIds; + this.workflowState.goToStep(thenNode.id); + } else if (elseNode) { + this.workflowState.goToStep(elseNode.id); } else { - runningBranch = this.step.falseNodeIds; - notRunningBranch = this.step.trueNodeIds; + // in the case when the condition evaluates to false and no else branch is defined + // we go straight to the exit node skipping "then" branch + this.workflowState.goToStep(this.step.exitNodeId); } - - await this.workflowState.skipSteps(notRunningBranch); - this.workflowState.goToStep(runningBranch[0]); } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts new file mode 100644 index 0000000000000..5cf3b56c796b4 --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { ExitConditionBranchNode } from '@kbn/workflows'; +import { StepImplementation } from '../step_base'; +import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; + +export class ExitConditionBranchNodeImpl implements StepImplementation { + constructor( + private step: ExitConditionBranchNode, + private workflowState: WorkflowExecutionRuntimeManager + ) {} + + public async run(): Promise { + const successors = this.workflowState.getNodeSuccessors(this.step.startNodeId); + + if (successors.length !== 1) { + throw new Error( + `ExitConditionBranchNode with id ${this.step.id} must have exactly one successor, but found ${successors.length}.` + ); + } + + // After the branch finishes, we go to the end of If condition + const exitIfNode = successors[0]; + this.workflowState.goToStep(exitIfNode.id); + } +} diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/index.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/index.ts index 3cf05ead86b4d..2bbe2138e02f4 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/index.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/index.ts @@ -8,4 +8,6 @@ */ export { EnterIfNodeImpl } from './enter_if_node_impl'; +export { EnterConditionBranchNodeImpl } from './enter_condition_branch_node_impl'; +export { ExitConditionBranchNodeImpl } from './exit_condition_branch_node_impl'; export { ExitIfNodeImpl } from './exit_if_node_impl'; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts index 2d7047788cec1..3a3226ced802c 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts @@ -13,7 +13,12 @@ import { StepImplementation } from './step_base'; // Import schema and inferred types import { ConnectorExecutor } from '../connector_executor'; import { ConnectorStepImpl } from './connector_step'; -import { EnterIfNodeImpl, ExitIfNodeImpl } from './if_step'; +import { + EnterConditionBranchNodeImpl, + EnterIfNodeImpl, + ExitIfNodeImpl, + ExitConditionBranchNodeImpl, +} from './if_step'; import { WorkflowExecutionRuntimeManager } from '../workflow_context_manager/workflow_execution_runtime_manager'; import { EnterForeachNodeImpl, ExitForeachNodeImpl } from './foreach_step'; // Import specific step implementations @@ -43,6 +48,10 @@ export class StepFactory { return new ExitForeachNodeImpl(step as any, workflowState); case 'enter-if': return new EnterIfNodeImpl(step as any, workflowState); + case 'enter-condition-branch': + return new EnterConditionBranchNodeImpl(workflowState); + case 'exit-condition-branch': + return new ExitConditionBranchNodeImpl(step as any, workflowState); case 'exit-if': return new ExitIfNodeImpl(step as any, workflowState); case 'atomic': diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts b/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts index 2c6e9e58caffb..e434f12737353 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/workflow_execution_runtime_manager.ts @@ -72,6 +72,16 @@ export class WorkflowExecutionRuntimeManager { return this.workflowExecution.status; } + public getNodeSuccessors(nodeId: string): any[] { + const successors = this.workflowExecutionGraph.successors(nodeId); + + if (!successors) { + return []; + } + + return successors.map((successorId) => this.workflowExecutionGraph.node(successorId)) as any[]; + } + public getCurrentStep(): any { // must be a proper type if (this.currentStepIndex < 0 || this.currentStepIndex >= this.topologicalOrder.length) { diff --git a/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts b/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts index 60dc6b4112c8e..c3a0a22df40ed 100644 --- a/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts +++ b/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts @@ -17,6 +17,8 @@ import { ExitIfNode, EnterForeachNode, ExitForeachNode, + EnterConditionBranchNode, + ExitConditionBranchNode, } from '@kbn/workflows'; import { omit } from 'lodash'; @@ -47,54 +49,79 @@ function visitAbstractStep(graph: graphlib.Graph, previousStep: any, currentStep } export function visitIfStep(graph: graphlib.Graph, previousStep: any, currentStep: any): any { - const enterIfNodeId = getNodeId(currentStep); + const enterConditionNodeId = getNodeId(currentStep); + const exitConditionNodeId = `exitCondition(${enterConditionNodeId})`; const ifElseStep = currentStep as IfStep; const trueSteps: BaseStep[] = ifElseStep.steps || []; const falseSteps: BaseStep[] = ifElseStep.else || []; - const ifElseNode: EnterIfNode = { - id: enterIfNodeId, + + const conditionNode: EnterIfNode = { + id: enterConditionNodeId, + exitNodeId: exitConditionNodeId, type: 'enter-if', - trueNodeIds: [], - falseNodeIds: [], configuration: { ...omit(ifElseStep, ['steps', 'else']), // No need to include them as they will be represented in the graph }, }; - const ifElseEnd: ExitIfNode = { + const exitConditionNode: ExitIfNode = { type: 'exit-if', - id: enterIfNodeId + '_exit', - startNodeId: enterIfNodeId, + id: exitConditionNodeId, + startNodeId: enterConditionNodeId, + }; + const enterThenBranchNode: EnterConditionBranchNode = { + id: `enterThen(${enterConditionNodeId})`, + type: 'enter-condition-branch', + condition: ifElseStep.condition, }; + graph.setNode(enterThenBranchNode.id, enterThenBranchNode); + graph.setEdge(enterConditionNodeId, enterThenBranchNode.id); trueSteps.forEach((ifTrueCurrentStep: any, index: number) => { - const _previousStep = index > 0 ? trueSteps[index - 1] : ifElseStep; - ifElseNode.trueNodeIds.push(getNodeId(ifTrueCurrentStep)); + const _previousStep = index > 0 ? trueSteps[index - 1] : enterThenBranchNode; const currentNode = visitAbstractStep(graph, _previousStep, ifTrueCurrentStep); graph.setNode(getNodeId(currentNode), currentNode); graph.setEdge(getNodeId(previousStep), getNodeId(currentNode)); }); + const exitThenBranchNode: ExitConditionBranchNode = { + id: `exitThen(${enterConditionNodeId})`, + type: 'exit-condition-branch', + startNodeId: enterThenBranchNode.id, + }; + graph.setNode(exitThenBranchNode.id, exitThenBranchNode); + graph.setEdge(getNodeId(trueSteps[trueSteps.length - 1]), exitThenBranchNode.id); + graph.setEdge(exitThenBranchNode.id, exitConditionNode.id); + + if (falseSteps?.length > 0) { + const enterElseBranchNode: EnterConditionBranchNode = { + id: `enterElse(${enterConditionNodeId})`, + type: 'enter-condition-branch', + }; + graph.setNode(enterElseBranchNode.id, enterElseBranchNode); + graph.setEdge(enterConditionNodeId, enterElseBranchNode.id); + falseSteps.forEach((ifFalseCurrentStep: any, index: number) => { + const _previousStep = index > 0 ? falseSteps[index - 1] : enterElseBranchNode; + const currentNode = visitAbstractStep(graph, _previousStep, ifFalseCurrentStep); + graph.setNode(getNodeId(currentNode), currentNode); + graph.setEdge(getNodeId(previousStep), getNodeId(currentNode)); + }); + const exitElseBranchNode: ExitConditionBranchNode = { + id: `exitElse(${enterConditionNodeId})`, + type: 'exit-condition-branch', + startNodeId: enterElseBranchNode.id, + }; + graph.setNode(exitElseBranchNode.id, exitElseBranchNode); + graph.setEdge(getNodeId(falseSteps[falseSteps.length - 1]), exitElseBranchNode.id); + graph.setEdge(exitElseBranchNode.id, exitConditionNode.id); + } - falseSteps.forEach((ifFalseCurrentStep: any, index: number) => { - const _previousStep = index > 0 ? falseSteps[index - 1] : ifElseStep; - ifElseNode.falseNodeIds.push(getNodeId(ifFalseCurrentStep)); - const currentNode = visitAbstractStep(graph, _previousStep, ifFalseCurrentStep); - graph.setNode(getNodeId(currentNode), currentNode); - graph.setEdge(getNodeId(previousStep), getNodeId(currentNode)); - }); - - const lastIfTrueStep = trueSteps[trueSteps.length - 1]; - const lastIfFalseStep = falseSteps[falseSteps.length - 1]; - - graph.setNode(ifElseEnd.id, ifElseEnd); - graph.setEdge(getNodeId(lastIfTrueStep), ifElseEnd.id); - graph.setEdge(getNodeId(lastIfFalseStep), ifElseEnd.id); - graph.setNode(enterIfNodeId, ifElseNode); + graph.setNode(exitConditionNode.id, exitConditionNode); + graph.setNode(enterConditionNodeId, conditionNode); if (previousStep) { - graph.setEdge(getNodeId(previousStep), enterIfNodeId); + graph.setEdge(getNodeId(previousStep), enterConditionNodeId); } - return ifElseEnd; + return exitConditionNode; } function visitForeachStep(graph: graphlib.Graph, previousStep: any, currentStep: any): any { From ba91a17d15dd819b0a730eaa82897f724fe7ae7c Mon Sep 17 00:00:00 2001 From: Ihor Panasiuk Date: Mon, 4 Aug 2025 15:28:26 +0200 Subject: [PATCH 4/6] Add tests for EnterConditionBranchNodeImpl, EnterIfNodeImpl, ExitConditionBranchNodeImpl, and ExitIfNodeImpl --- .../enter_condition_branch_node_impl.test.ts | 28 +++++++ .../if_step/tests/enter_if_node_impl.test.ts | 78 +++++++++++++++++++ .../exit_condition_branch_node_impl.test.ts | 66 ++++++++++++++++ .../if_step/tests/exit_if_node_impl.test.ts | 42 ++++++++++ 4 files changed, 214 insertions(+) create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts create mode 100644 src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts new file mode 100644 index 0000000000000..6076c42b0dbed --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { EnterConditionBranchNodeImpl } from '../enter_condition_branch_node_impl'; +import { WorkflowExecutionRuntimeManager } from '../../../workflow_context_manager/workflow_execution_runtime_manager'; + +describe('EnterConditionBranchNodeImpl', () => { + let workflowState: WorkflowExecutionRuntimeManager; + let impl: EnterConditionBranchNodeImpl; + + beforeEach(() => { + workflowState = { + goToNextStep: jest.fn(), + } as any; + impl = new EnterConditionBranchNodeImpl(workflowState); + }); + + it('should go to next step', async () => { + await impl.run(); + expect(workflowState.goToNextStep).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts new file mode 100644 index 0000000000000..05a51735f26cc --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { EnterIfNodeImpl } from '../enter_if_node_impl'; +import { WorkflowExecutionRuntimeManager } from '../../../workflow_context_manager/workflow_execution_runtime_manager'; +import { EnterConditionBranchNode, EnterIfNode } from '@kbn/workflows'; + +describe('EnterIfNodeImpl', () => { + let step: EnterIfNode; + let workflowState: WorkflowExecutionRuntimeManager; + let impl: EnterIfNodeImpl; + let startStep: jest.Mock; + let goToStep: jest.Mock; + let getNodeSuccessors: jest.Mock; + + beforeEach(() => { + startStep = jest.fn(); + goToStep = jest.fn(); + getNodeSuccessors = jest.fn(); + step = { id: 'testStep', type: 'enter-if', exitNodeId: 'exitIfNode', configuration: {} as any }; + workflowState = { + startStep, + goToStep, + getNodeSuccessors, + } as any; + impl = new EnterIfNodeImpl(step, workflowState); + + getNodeSuccessors.mockReturnValue([ + { + id: 'thenNode', + type: 'enter-condition-branch', + condition: 'true', + } as EnterConditionBranchNode, + { + id: 'elseNode', + type: 'enter-condition-branch', + } as EnterConditionBranchNode, + ]); + }); + + it('should start the step and go to the next step', async () => { + await impl.run(); + expect(workflowState.startStep).toHaveBeenCalledWith(step.id); + }); + + it('should evaluate condition and go to thenNode if condition is true', async () => { + getNodeSuccessors.mockReturnValueOnce([ + { id: 'thenNode', condition: 'true' }, + { id: 'elseNode' }, + ]); + await impl.run(); + expect(workflowState.goToStep).toHaveBeenCalledTimes(1); + expect(workflowState.goToStep).toHaveBeenCalledWith('thenNode'); + }); + + it('should evaluate condition and go to elseNode if condition is false', async () => { + getNodeSuccessors.mockReturnValueOnce([ + { id: 'thenNode', condition: 'false' }, + { id: 'elseNode' }, + ]); + await impl.run(); + expect(workflowState.goToStep).toHaveBeenCalledTimes(1); + expect(workflowState.goToStep).toHaveBeenCalledWith('elseNode'); + }); + + it('should evaluate condition and go to exit node if no else branch is defined', async () => { + getNodeSuccessors.mockReturnValueOnce([{ id: 'thenNode', condition: 'false' }]); + await impl.run(); + expect(workflowState.goToStep).toHaveBeenCalledTimes(1); + expect(workflowState.goToStep).toHaveBeenCalledWith('exitIfNode'); + }); +}); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts new file mode 100644 index 0000000000000..0995d87ab88fe --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { ExitConditionBranchNode, ExitIfNode } from '@kbn/workflows'; +import { WorkflowExecutionRuntimeManager } from '../../../workflow_context_manager/workflow_execution_runtime_manager'; +import { ExitConditionBranchNodeImpl } from '../exit_condition_branch_node_impl'; + +describe('ExitConditionBranchNodeImpl', () => { + let step: ExitConditionBranchNode; + let workflowState: WorkflowExecutionRuntimeManager; + let impl: ExitConditionBranchNodeImpl; + let goToStep: jest.Mock; + let getNodeSuccessors: jest.Mock; + + beforeEach(() => { + goToStep = jest.fn(); + getNodeSuccessors = jest.fn(); + step = { + id: 'testStep', + type: 'exit-condition-branch', + startNodeId: 'startBranchNode', + }; + workflowState = { + goToStep, + getNodeSuccessors, + } as any; + impl = new ExitConditionBranchNodeImpl(step, workflowState); + + getNodeSuccessors.mockReturnValue([ + { + id: 'exitIfNode', + type: 'exit-if', + } as ExitIfNode, + ]); + }); + + it('should raise an error if there are multiple successors', async () => { + getNodeSuccessors.mockReturnValue([ + { id: 'exitIfNode1', type: 'exit-if' }, + { id: 'exitIfNode2', type: 'exit-if' }, + ]); + + await expect(impl.run()).rejects.toThrow( + `ExitConditionBranchNode with id ${step.id} must have exactly one successor, but found 2.` + ); + }); + + it('should raise an error if no successors', async () => { + getNodeSuccessors.mockReturnValue([]); + + await expect(impl.run()).rejects.toThrow( + `ExitConditionBranchNode with id ${step.id} must have exactly one successor, but found 0.` + ); + }); + + it('should go to the exitIfNode after running', async () => { + await impl.run(); + expect(workflowState.goToStep).toHaveBeenCalledWith('exitIfNode'); + }); +}); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts new file mode 100644 index 0000000000000..08e053b3c98d4 --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { ExitIfNode } from '@kbn/workflows'; +import { ExitIfNodeImpl } from '../exit_if_node_impl'; +import { WorkflowExecutionRuntimeManager } from '../../../workflow_context_manager/workflow_execution_runtime_manager'; + +describe('ExitIfNodeImpl', () => { + let step: ExitIfNode; + let workflowState: WorkflowExecutionRuntimeManager; + let impl: ExitIfNodeImpl; + + beforeEach(() => { + step = { + id: 'testStep', + type: 'exit-if', + startNodeId: 'enterIfNode', + }; + workflowState = { + goToNextStep: jest.fn(), + finishStep: jest.fn(), + } as any; + impl = new ExitIfNodeImpl(step, workflowState); + }); + + it('should finish enterIfNode', async () => { + await impl.run(); + expect(workflowState.finishStep).toHaveBeenCalledTimes(1); + expect(workflowState.finishStep).toHaveBeenCalledWith('enterIfNode'); + }); + + it('should go to the next step', async () => { + await impl.run(); + expect(workflowState.goToNextStep).toHaveBeenCalledTimes(1); + }); +}); From cee2c86552288150ae3d30e12d4ec3318a77b0e7 Mon Sep 17 00:00:00 2001 From: Ihor Panasiuk Date: Mon, 4 Aug 2025 16:11:27 +0200 Subject: [PATCH 5/6] Enforce successor type constraints in EnterIfNodeImpl and ExitConditionBranchNodeImpl - Added validation to ensure EnterIfNodeImpl only has 'enter-condition-branch' successors. - Implemented error handling in ExitConditionBranchNodeImpl to check for 'exit-if' successor type. - Updated tests to cover new validation scenarios for both node implementations. --- .../server/step/if_step/enter_if_node_impl.ts | 10 +++++ .../exit_condition_branch_node_impl.ts | 8 +++- .../if_step/tests/enter_if_node_impl.test.ts | 38 ++++++++++++++++--- .../exit_condition_branch_node_impl.test.ts | 7 ++++ 4 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts index 1647757da010f..7f3b4b8b795c8 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts @@ -18,6 +18,16 @@ export class EnterIfNodeImpl implements StepImplementation { await this.workflowState.startStep(this.step.id); const successors: any[] = this.workflowState.getNodeSuccessors(this.step.id); + if (successors.some((node) => node.type !== 'enter-condition-branch')) { + throw new Error( + `EnterIfNode with id ${ + this.step.id + } must have only 'enter-condition-branch' successors, but found: ${successors + .map((node) => node.type) + .join(', ')}.` + ); + } + const thenNode = successors?.find((node) => Object.hasOwn(node, 'condition') ) as EnterConditionBranchNode; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts index 5cf3b56c796b4..a48b1544a4dd1 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts @@ -18,7 +18,7 @@ export class ExitConditionBranchNodeImpl implements StepImplementation { ) {} public async run(): Promise { - const successors = this.workflowState.getNodeSuccessors(this.step.startNodeId); + const successors = this.workflowState.getNodeSuccessors(this.step.id); if (successors.length !== 1) { throw new Error( @@ -26,6 +26,12 @@ export class ExitConditionBranchNodeImpl implements StepImplementation { ); } + if (successors[0].type !== 'exit-if') { + throw new Error( + `ExitConditionBranchNode with id ${this.step.id} must have an exit-if successor, but found ${successors[0].type} with id ${successors[0].id}.` + ); + } + // After the branch finishes, we go to the end of If condition const exitIfNode = successors[0]; this.workflowState.goToStep(exitIfNode.id); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts index 05a51735f26cc..6d4bd0edfbfa5 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts @@ -51,8 +51,16 @@ describe('EnterIfNodeImpl', () => { it('should evaluate condition and go to thenNode if condition is true', async () => { getNodeSuccessors.mockReturnValueOnce([ - { id: 'thenNode', condition: 'true' }, - { id: 'elseNode' }, + { + id: 'thenNode', + type: 'enter-condition-branch', + condition: 'true', + } as EnterConditionBranchNode, + { + id: 'elseNode', + type: 'enter-condition-branch', + condition: 'false', + } as EnterConditionBranchNode, ]); await impl.run(); expect(workflowState.goToStep).toHaveBeenCalledTimes(1); @@ -61,8 +69,15 @@ describe('EnterIfNodeImpl', () => { it('should evaluate condition and go to elseNode if condition is false', async () => { getNodeSuccessors.mockReturnValueOnce([ - { id: 'thenNode', condition: 'false' }, - { id: 'elseNode' }, + { + id: 'thenNode', + type: 'enter-condition-branch', + condition: 'false', + } as EnterConditionBranchNode, + { + id: 'elseNode', + type: 'enter-condition-branch', + } as EnterConditionBranchNode, ]); await impl.run(); expect(workflowState.goToStep).toHaveBeenCalledTimes(1); @@ -70,9 +85,22 @@ describe('EnterIfNodeImpl', () => { }); it('should evaluate condition and go to exit node if no else branch is defined', async () => { - getNodeSuccessors.mockReturnValueOnce([{ id: 'thenNode', condition: 'false' }]); + getNodeSuccessors.mockReturnValueOnce([ + { + id: 'thenNode', + type: 'enter-condition-branch', + condition: 'false', + } as EnterConditionBranchNode, + ]); await impl.run(); expect(workflowState.goToStep).toHaveBeenCalledTimes(1); expect(workflowState.goToStep).toHaveBeenCalledWith('exitIfNode'); }); + + it('should throw an error if successors are not enter-condition-branch', async () => { + getNodeSuccessors.mockReturnValueOnce([{ id: 'someOtherNode', type: 'some-other-type' }]); + await expect(impl.run()).rejects.toThrow( + `EnterIfNode with id ${step.id} must have only 'enter-condition-branch' successors, but found: some-other-type.` + ); + }); }); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts index 0995d87ab88fe..4b5577ff1201c 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts @@ -59,6 +59,13 @@ describe('ExitConditionBranchNodeImpl', () => { ); }); + it('should raise an error if successor is not exit-if', async () => { + getNodeSuccessors.mockReturnValue([{ id: 'someOtherNode', type: 'some-other-type' }]); + await expect(impl.run()).rejects.toThrow( + `ExitConditionBranchNode with id ${step.id} must have an exit-if successor, but found some-other-type with id someOtherNode.` + ); + }); + it('should go to the exitIfNode after running', async () => { await impl.run(); expect(workflowState.goToStep).toHaveBeenCalledWith('exitIfNode'); From 468fce2e7f62bcd312a7e7ac76953eed5d3ed6cf Mon Sep 17 00:00:00 2001 From: Ihor Panasiuk Date: Tue, 5 Aug 2025 09:35:04 +0200 Subject: [PATCH 6/6] refactor: Rename workflowState to wfExecutionRuntimeManager for consistency across step implementations --- .../enter_condition_branch_node_impl.ts | 4 ++-- .../server/step/if_step/enter_if_node_impl.ts | 15 ++++++++------ .../exit_condition_branch_node_impl.ts | 6 +++--- .../server/step/if_step/exit_if_node_impl.ts | 9 ++++++--- .../enter_condition_branch_node_impl.test.ts | 8 ++++---- .../if_step/tests/enter_if_node_impl.test.ts | 20 +++++++++---------- .../exit_condition_branch_node_impl.test.ts | 8 ++++---- .../if_step/tests/exit_if_node_impl.test.ts | 12 +++++------ 8 files changed, 44 insertions(+), 38 deletions(-) diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts index 81ce50ed497b9..80640a1153134 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_condition_branch_node_impl.ts @@ -11,9 +11,9 @@ import { StepImplementation } from '../step_base'; import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; export class EnterConditionBranchNodeImpl implements StepImplementation { - constructor(private workflowState: WorkflowExecutionRuntimeManager) {} + constructor(private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager) {} public async run(): Promise { - this.workflowState.goToNextStep(); + this.wfExecutionRuntimeManager.goToNextStep(); } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts index 7f3b4b8b795c8..a90d1d62ad459 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/enter_if_node_impl.ts @@ -12,11 +12,14 @@ import { StepImplementation } from '../step_base'; import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; export class EnterIfNodeImpl implements StepImplementation { - constructor(private step: EnterIfNode, private workflowState: WorkflowExecutionRuntimeManager) {} + constructor( + private step: EnterIfNode, + private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager + ) {} public async run(): Promise { - await this.workflowState.startStep(this.step.id); - const successors: any[] = this.workflowState.getNodeSuccessors(this.step.id); + await this.wfExecutionRuntimeManager.startStep(this.step.id); + const successors: any[] = this.wfExecutionRuntimeManager.getNodeSuccessors(this.step.id); if (successors.some((node) => node.type !== 'enter-condition-branch')) { throw new Error( @@ -42,13 +45,13 @@ export class EnterIfNodeImpl implements StepImplementation { : thenNode.condition?.toLowerCase() === 'true'; // must be real condition from step definition) if (evaluatedConditionResult) { - this.workflowState.goToStep(thenNode.id); + this.wfExecutionRuntimeManager.goToStep(thenNode.id); } else if (elseNode) { - this.workflowState.goToStep(elseNode.id); + this.wfExecutionRuntimeManager.goToStep(elseNode.id); } else { // in the case when the condition evaluates to false and no else branch is defined // we go straight to the exit node skipping "then" branch - this.workflowState.goToStep(this.step.exitNodeId); + this.wfExecutionRuntimeManager.goToStep(this.step.exitNodeId); } } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts index a48b1544a4dd1..6161a9d72f9ca 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_condition_branch_node_impl.ts @@ -14,11 +14,11 @@ import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/ export class ExitConditionBranchNodeImpl implements StepImplementation { constructor( private step: ExitConditionBranchNode, - private workflowState: WorkflowExecutionRuntimeManager + private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager ) {} public async run(): Promise { - const successors = this.workflowState.getNodeSuccessors(this.step.id); + const successors = this.wfExecutionRuntimeManager.getNodeSuccessors(this.step.id); if (successors.length !== 1) { throw new Error( @@ -34,6 +34,6 @@ export class ExitConditionBranchNodeImpl implements StepImplementation { // After the branch finishes, we go to the end of If condition const exitIfNode = successors[0]; - this.workflowState.goToStep(exitIfNode.id); + this.wfExecutionRuntimeManager.goToStep(exitIfNode.id); } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_if_node_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_if_node_impl.ts index 59334a52a1688..0882c8263a738 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_if_node_impl.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/exit_if_node_impl.ts @@ -12,10 +12,13 @@ import { StepImplementation } from '../step_base'; import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; export class ExitIfNodeImpl implements StepImplementation { - constructor(private step: ExitIfNode, private workflowState: WorkflowExecutionRuntimeManager) {} + constructor( + private step: ExitIfNode, + private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager + ) {} public async run(): Promise { - await this.workflowState.finishStep(this.step.startNodeId); - this.workflowState.goToNextStep(); + await this.wfExecutionRuntimeManager.finishStep(this.step.startNodeId); + this.wfExecutionRuntimeManager.goToNextStep(); } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts index 6076c42b0dbed..42c0dd6298a52 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_condition_branch_node_impl.test.ts @@ -11,18 +11,18 @@ import { EnterConditionBranchNodeImpl } from '../enter_condition_branch_node_imp import { WorkflowExecutionRuntimeManager } from '../../../workflow_context_manager/workflow_execution_runtime_manager'; describe('EnterConditionBranchNodeImpl', () => { - let workflowState: WorkflowExecutionRuntimeManager; + let wfExecutionRuntimeManagerMock: WorkflowExecutionRuntimeManager; let impl: EnterConditionBranchNodeImpl; beforeEach(() => { - workflowState = { + wfExecutionRuntimeManagerMock = { goToNextStep: jest.fn(), } as any; - impl = new EnterConditionBranchNodeImpl(workflowState); + impl = new EnterConditionBranchNodeImpl(wfExecutionRuntimeManagerMock); }); it('should go to next step', async () => { await impl.run(); - expect(workflowState.goToNextStep).toHaveBeenCalledTimes(1); + expect(wfExecutionRuntimeManagerMock.goToNextStep).toHaveBeenCalledTimes(1); }); }); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts index 6d4bd0edfbfa5..03d7cde67ea58 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/enter_if_node_impl.test.ts @@ -13,7 +13,7 @@ import { EnterConditionBranchNode, EnterIfNode } from '@kbn/workflows'; describe('EnterIfNodeImpl', () => { let step: EnterIfNode; - let workflowState: WorkflowExecutionRuntimeManager; + let wfExecutionRuntimeManagerMock: WorkflowExecutionRuntimeManager; let impl: EnterIfNodeImpl; let startStep: jest.Mock; let goToStep: jest.Mock; @@ -24,12 +24,12 @@ describe('EnterIfNodeImpl', () => { goToStep = jest.fn(); getNodeSuccessors = jest.fn(); step = { id: 'testStep', type: 'enter-if', exitNodeId: 'exitIfNode', configuration: {} as any }; - workflowState = { + wfExecutionRuntimeManagerMock = { startStep, goToStep, getNodeSuccessors, } as any; - impl = new EnterIfNodeImpl(step, workflowState); + impl = new EnterIfNodeImpl(step, wfExecutionRuntimeManagerMock); getNodeSuccessors.mockReturnValue([ { @@ -46,7 +46,7 @@ describe('EnterIfNodeImpl', () => { it('should start the step and go to the next step', async () => { await impl.run(); - expect(workflowState.startStep).toHaveBeenCalledWith(step.id); + expect(wfExecutionRuntimeManagerMock.startStep).toHaveBeenCalledWith(step.id); }); it('should evaluate condition and go to thenNode if condition is true', async () => { @@ -63,8 +63,8 @@ describe('EnterIfNodeImpl', () => { } as EnterConditionBranchNode, ]); await impl.run(); - expect(workflowState.goToStep).toHaveBeenCalledTimes(1); - expect(workflowState.goToStep).toHaveBeenCalledWith('thenNode'); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledTimes(1); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledWith('thenNode'); }); it('should evaluate condition and go to elseNode if condition is false', async () => { @@ -80,8 +80,8 @@ describe('EnterIfNodeImpl', () => { } as EnterConditionBranchNode, ]); await impl.run(); - expect(workflowState.goToStep).toHaveBeenCalledTimes(1); - expect(workflowState.goToStep).toHaveBeenCalledWith('elseNode'); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledTimes(1); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledWith('elseNode'); }); it('should evaluate condition and go to exit node if no else branch is defined', async () => { @@ -93,8 +93,8 @@ describe('EnterIfNodeImpl', () => { } as EnterConditionBranchNode, ]); await impl.run(); - expect(workflowState.goToStep).toHaveBeenCalledTimes(1); - expect(workflowState.goToStep).toHaveBeenCalledWith('exitIfNode'); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledTimes(1); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledWith('exitIfNode'); }); it('should throw an error if successors are not enter-condition-branch', async () => { diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts index 4b5577ff1201c..d4733b2f05fae 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_condition_branch_node_impl.test.ts @@ -13,7 +13,7 @@ import { ExitConditionBranchNodeImpl } from '../exit_condition_branch_node_impl' describe('ExitConditionBranchNodeImpl', () => { let step: ExitConditionBranchNode; - let workflowState: WorkflowExecutionRuntimeManager; + let wfExecutionRuntimeManagerMock: WorkflowExecutionRuntimeManager; let impl: ExitConditionBranchNodeImpl; let goToStep: jest.Mock; let getNodeSuccessors: jest.Mock; @@ -26,11 +26,11 @@ describe('ExitConditionBranchNodeImpl', () => { type: 'exit-condition-branch', startNodeId: 'startBranchNode', }; - workflowState = { + wfExecutionRuntimeManagerMock = { goToStep, getNodeSuccessors, } as any; - impl = new ExitConditionBranchNodeImpl(step, workflowState); + impl = new ExitConditionBranchNodeImpl(step, wfExecutionRuntimeManagerMock); getNodeSuccessors.mockReturnValue([ { @@ -68,6 +68,6 @@ describe('ExitConditionBranchNodeImpl', () => { it('should go to the exitIfNode after running', async () => { await impl.run(); - expect(workflowState.goToStep).toHaveBeenCalledWith('exitIfNode'); + expect(wfExecutionRuntimeManagerMock.goToStep).toHaveBeenCalledWith('exitIfNode'); }); }); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts index 08e053b3c98d4..a31525054b2ff 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/if_step/tests/exit_if_node_impl.test.ts @@ -13,7 +13,7 @@ import { WorkflowExecutionRuntimeManager } from '../../../workflow_context_manag describe('ExitIfNodeImpl', () => { let step: ExitIfNode; - let workflowState: WorkflowExecutionRuntimeManager; + let wfExecutionRuntimeManagerMock: WorkflowExecutionRuntimeManager; let impl: ExitIfNodeImpl; beforeEach(() => { @@ -22,21 +22,21 @@ describe('ExitIfNodeImpl', () => { type: 'exit-if', startNodeId: 'enterIfNode', }; - workflowState = { + wfExecutionRuntimeManagerMock = { goToNextStep: jest.fn(), finishStep: jest.fn(), } as any; - impl = new ExitIfNodeImpl(step, workflowState); + impl = new ExitIfNodeImpl(step, wfExecutionRuntimeManagerMock); }); it('should finish enterIfNode', async () => { await impl.run(); - expect(workflowState.finishStep).toHaveBeenCalledTimes(1); - expect(workflowState.finishStep).toHaveBeenCalledWith('enterIfNode'); + expect(wfExecutionRuntimeManagerMock.finishStep).toHaveBeenCalledTimes(1); + expect(wfExecutionRuntimeManagerMock.finishStep).toHaveBeenCalledWith('enterIfNode'); }); it('should go to the next step', async () => { await impl.run(); - expect(workflowState.goToNextStep).toHaveBeenCalledTimes(1); + expect(wfExecutionRuntimeManagerMock.goToNextStep).toHaveBeenCalledTimes(1); }); });