Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ function createIfGraph(ifStep: IfStep, context: GraphBuildContext): graphlib.Gra
};
const enterThenBranchNode: EnterConditionBranchNode = {
id: `enterThen(${enterConditionNodeId})`,
type: 'enter-condition-branch',
type: 'enter-then-branch',
condition: ifElseStep.condition,
};

Expand All @@ -197,7 +197,7 @@ function createIfGraph(ifStep: IfStep, context: GraphBuildContext): graphlib.Gra

const exitThenBranchNode: ExitConditionBranchNode = {
id: `exitThen(${enterConditionNodeId})`,
type: 'exit-condition-branch',
type: 'exit-then-branch',
startNodeId: enterThenBranchNode.id,
};
const thenGraph = createStepsSequence(trueSteps, context);
Expand All @@ -208,13 +208,13 @@ function createIfGraph(ifStep: IfStep, context: GraphBuildContext): graphlib.Gra
if (falseSteps?.length > 0) {
const enterElseBranchNode: EnterConditionBranchNode = {
id: `enterElse(${enterConditionNodeId})`,
type: 'enter-condition-branch',
type: 'enter-else-branch',
};
graph.setNode(enterElseBranchNode.id, enterElseBranchNode);
graph.setEdge(enterConditionNodeId, enterElseBranchNode.id);
const exitElseBranchNode: ExitConditionBranchNode = {
id: `exitElse(${enterConditionNodeId})`,
type: 'exit-condition-branch',
type: 'exit-else-branch',
startNodeId: enterElseBranchNode.id,
};
const elseGraph = createStepsSequence(falseSteps, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ describe('convertToWorkflowGraph', () => {
const enterThenBranchNode = executionGraph.node('enterThen(testIfStep)');
expect(enterThenBranchNode).toEqual({
id: 'enterThen(testIfStep)',
type: 'enter-condition-branch',
type: 'enter-then-branch',
condition: 'true',
} as EnterConditionBranchNode);
});
Expand All @@ -379,7 +379,7 @@ describe('convertToWorkflowGraph', () => {
const exitThenBranchNode = executionGraph.node('exitThen(testIfStep)');
expect(exitThenBranchNode).toEqual({
id: 'exitThen(testIfStep)',
type: 'exit-condition-branch',
type: 'exit-then-branch',
startNodeId: 'enterThen(testIfStep)',
} as ExitConditionBranchNode);
});
Expand All @@ -389,7 +389,7 @@ describe('convertToWorkflowGraph', () => {
const enterElseBranchNode = executionGraph.node('enterElse(testIfStep)');
expect(enterElseBranchNode).toEqual({
id: 'enterElse(testIfStep)',
type: 'enter-condition-branch',
type: 'enter-else-branch',
condition: undefined,
} as EnterConditionBranchNode);
});
Expand All @@ -399,7 +399,7 @@ describe('convertToWorkflowGraph', () => {
const exitElseBranchNode = executionGraph.node('exitElse(testIfStep)');
expect(exitElseBranchNode).toEqual({
id: 'exitElse(testIfStep)',
type: 'exit-condition-branch',
type: 'exit-else-branch',
startNodeId: 'enterElse(testIfStep)',
} as ExitConditionBranchNode);
});
Expand Down Expand Up @@ -537,7 +537,7 @@ describe('convertToWorkflowGraph', () => {
const enterThenBranchNode = executionGraph.node('enterThen(if_firstThenTestConnectorStep)');
expect(enterThenBranchNode).toEqual({
id: 'enterThen(if_firstThenTestConnectorStep)',
type: 'enter-condition-branch',
type: 'enter-then-branch',
condition: 'false',
} as EnterConditionBranchNode);
});
Expand All @@ -555,7 +555,7 @@ describe('convertToWorkflowGraph', () => {
const exitThenBranchNode = executionGraph.node('exitThen(if_firstThenTestConnectorStep)');
expect(exitThenBranchNode).toEqual({
id: 'exitThen(if_firstThenTestConnectorStep)',
type: 'exit-condition-branch',
type: 'exit-then-branch',
startNodeId: 'enterThen(if_firstThenTestConnectorStep)',
} as ExitConditionBranchNode);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ export type EnterIfNode = z.infer<typeof EnterIfNodeSchema>;

export const EnterConditionBranchNodeSchema = z.object({
id: z.string(),
type: z.literal('enter-condition-branch'),
type: z.union([z.literal('enter-then-branch'), z.literal('enter-else-branch')]),
condition: z.union([z.string(), z.undefined()]),
});
export type EnterConditionBranchNode = z.infer<typeof EnterConditionBranchNodeSchema>;

export const ExitConditionBranchNodeSchema = z.object({
id: z.string(),
type: z.literal('exit-condition-branch'),
type: z.union([z.literal('exit-then-branch'), z.literal('exit-else-branch')]),
startNodeId: z.string(),
});
export type ExitConditionBranchNode = z.infer<typeof ExitConditionBranchNodeSchema>;
Expand Down
3 changes: 3 additions & 0 deletions src/platform/packages/shared/kbn-workflows/types/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ export interface EsWorkflowStepExecution {
spaceId: string;
id: string;
stepId: string;

/** Current step's scope path */
path: string[];
workflowRunId: string;
workflowId: string;
status: ExecutionStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,63 +22,78 @@ export class EnterForeachNodeImpl implements StepImplementation, StepErrorCatche
) {}

public async run(): Promise<void> {
this.wfExecutionRuntimeManager.enterScope();
let foreachState = this.wfExecutionRuntimeManager.getStepState(this.step.id);
if (!this.wfExecutionRuntimeManager.getStepState(this.step.id)) {
await this.enterForeach();
} else {
await this.advanceIteration();
}
}

if (!foreachState) {
await this.wfExecutionRuntimeManager.startStep(this.step.id);
const evaluatedItems = this.getItems();
async catchError(): Promise<void> {
await this.wfExecutionRuntimeManager.setStepState(this.step.id, undefined);
}

if (evaluatedItems.length === 0) {
this.workflowLogger.logDebug(
`Foreach step "${this.step.id}" has no items to iterate over. Skipping execution.`,
{
workflow: { step_id: this.step.id },
}
);
await this.wfExecutionRuntimeManager.setStepState(this.step.id, {
items: [],
total: 0,
});
await this.wfExecutionRuntimeManager.finishStep(this.step.id);
this.wfExecutionRuntimeManager.goToStep(this.step.exitNodeId);
return;
}
private async enterForeach(): Promise<void> {
let foreachState = this.wfExecutionRuntimeManager.getStepState(this.step.id);
await this.wfExecutionRuntimeManager.startStep(this.step.id);
const evaluatedItems = this.getItems();

if (evaluatedItems.length === 0) {
this.workflowLogger.logDebug(
`Foreach step "${this.step.id}" will iterate over ${evaluatedItems.length} items.`,
`Foreach step "${this.step.id}" has no items to iterate over. Skipping execution.`,
{
workflow: { step_id: this.step.id },
}
);

// Initialize foreach state
foreachState = {
items: evaluatedItems,
item: evaluatedItems[0],
index: 0,
total: evaluatedItems.length,
};
} else {
// Update items and index if they have changed
const items = foreachState.items;
const index = foreachState.index + 1;
const item = items[index];
const total = foreachState.total;
foreachState = {
items,
index,
item,
total,
};
await this.wfExecutionRuntimeManager.setStepState(this.step.id, {
items: [],
total: 0,
});
await this.wfExecutionRuntimeManager.finishStep(this.step.id);
this.wfExecutionRuntimeManager.goToStep(this.step.exitNodeId);
return;
}

this.workflowLogger.logDebug(
`Foreach step "${this.step.id}" will iterate over ${evaluatedItems.length} items.`,
{
workflow: { step_id: this.step.id },
}
);

// Initialize foreach state
foreachState = {
items: evaluatedItems,
item: evaluatedItems[0],
index: 0,
total: evaluatedItems.length,
};
// Enter a new scope for the whole foreach
this.wfExecutionRuntimeManager.enterScope();

// Enter a new scope for the first iteration
this.wfExecutionRuntimeManager.enterScope(foreachState.index!.toString());
await this.wfExecutionRuntimeManager.setStepState(this.step.id, foreachState);
this.wfExecutionRuntimeManager.goToNextStep();
}

async catchError(): Promise<void> {
await this.wfExecutionRuntimeManager.setStepState(this.step.id, undefined);
private async advanceIteration(): Promise<void> {
let foreachState = this.wfExecutionRuntimeManager.getStepState(this.step.id)!;
// Update items and index if they have changed
const items = foreachState.items;
const index = foreachState.index + 1;
const item = items[index];
const total = foreachState.total;
foreachState = {
items,
index,
item,
total,
};
// Enter a new scope for the new iteration
this.wfExecutionRuntimeManager.enterScope(foreachState.index!.toString());
await this.wfExecutionRuntimeManager.setStepState(this.step.id, foreachState);
this.wfExecutionRuntimeManager.goToNextStep();
}

private getItems(): any[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ export class ExitForeachNodeImpl implements StepImplementation {
) {}

public async run(): Promise<void> {
this.wfExecutionRuntimeManager.exitScope();
const foreachState = this.wfExecutionRuntimeManager.getStepState(this.step.startNodeId);

if (!foreachState) {
throw new Error(`Foreach state for step ${this.step.startNodeId} not found`);
}
// Exit the scope of the current iteration
this.wfExecutionRuntimeManager.exitScope();

if (foreachState.items[foreachState.index + 1]) {
this.wfExecutionRuntimeManager.goToStep(this.step.startNodeId);
return;
}

// All items have been processed, exit the foreach scope
this.wfExecutionRuntimeManager.exitScope();
await this.wfExecutionRuntimeManager.setStepState(this.step.startNodeId, undefined);
await this.wfExecutionRuntimeManager.finishStep(this.step.startNodeId);
this.workflowLogger.logDebug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,28 @@ describe('EnterForeachNodeImpl', () => {
getStepState.mockReturnValue(undefined);
});

it('should enter scope', async () => {
it('should enter the whole foreach scope', async () => {
await underTest.run();

expect(enterScope).toHaveBeenCalledTimes(1);
expect(enterScope).toHaveBeenCalledWith();
});

it('should enter the iteration scope', async () => {
await underTest.run();

expect(enterScope).toHaveBeenCalledWith('0');
});

it('should enter scopes in correct order', async () => {
await underTest.run();
expect(enterScope).toHaveBeenNthCalledWith(1);
expect(enterScope).toHaveBeenNthCalledWith(2, '0');
});

it('should enter scope twice', async () => {
await underTest.run();

expect(enterScope).toHaveBeenCalledTimes(2);
});

it('should start step', async () => {
Expand Down Expand Up @@ -208,6 +226,18 @@ describe('EnterForeachNodeImpl', () => {
});
});

it('should enter iteration scope', async () => {
await underTest.run();

expect(enterScope).toHaveBeenCalledWith('1');
});

it('should enter scope only once', async () => {
await underTest.run();

expect(enterScope).toHaveBeenCalledTimes(1);
});

it('should not start step', async () => {
await underTest.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('ExitForeachNodeImpl', () => {
expect(wfExecutionRuntimeManager.setStepResult).not.toHaveBeenCalled();
});

it('should exit scope', async () => {
it('should exit iteration scope', async () => {
await underTest.run();
expect(exitScope).toHaveBeenCalledTimes(1);
});
Expand Down Expand Up @@ -137,9 +137,9 @@ describe('ExitForeachNodeImpl', () => {
);
});

it('should exit scope', async () => {
it('should exit iteration scope and whole foreach scope', async () => {
await underTest.run();
expect(exitScope).toHaveBeenCalledTimes(1);
expect(exitScope).toHaveBeenCalledTimes(2);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { EnterConditionBranchNode } from '@kbn/workflows';
import type { StepImplementation } from '../step_base';
import type { WorkflowExecutionRuntimeManager } from '../../workflow_context_manager/workflow_execution_runtime_manager';

export class EnterConditionBranchNodeImpl implements StepImplementation {
constructor(private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager) {}
constructor(
private node: EnterConditionBranchNode,
private wfExecutionRuntimeManager: WorkflowExecutionRuntimeManager
) {}

public async run(): Promise<void> {
if (this.node.type === 'enter-then-branch') {
this.wfExecutionRuntimeManager.enterScope('true');
} else {
this.wfExecutionRuntimeManager.enterScope('false');
}
this.wfExecutionRuntimeManager.goToNextStep();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ export class EnterIfNodeImpl implements StepImplementation {
) {}

public async run(): Promise<void> {
this.wfExecutionRuntimeManager.enterScope();
await this.wfExecutionRuntimeManager.startStep(this.step.id);
this.wfExecutionRuntimeManager.enterScope();
const successors: any[] = this.wfExecutionRuntimeManager.getNodeSuccessors(this.step.id);

if (successors.some((node) => node.type !== 'enter-condition-branch')) {
if (
successors.some((node) => !['enter-then-branch', 'enter-else-branch'].includes(node.type))
) {
throw new Error(
`EnterIfNode with id ${
this.step.id
} must have only 'enter-condition-branch' successors, but found: ${successors
} must have only 'enter-then-branch' or 'enter-else-branch' successors, but found: ${successors
.map((node) => node.type)
.join(', ')}.`
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ export class ExitConditionBranchNodeImpl implements StepImplementation {
// After the branch finishes, we go to the end of If condition
const exitIfNode = successors[0];
this.wfExecutionRuntimeManager.goToStep(exitIfNode.id);
this.wfExecutionRuntimeManager.exitScope();
}
}
Loading