diff --git a/src/platform/packages/shared/kbn-workflows/spec/schema.ts b/src/platform/packages/shared/kbn-workflows/spec/schema.ts index 85d1b721a2b64..4a24bbdf6a5fb 100644 --- a/src/platform/packages/shared/kbn-workflows/spec/schema.ts +++ b/src/platform/packages/shared/kbn-workflows/spec/schema.ts @@ -233,3 +233,4 @@ export const WorkflowYamlSchema = z.object({ }); export type WorkflowYaml = z.infer; +export type WorkflowSchema = z.infer; diff --git a/src/platform/packages/shared/kbn-workflows/types/execution/index.ts b/src/platform/packages/shared/kbn-workflows/types/execution/index.ts index d12a6386371d6..aadc5691b7984 100644 --- a/src/platform/packages/shared/kbn-workflows/types/execution/index.ts +++ b/src/platform/packages/shared/kbn-workflows/types/execution/index.ts @@ -16,3 +16,4 @@ export type { ExitConditionBranchNodeSchema, } from './nodes/branching_nodes'; export type { EnterForeachNode, ExitForeachNode } from './nodes/loop_nodes'; +export type { AtomicGraphNode } from './nodes/base'; diff --git a/src/platform/packages/shared/kbn-workflows/types/execution/nodes/base.ts b/src/platform/packages/shared/kbn-workflows/types/execution/nodes/base.ts index 6d46e6a17f311..570202776cef0 100644 --- a/src/platform/packages/shared/kbn-workflows/types/execution/nodes/base.ts +++ b/src/platform/packages/shared/kbn-workflows/types/execution/nodes/base.ts @@ -16,3 +16,10 @@ export const ExecutionGraphNodeSchema = z.object({ }); export type ExecutionGraphNode = z.infer; + +export const AtomicGraphNodeSchema = z.object({ + id: z.string(), + type: z.literal('atomic'), + configuration: z.any(), +}); +export type AtomicGraphNode = z.infer; diff --git a/src/platform/packages/shared/kbn-workflows/types/latest.ts b/src/platform/packages/shared/kbn-workflows/types/latest.ts index e57ec397f6502..2f69db4f016d8 100644 --- a/src/platform/packages/shared/kbn-workflows/types/latest.ts +++ b/src/platform/packages/shared/kbn-workflows/types/latest.ts @@ -52,4 +52,5 @@ export type { ExitConditionBranchNodeSchema, EnterForeachNode, ExitForeachNode, + AtomicGraphNode, } from './execution'; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/atomic_step/atomic_step_impl.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/atomic_step/atomic_step_impl.ts new file mode 100644 index 0000000000000..439183d671adf --- /dev/null +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/atomic_step/atomic_step_impl.ts @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +import { AtomicGraphNode } from '@kbn/workflows/types/execution/nodes/base'; +import { StepImplementation } from '../step_base'; +import { ConnectorStepImpl } from '../connector_step'; +import { WorkflowContextManager } from '../../workflow_context_manager/workflow_context_manager'; +import { ConnectorExecutor } from '../../connector_executor'; +import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager'; + +/** + * Implements the execution logic for an atomic workflow step. + * + * `AtomicStepImpl` is responsible for running a single atomic step within a workflow. + * It delegates the execution to a `ConnectorStepImpl`, passing the necessary configuration, + * context manager, connector executor, and workflow state. + * + * @remarks + * This class is typically used internally by the workflow execution engine to process + * atomic nodes in the workflow graph. + * + * @param node - The atomic graph node containing step configuration. + * @param contextManager - Manages workflow context and state. + * @param connectorExecutor - Executes connector operations for the step. + * @param workflowState - Manages the runtime state of workflow execution. + */ +export class AtomicStepImpl implements StepImplementation { + constructor( + private node: AtomicGraphNode, + private contextManager: WorkflowContextManager, + private connectorExecutor: ConnectorExecutor, + private workflowState: WorkflowExecutionRuntimeManager + ) {} + + async run(): Promise { + // This class should decide what action to take based on action type + // like connector, logger, http call, etc. + // for now it only calls ConnectorStepImpl + await new ConnectorStepImpl( + this.node.configuration, + this.contextManager, + this.connectorExecutor, + this.workflowState + ).run(); + } +} diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts b/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts index 3a3226ced802c..c460f0e83d10c 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/step/step_factory.ts @@ -12,7 +12,6 @@ import { WorkflowContextManager } from '../workflow_context_manager/workflow_con import { StepImplementation } from './step_base'; // Import schema and inferred types import { ConnectorExecutor } from '../connector_executor'; -import { ConnectorStepImpl } from './connector_step'; import { EnterConditionBranchNodeImpl, EnterIfNodeImpl, @@ -21,6 +20,7 @@ import { } from './if_step'; import { WorkflowExecutionRuntimeManager } from '../workflow_context_manager/workflow_execution_runtime_manager'; import { EnterForeachNodeImpl, ExitForeachNodeImpl } from './foreach_step'; +import { AtomicStepImpl } from './atomic_step/atomic_step_impl'; // Import specific step implementations // import { ForEachStepImpl } from './foreach-step'; // To be created // import { IfStepImpl } from './if-step'; // To be created @@ -30,7 +30,7 @@ import { EnterForeachNodeImpl, ExitForeachNodeImpl } from './foreach_step'; export class StepFactory { public create( - step: TStep, // Use z.infer when fully defined + step: TStep, // TODO: TStep must refer to a node type, not BaseStep (IfElseNode, ForeachNode, etc.) contextManager: WorkflowContextManager, connectorExecutor: ConnectorExecutor, // this is temporary, we will remove it when we have a proper connector executor workflowState: WorkflowExecutionRuntimeManager @@ -55,13 +55,13 @@ export class StepFactory { case 'exit-if': return new ExitIfNodeImpl(step as any, workflowState); case 'atomic': - // return new AtomicStepImpl(step as AtomicStep, contextManager); + return new AtomicStepImpl(step as any, contextManager, connectorExecutor, workflowState); case 'parallel': // return new ParallelStepImpl(step as ParallelStep, contextManager); case 'merge': // return new MergeStepImpl(step as MergeStep, contextManager); default: - return new ConnectorStepImpl(step as any, contextManager, connectorExecutor, workflowState); + throw new Error(`Unknown node type: ${stepType}`); } } } diff --git a/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.test.ts b/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.test.ts new file mode 100644 index 0000000000000..1b2e827f376f2 --- /dev/null +++ b/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.test.ts @@ -0,0 +1,450 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import { + IfStep, + ForEachStep, + EnterIfNode, + ExitIfNode, + EnterForeachNode, + ExitForeachNode, + WorkflowSchema, + EnterConditionBranchNode, + ExitConditionBranchNode, + ConnectorStep, + AtomicGraphNode, +} from '@kbn/workflows'; +import { convertToWorkflowGraph } from './build_execution_graph'; +import { graphlib } from '@dagrejs/dagre'; + +describe('convertToWorkflowGraph', () => { + describe('atomic steps', () => { + const workflowDefinition = { + steps: [ + { + name: 'testAtomicStep1', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from atomic step 1', + }, + } as ConnectorStep, + { + name: 'testAtomicStep2', + type: 'openai', + connectorId: 'openai', + with: { + message: 'Hello from atomic step 2', + }, + } as ConnectorStep, + ], + } as Partial; + + it('should return nodes for atomic step in correct topological order', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const topSort = graphlib.alg.topsort(executionGraph); + expect(topSort).toHaveLength(2); + expect(topSort).toEqual(['testAtomicStep1', 'testAtomicStep2']); + }); + + it('should return correct edges for atomic step graph', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const edges = executionGraph.edges(); + expect(edges).toEqual([{ v: 'testAtomicStep1', w: 'testAtomicStep2' }]); + }); + + it('should configure the atomic step correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const node = executionGraph.node('testAtomicStep1'); + expect(node).toEqual({ + id: 'testAtomicStep1', + type: 'atomic', + configuration: { + name: 'testAtomicStep1', + type: 'slack', + connectorId: 'slack', + with: { message: 'Hello from atomic step 1' }, + }, + } as AtomicGraphNode); + }); + }); + + describe('if step', () => { + const workflowDefinition = { + steps: [ + { + name: 'testIfStep', + type: 'if', + condition: 'true', + steps: [ + { + name: 'firstThenTestConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from then step 1', + }, + } as ConnectorStep, + { + name: 'secondThenTestConnectorStep', + type: 'openai', + connectorId: 'openai', + with: { + message: 'Hello from then nested step 2', + }, + } as ConnectorStep, + ], + else: [ + { + name: 'elseTestConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from else nested step', + }, + } as ConnectorStep, + ], + } as IfStep, + ], + } as Partial; + + it('should return nodes for if condition in correct topological order', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const topSort = graphlib.alg.topsort(executionGraph); + expect(topSort).toHaveLength(9); + expect(topSort).toEqual([ + 'testIfStep', + 'enterThen(testIfStep)', + 'firstThenTestConnectorStep', + 'secondThenTestConnectorStep', + 'exitThen(testIfStep)', + 'enterElse(testIfStep)', + 'elseTestConnectorStep', + 'exitElse(testIfStep)', + 'exitCondition(testIfStep)', + ]); + }); + + it('should return correct edges for if step graph', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const edges = executionGraph.edges(); + expect(edges).toEqual( + expect.arrayContaining([ + { v: 'testIfStep', w: 'enterThen(testIfStep)' }, + { v: 'enterThen(testIfStep)', w: 'firstThenTestConnectorStep' }, + { v: 'firstThenTestConnectorStep', w: 'secondThenTestConnectorStep' }, + { v: 'secondThenTestConnectorStep', w: 'exitThen(testIfStep)' }, + { v: 'testIfStep', w: 'enterElse(testIfStep)' }, + { v: 'enterElse(testIfStep)', w: 'elseTestConnectorStep' }, + { v: 'elseTestConnectorStep', w: 'exitElse(testIfStep)' }, + { v: 'exitThen(testIfStep)', w: 'exitCondition(testIfStep)' }, + { v: 'exitElse(testIfStep)', w: 'exitCondition(testIfStep)' }, + ]) + ); + expect(edges).toHaveLength(9); + }); + + it('should configure enter-if node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const enterIfNode = executionGraph.node('testIfStep'); + expect(enterIfNode).toEqual({ + id: 'testIfStep', + type: 'enter-if', + exitNodeId: 'exitCondition(testIfStep)', + configuration: { + name: 'testIfStep', + type: 'if', + condition: 'true', + }, + } as EnterIfNode); + }); + + it('should configure enter-then branch node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const enterThenBranchNode = executionGraph.node('enterThen(testIfStep)'); + expect(enterThenBranchNode).toEqual({ + id: 'enterThen(testIfStep)', + type: 'enter-condition-branch', + condition: 'true', + } as EnterConditionBranchNode); + }); + + it('should configure exit-then branch node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const exitThenBranchNode = executionGraph.node('exitThen(testIfStep)'); + expect(exitThenBranchNode).toEqual({ + id: 'exitThen(testIfStep)', + type: 'exit-condition-branch', + startNodeId: 'enterThen(testIfStep)', + } as ExitConditionBranchNode); + }); + + it('should configure enter-else branch node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const enterElseBranchNode = executionGraph.node('enterElse(testIfStep)'); + expect(enterElseBranchNode).toEqual({ + id: 'enterElse(testIfStep)', + type: 'enter-condition-branch', + condition: undefined, + } as EnterConditionBranchNode); + }); + + it('should configure exit-else branch node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const exitElseBranchNode = executionGraph.node('exitElse(testIfStep)'); + expect(exitElseBranchNode).toEqual({ + id: 'exitElse(testIfStep)', + type: 'exit-condition-branch', + startNodeId: 'enterElse(testIfStep)', + } as ExitConditionBranchNode); + }); + + it('should configure exit-if node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const exitConditionNode = executionGraph.node('exitCondition(testIfStep)'); + expect(exitConditionNode).toEqual({ + id: 'exitCondition(testIfStep)', + type: 'exit-if', + startNodeId: 'testIfStep', + } as ExitIfNode); + }); + + describe('if step without else branch', () => { + const workflowDefinitionWithoutElse = { + steps: [ + { + name: 'testIfStepWithoutElse', + type: 'if', + condition: 'true', + steps: [ + { + name: 'thenTestConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from then step', + }, + } as ConnectorStep, + ], + } as IfStep, + ], + } as Partial; + + it('should handle if step without else branch correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinitionWithoutElse as any); + const topSort = graphlib.alg.topsort(executionGraph); + expect(topSort).toHaveLength(5); + expect(topSort).toEqual([ + 'testIfStepWithoutElse', + 'enterThen(testIfStepWithoutElse)', + 'thenTestConnectorStep', + 'exitThen(testIfStepWithoutElse)', + 'exitCondition(testIfStepWithoutElse)', + ]); + }); + }); + }); + + describe('foreach step', () => { + const workflowDefinition = { + steps: [ + { + name: 'testForeachStep', + foreach: '["item1", "item2", "item3"]', + type: 'foreach', + steps: [ + { + name: 'firstTestForeachConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from foreach nested step 1', + }, + } as ConnectorStep, + { + name: 'secondTestForeachConnectorStep', + type: 'openai', + connectorId: 'openai', + with: { + message: 'Hello from foreach nested step 2', + }, + } as ConnectorStep, + ], + } as ForEachStep, + ], + } as Partial; + + it('should return nodes for foreach step in correct topological order', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const topSort = graphlib.alg.topsort(executionGraph); + expect(topSort).toHaveLength(4); + expect(topSort).toEqual([ + 'testForeachStep', + 'firstTestForeachConnectorStep', + 'secondTestForeachConnectorStep', + 'exitForeach(testForeachStep)', + ]); + }); + + it('should return correct edges for foreach step graph', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const edges = executionGraph.edges(); + expect(edges).toEqual( + expect.arrayContaining([ + { v: 'testForeachStep', w: 'firstTestForeachConnectorStep' }, + { v: 'firstTestForeachConnectorStep', w: 'secondTestForeachConnectorStep' }, + { v: 'secondTestForeachConnectorStep', w: 'exitForeach(testForeachStep)' }, + ]) + ); + expect(edges).toHaveLength(3); + }); + + it('should configure enter-foreach node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const enterForeachNode = executionGraph.node('testForeachStep'); + expect(enterForeachNode).toEqual({ + id: 'testForeachStep', + type: 'enter-foreach', + itemNodeIds: ['firstTestForeachConnectorStep', 'secondTestForeachConnectorStep'], + configuration: { + foreach: '["item1", "item2", "item3"]', + name: 'testForeachStep', + type: 'foreach', + }, + } as EnterForeachNode); + }); + + it('should configure exit-foreach node correctly', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const exitForeachNode = executionGraph.node('exitForeach(testForeachStep)'); + expect(exitForeachNode).toEqual({ + type: 'exit-foreach', + id: 'exitForeach(testForeachStep)', + startNodeId: 'testForeachStep', + } as ExitForeachNode); + }); + + describe('nested foreach steps', () => { + const nestedWorkflowDefinition = { + steps: [ + { + name: 'outerForeachStep', + foreach: '["outer1", "outer2"]', + type: 'foreach', + steps: [ + { + name: 'innerForeachStep', + foreach: '["inner1", "inner2"]', + type: 'foreach', + steps: [ + { + name: 'nestedConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from nested step', + }, + } as ConnectorStep, + ], + } as ForEachStep, + ], + } as ForEachStep, + ], + } as Partial; + + it('should handle nested foreach steps correctly', () => { + const executionGraph = convertToWorkflowGraph(nestedWorkflowDefinition as any); + const topSort = graphlib.alg.topsort(executionGraph); + expect(topSort).toHaveLength(5); + expect(topSort).toEqual([ + 'outerForeachStep', + 'innerForeachStep', + 'nestedConnectorStep', + 'exitForeach(innerForeachStep)', + 'exitForeach(outerForeachStep)', + ]); + }); + }); + }); + + describe('complex workflow', () => { + const workflowDefinition = { + steps: [ + { + name: 'firstConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from first step', + }, + } as ConnectorStep, + { + name: 'testForeachStep', + foreach: '["item1", "item2", "item3"]', + type: 'foreach', + steps: [ + { + name: 'testIfStep', + type: 'if', + condition: 'true', + steps: [ + { + name: 'firstThenTestConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from then step 1', + }, + } as ConnectorStep, + { + name: 'secondThenTestConnectorStep', + type: 'openai', + connectorId: 'openai', + with: { + message: 'Hello from then nested step 2', + }, + } as ConnectorStep, + ], + else: [ + { + name: 'elseTestConnectorStep', + type: 'slack', + connectorId: 'slack', + with: { + message: 'Hello from else nested step', + }, + } as ConnectorStep, + ], + } as IfStep, + ], + } as ForEachStep, + ], + } as Partial; + + it('should have correctly structured graph for complex nodes', () => { + const executionGraph = convertToWorkflowGraph(workflowDefinition as any); + const topsort = graphlib.alg.topsort(executionGraph); + const expectedComplexOrder = [ + 'firstConnectorStep', + 'testForeachStep', + 'testIfStep', + 'enterThen(testIfStep)', + 'firstThenTestConnectorStep', + 'secondThenTestConnectorStep', + 'exitThen(testIfStep)', + 'enterElse(testIfStep)', + 'elseTestConnectorStep', + 'exitElse(testIfStep)', + 'exitCondition(testIfStep)', + 'exitForeach(testForeachStep)', + ]; + expect(topsort).toEqual(expectedComplexOrder); + }); + }); +}); diff --git a/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts b/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts index c3a0a22df40ed..4bf045a9efbd2 100644 --- a/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts +++ b/src/platform/plugins/shared/workflows_management/common/lib/build_execution_graph/build_execution_graph.ts @@ -12,13 +12,14 @@ import { BaseStep, IfStep, ForEachStep, - WorkflowExecutionEngineModel, EnterIfNode, ExitIfNode, EnterForeachNode, ExitForeachNode, + WorkflowSchema, EnterConditionBranchNode, ExitConditionBranchNode, + AtomicGraphNode, } from '@kbn/workflows'; import { omit } from 'lodash'; @@ -29,8 +30,6 @@ function getNodeId(node: BaseStep): string { } function visitAbstractStep(graph: graphlib.Graph, previousStep: any, currentStep: any): any { - const currentStepId = getNodeId(currentStep); - if (currentStep.type === 'if') { return visitIfStep(graph, previousStep, currentStep); } @@ -39,13 +38,24 @@ function visitAbstractStep(graph: graphlib.Graph, previousStep: any, currentStep return visitForeachStep(graph, previousStep, currentStep); } - graph.setNode(getNodeId(currentStep), currentStep); + return visitAtomicStep(graph, previousStep, currentStep); +} + +export function visitAtomicStep(graph: graphlib.Graph, previousStep: any, currentStep: any): any { + const atomicNode: AtomicGraphNode = { + id: getNodeId(currentStep), + type: 'atomic', + configuration: { + ...currentStep, + }, + }; + graph.setNode(atomicNode.id, atomicNode); if (previousStep) { - graph.setEdge(getNodeId(previousStep), currentStepId); + graph.setEdge(getNodeId(previousStep), atomicNode.id); } - return currentStep; + return atomicNode; } export function visitIfStep(graph: graphlib.Graph, previousStep: any, currentStep: any): any { @@ -76,11 +86,12 @@ export function visitIfStep(graph: graphlib.Graph, previousStep: any, currentSte graph.setNode(enterThenBranchNode.id, enterThenBranchNode); graph.setEdge(enterConditionNodeId, enterThenBranchNode.id); - trueSteps.forEach((ifTrueCurrentStep: any, index: number) => { - const _previousStep = index > 0 ? trueSteps[index - 1] : enterThenBranchNode; - const currentNode = visitAbstractStep(graph, _previousStep, ifTrueCurrentStep); + let thenPreviousStep: any = enterThenBranchNode; + trueSteps.forEach((ifTrueCurrentStep: any) => { + const currentNode = visitAbstractStep(graph, thenPreviousStep, ifTrueCurrentStep); graph.setNode(getNodeId(currentNode), currentNode); - graph.setEdge(getNodeId(previousStep), getNodeId(currentNode)); + graph.setEdge(getNodeId(thenPreviousStep), getNodeId(currentNode)); + thenPreviousStep = currentNode; }); const exitThenBranchNode: ExitConditionBranchNode = { id: `exitThen(${enterConditionNodeId})`, @@ -88,7 +99,7 @@ export function visitIfStep(graph: graphlib.Graph, previousStep: any, currentSte startNodeId: enterThenBranchNode.id, }; graph.setNode(exitThenBranchNode.id, exitThenBranchNode); - graph.setEdge(getNodeId(trueSteps[trueSteps.length - 1]), exitThenBranchNode.id); + graph.setEdge(getNodeId(thenPreviousStep), exitThenBranchNode.id); graph.setEdge(exitThenBranchNode.id, exitConditionNode.id); if (falseSteps?.length > 0) { @@ -98,11 +109,12 @@ export function visitIfStep(graph: graphlib.Graph, previousStep: any, currentSte }; graph.setNode(enterElseBranchNode.id, enterElseBranchNode); graph.setEdge(enterConditionNodeId, enterElseBranchNode.id); - falseSteps.forEach((ifFalseCurrentStep: any, index: number) => { - const _previousStep = index > 0 ? falseSteps[index - 1] : enterElseBranchNode; - const currentNode = visitAbstractStep(graph, _previousStep, ifFalseCurrentStep); + let elsePreviousStep: any = enterElseBranchNode; + falseSteps.forEach((ifFalseCurrentStep: any) => { + const currentNode = visitAbstractStep(graph, elsePreviousStep, ifFalseCurrentStep); graph.setNode(getNodeId(currentNode), currentNode); - graph.setEdge(getNodeId(previousStep), getNodeId(currentNode)); + graph.setEdge(getNodeId(elsePreviousStep), getNodeId(currentNode)); + elsePreviousStep = currentNode; }); const exitElseBranchNode: ExitConditionBranchNode = { id: `exitElse(${enterConditionNodeId})`, @@ -110,7 +122,7 @@ export function visitIfStep(graph: graphlib.Graph, previousStep: any, currentSte startNodeId: enterElseBranchNode.id, }; graph.setNode(exitElseBranchNode.id, exitElseBranchNode); - graph.setEdge(getNodeId(falseSteps[falseSteps.length - 1]), exitElseBranchNode.id); + graph.setEdge(getNodeId(elsePreviousStep), exitElseBranchNode.id); graph.setEdge(exitElseBranchNode.id, exitConditionNode.id); } @@ -139,22 +151,21 @@ function visitForeachStep(graph: graphlib.Graph, previousStep: any, currentStep: }; const exitForeachNode: ExitForeachNode = { type: 'exit-foreach', - id: enterForeachNodeId + '_exit', + id: `exitForeach(${enterForeachNodeId})`, startNodeId: enterForeachNodeId, }; - foreachNestedSteps.forEach((step: any, index: number) => { - const _previousStep = index > 0 ? foreachNestedSteps[index - 1] : foreachStep; + let previousNodeToLink: any = enterForeachNode; + foreachNestedSteps.forEach((step: any) => { enterForeachNode.itemNodeIds.push(getNodeId(step)); - const currentNode = visitAbstractStep(graph, _previousStep, step); + const currentNode = visitAbstractStep(graph, previousNodeToLink, step); graph.setNode(getNodeId(currentNode), currentNode); - graph.setEdge(getNodeId(previousStep), getNodeId(currentNode)); + graph.setEdge(getNodeId(previousNodeToLink), getNodeId(currentNode)); + previousNodeToLink = currentNode; }); - const lastNestedForeachStep = foreachNestedSteps[foreachNestedSteps.length - 1]; - graph.setNode(exitForeachNode.id, exitForeachNode); - graph.setEdge(getNodeId(lastNestedForeachStep), exitForeachNode.id); + graph.setEdge(getNodeId(previousNodeToLink), exitForeachNode.id); graph.setNode(enterForeachNodeId, enterForeachNode); if (previousStep) { @@ -164,12 +175,13 @@ function visitForeachStep(graph: graphlib.Graph, previousStep: any, currentStep: return exitForeachNode; } -export function convertToWorkflowGraph(workflow: WorkflowExecutionEngineModel): graphlib.Graph { +export function convertToWorkflowGraph(workflowSchema: WorkflowSchema): graphlib.Graph { const graph = new graphlib.Graph({ directed: true }); - let previousStep: BaseStep | null = null; + let previousNode: any | null = null; - workflow.definition.workflow.steps.forEach((currentStep, index) => { - previousStep = visitAbstractStep(graph, previousStep, currentStep); + workflowSchema.steps.forEach((currentStep, index) => { + const currentNode = visitAbstractStep(graph, previousNode, currentStep); + previousNode = currentNode; }); return graph; diff --git a/src/platform/plugins/shared/workflows_management/server/scheduler/scheduler_service.ts b/src/platform/plugins/shared/workflows_management/server/scheduler/scheduler_service.ts index acf4ff9c029f7..42d62c1a88f79 100644 --- a/src/platform/plugins/shared/workflows_management/server/scheduler/scheduler_service.ts +++ b/src/platform/plugins/shared/workflows_management/server/scheduler/scheduler_service.ts @@ -60,7 +60,7 @@ export class SchedulerService { workflow: WorkflowExecutionEngineModel, inputs: Record ): Promise { - const executionGraph = convertToWorkflowGraph(workflow); + const executionGraph = convertToWorkflowGraph(workflow.definition.workflow); workflow.executionGraph = convertToSerializableGraph(executionGraph); // TODO: It's not good approach, it's temporary const connectorCredentials = await extractConnectorIds(this.actionsClient); diff --git a/src/platform/plugins/shared/workflows_management/server/tasks/workflow_task_runner.ts b/src/platform/plugins/shared/workflows_management/server/tasks/workflow_task_runner.ts index d5255fcbc7c70..765cb373c82c5 100644 --- a/src/platform/plugins/shared/workflows_management/server/tasks/workflow_task_runner.ts +++ b/src/platform/plugins/shared/workflows_management/server/tasks/workflow_task_runner.ts @@ -59,7 +59,7 @@ export function createWorkflowTaskRunner({ } // Convert to execution model - const executionGraph = convertToWorkflowGraph(workflow); + const executionGraph = convertToWorkflowGraph(workflow.definition.workflow); const workflowExecutionModel: WorkflowExecutionEngineModel = { id: workflow.id, name: workflow.name,