Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
75d7b10
Fix bug when updateWorkflowExecution replaced document instead of par…
skynetigor Aug 4, 2025
1aea7b1
Add validation for ID in create and update methods of Step and Workfl…
skynetigor Aug 4, 2025
c850026
Merge branch 'elastic:main' into 13225-fix-bug-with-replacing-wf-exec…
skynetigor Aug 4, 2025
aad5c90
introduce condition branch node and implement their executors
skynetigor Aug 4, 2025
ba91a17
Add tests for EnterConditionBranchNodeImpl, EnterIfNodeImpl, ExitCond…
skynetigor Aug 4, 2025
cee2c86
Enforce successor type constraints in EnterIfNodeImpl and ExitConditi…
skynetigor Aug 4, 2025
8fa8ad0
Merge branch 'main' into 13176-reconsider-if-nodes
skynetigor Aug 4, 2025
468fce2
refactor: Rename workflowState to wfExecutionRuntimeManager for consi…
skynetigor Aug 5, 2025
12a20c3
Merge branch 'main' into 13176-reconsider-if-nodes-and-write-tests-fo…
skynetigor Aug 5, 2025
97895a9
fix: Update workflow graph conversion to use the correct workflow schema
skynetigor Aug 5, 2025
a8a1e42
fix: Update foreach step handling and adjust workflow graph conversio…
skynetigor Aug 5, 2025
74a584a
test: Add unit tests for convertToWorkflowGraph with foreach step han…
skynetigor Aug 5, 2025
4c5fb77
Merge branch '13176-reconsider-if-nodes-and-write-tests-for-if-step' …
skynetigor Aug 5, 2025
45e1721
fix: Ensure WorkflowSchema type inference is correctly exported
skynetigor Aug 5, 2025
b3f3b2f
test: Enhance unit tests for if and foreach steps in workflow graph c…
skynetigor Aug 5, 2025
9658661
Merge branch 'main' into 13412-Add-unit-tests-for-DAG-builder
skynetigor Aug 5, 2025
7dc8a88
test: Update test descriptions for if and foreach steps in workflow g…
skynetigor Aug 5, 2025
5a29af0
Merge branch 'elastic:main' into 13412-Add-unit-tests-for-DAG-builder
skynetigor Aug 5, 2025
3576f4e
Refactor getNodeId function to ensure all steps have a unique id
skynetigor Aug 5, 2025
d200aba
Add AtomicStepImpl class and update execution graph handling for atom…
skynetigor Aug 5, 2025
d1af342
Implement AtomicStepImpl in StepFactory for atomic step handling
skynetigor Aug 5, 2025
00b1c05
Add AtomicGraphNode type export to execution index
skynetigor Aug 5, 2025
2ccc8ef
Add tests for atomic node in graph
skynetigor Aug 5, 2025
c32a67c
add test for complex graph
skynetigor Aug 5, 2025
6cdb5e0
Fix handling of unknown step types in StepFactory by throwing an error
skynetigor Aug 5, 2025
f1e360c
Remove unused import for ConnectorStepImpl in StepFactory
skynetigor Aug 5, 2025
4287609
Fix bug that caused graph cycling
skynetigor Aug 5, 2025
f511438
add more tests
skynetigor Aug 6, 2025
99e9c20
Refactor visitIfStep to simplify iteration over true and false steps
skynetigor Aug 6, 2025
0259e95
Merge branch 'main' into 13412-Add-unit-tests-for-DAG-builder
skynetigor Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,4 @@ export const WorkflowYamlSchema = z.object({
});

export type WorkflowYaml = z.infer<typeof WorkflowYamlSchema>;
export type WorkflowSchema = z.infer<typeof WorkflowSchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export type {
ExitConditionBranchNodeSchema,
} from './nodes/branching_nodes';
export type { EnterForeachNode, ExitForeachNode } from './nodes/loop_nodes';
export type { AtomicGraphNode } from './nodes/base';
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,10 @@ export const ExecutionGraphNodeSchema = z.object({
});

export type ExecutionGraphNode = z.infer<typeof ExecutionGraphNodeSchema>;

export const AtomicGraphNodeSchema = z.object({
id: z.string(),
type: z.literal('atomic'),
configuration: z.any(),
});
export type AtomicGraphNode = z.infer<typeof AtomicGraphNodeSchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ export type {
ExitConditionBranchNodeSchema,
EnterForeachNode,
ExitForeachNode,
AtomicGraphNode,
} from './execution';
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
import { AtomicGraphNode } from '@kbn/workflows/types/execution/nodes/base';
import { StepImplementation } from '../step_base';
import { ConnectorStepImpl } from '../connector_step';
import { WorkflowContextManager } from '../../workflow_context_manager/workflow_context_manager';
import { ConnectorExecutor } from '../../connector_executor';
import { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager';

/**
* Implements the execution logic for an atomic workflow step.
*
* `AtomicStepImpl` is responsible for running a single atomic step within a workflow.
* It delegates the execution to a `ConnectorStepImpl`, passing the necessary configuration,
* context manager, connector executor, and workflow state.
*
* @remarks
* This class is typically used internally by the workflow execution engine to process
* atomic nodes in the workflow graph.
*
* @param node - The atomic graph node containing step configuration.
* @param contextManager - Manages workflow context and state.
* @param connectorExecutor - Executes connector operations for the step.
* @param workflowState - Manages the runtime state of workflow execution.
*/
export class AtomicStepImpl implements StepImplementation {
constructor(
private node: AtomicGraphNode,
private contextManager: WorkflowContextManager,
private connectorExecutor: ConnectorExecutor,
private workflowState: WorkflowExecutionRuntimeManager
) {}

async run(): Promise<void> {
// This class should decide what action to take based on action type
// like connector, logger, http call, etc.
// for now it only calls ConnectorStepImpl
await new ConnectorStepImpl(
this.node.configuration,
this.contextManager,
this.connectorExecutor,
this.workflowState
).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { WorkflowContextManager } from '../workflow_context_manager/workflow_con
import { StepImplementation } from './step_base';
// Import schema and inferred types
import { ConnectorExecutor } from '../connector_executor';
import { ConnectorStepImpl } from './connector_step';
import {
EnterConditionBranchNodeImpl,
EnterIfNodeImpl,
Expand All @@ -21,6 +20,7 @@ import {
} from './if_step';
import { WorkflowExecutionRuntimeManager } from '../workflow_context_manager/workflow_execution_runtime_manager';
import { EnterForeachNodeImpl, ExitForeachNodeImpl } from './foreach_step';
import { AtomicStepImpl } from './atomic_step/atomic_step_impl';
// Import specific step implementations
// import { ForEachStepImpl } from './foreach-step'; // To be created
// import { IfStepImpl } from './if-step'; // To be created
Expand All @@ -30,7 +30,7 @@ import { EnterForeachNodeImpl, ExitForeachNodeImpl } from './foreach_step';

export class StepFactory {
public create<TStep extends BaseStep>(
step: TStep, // Use z.infer<typeof StepSchema> when fully defined
step: TStep, // TODO: TStep must refer to a node type, not BaseStep (IfElseNode, ForeachNode, etc.)
contextManager: WorkflowContextManager,
connectorExecutor: ConnectorExecutor, // this is temporary, we will remove it when we have a proper connector executor
workflowState: WorkflowExecutionRuntimeManager
Expand All @@ -55,13 +55,13 @@ export class StepFactory {
case 'exit-if':
return new ExitIfNodeImpl(step as any, workflowState);
case 'atomic':
// return new AtomicStepImpl(step as AtomicStep, contextManager);
return new AtomicStepImpl(step as any, contextManager, connectorExecutor, workflowState);
case 'parallel':
// return new ParallelStepImpl(step as ParallelStep, contextManager);
case 'merge':
// return new MergeStepImpl(step as MergeStep, contextManager);
default:
return new ConnectorStepImpl(step as any, contextManager, connectorExecutor, workflowState);
throw new Error(`Unknown node type: ${stepType}`);
}
}
}
Loading