Skip to content

Commit

Permalink
feat(core): Parent workflows should wait for sub-workflows to finish (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored Dec 6, 2024
1 parent 956b11a commit 60b3dcc
Show file tree
Hide file tree
Showing 18 changed files with 257 additions and 70 deletions.
81 changes: 76 additions & 5 deletions packages/cli/src/__tests__/wait-tracker.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import type { IRun, IWorkflowBase } from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';

import type { ActiveExecutions } from '@/active-executions';
import type { Project } from '@/databases/entities/project';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces';
Expand All @@ -12,9 +14,10 @@ import { WaitTracker } from '@/wait-tracker';
import type { WorkflowRunner } from '@/workflow-runner';
import { mockLogger } from '@test/mocking';

jest.useFakeTimers();
jest.useFakeTimers({ advanceTimers: true });

describe('WaitTracker', () => {
const activeExecutions = mock<ActiveExecutions>();
const ownershipService = mock<OwnershipService>();
const workflowRunner = mock<WorkflowRunner>();
const executionRepository = mock<ExecutionRepository>();
Expand All @@ -30,6 +33,7 @@ describe('WaitTracker', () => {
mode: 'manual',
data: mock({
pushRef: 'push_ref',
parentExecution: undefined,
}),
});
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
Expand All @@ -40,6 +44,7 @@ describe('WaitTracker', () => {
mockLogger(),
executionRepository,
ownershipService,
activeExecutions,
workflowRunner,
orchestrationService,
instanceSettings,
Expand Down Expand Up @@ -80,7 +85,9 @@ describe('WaitTracker', () => {
let startExecutionSpy: jest.SpyInstance<Promise<void>, [executionId: string]>;

beforeEach(() => {
executionRepository.findSingleExecution.mockResolvedValue(execution);
executionRepository.findSingleExecution
.calledWith(execution.id)
.mockResolvedValue(execution);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);

Expand Down Expand Up @@ -110,13 +117,17 @@ describe('WaitTracker', () => {
});

describe('startExecution()', () => {
it('should query for execution to start', async () => {
beforeEach(() => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
waitTracker.init();

executionRepository.findSingleExecution.mockResolvedValue(execution);
executionRepository.findSingleExecution.calledWith(execution.id).mockResolvedValue(execution);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);

execution.data.parentExecution = undefined;
});

it('should query for execution to start', async () => {
await waitTracker.startExecution(execution.id);

expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, {
Expand All @@ -137,6 +148,65 @@ describe('WaitTracker', () => {
execution.id,
);
});

it('should also resume parent execution once sub-workflow finishes', async () => {
const parentExecution = mock<IExecutionResponse>({
id: 'parent_execution_id',
finished: false,
});
parentExecution.workflowData = mock<IWorkflowBase>({ id: 'parent_workflow_id' });
execution.data.parentExecution = {
executionId: parentExecution.id,
workflowId: parentExecution.workflowData.id,
};
executionRepository.findSingleExecution
.calledWith(parentExecution.id)
.mockResolvedValue(parentExecution);
const postExecutePromise = createDeferredPromise<IRun | undefined>();
activeExecutions.getPostExecutePromise
.calledWith(execution.id)
.mockReturnValue(postExecutePromise.promise);

await waitTracker.startExecution(execution.id);

expect(executionRepository.findSingleExecution).toHaveBeenNthCalledWith(1, execution.id, {
includeData: true,
unflattenData: true,
});

expect(workflowRunner.run).toHaveBeenCalledTimes(1);
expect(workflowRunner.run).toHaveBeenNthCalledWith(
1,
{
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
pushRef: execution.data.pushRef,
},
false,
false,
execution.id,
);

postExecutePromise.resolve(mock<IRun>());
await jest.advanceTimersByTimeAsync(100);

expect(workflowRunner.run).toHaveBeenCalledTimes(2);
expect(workflowRunner.run).toHaveBeenNthCalledWith(
2,
{
executionMode: parentExecution.mode,
executionData: parentExecution.data,
workflowData: parentExecution.workflowData,
projectId: project.id,
pushRef: parentExecution.data.pushRef,
},
false,
false,
parentExecution.id,
);
});
});

describe('single-main setup', () => {
Expand Down Expand Up @@ -165,6 +235,7 @@ describe('WaitTracker', () => {
mockLogger(),
executionRepository,
ownershipService,
activeExecutions,
workflowRunner,
orchestrationService,
mock<InstanceSettings>({ isLeader: false }),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { mock } from 'jest-mock-extended';
import type { IWorkflowBase } from 'n8n-workflow';
import {
type IExecuteWorkflowInfo,
type IWorkflowExecuteAdditionalData,
type ExecuteWorkflowOptions,
type IRun,
type INodeExecutionData,
import type {
IExecuteWorkflowInfo,
IWorkflowExecuteAdditionalData,
ExecuteWorkflowOptions,
IRun,
INodeExecutionData,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
import Container from 'typedi';
Expand Down Expand Up @@ -50,6 +50,7 @@ const getMockRun = ({ lastNodeOutput }: { lastNodeOutput: Array<INodeExecutionDa
mode: 'manual',
startedAt: new Date(),
status: 'new',
waitTill: undefined,
});

const getCancelablePromise = async (run: IRun) =>
Expand Down Expand Up @@ -114,7 +115,9 @@ describe('WorkflowExecuteAdditionalData', () => {
});

describe('executeWorkflow', () => {
const runWithData = getMockRun({ lastNodeOutput: [[{ json: { test: 1 } }]] });
const runWithData = getMockRun({
lastNodeOutput: [[{ json: { test: 1 } }]],
});

beforeEach(() => {
workflowRepository.get.mockResolvedValue(
Expand Down Expand Up @@ -159,6 +162,23 @@ describe('WorkflowExecuteAdditionalData', () => {

expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID);
});

it('should return waitTill property when workflow execution is waiting', async () => {
const waitTill = new Date();
runWithData.waitTill = waitTill;

const response = await executeWorkflow(
mock<IExecuteWorkflowInfo>(),
mock<IWorkflowExecuteAdditionalData>(),
mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),
);

expect(response).toEqual({
data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main,
executionId: EXECUTION_ID,
waitTill,
});
});
});

describe('getRunData', () => {
Expand Down Expand Up @@ -230,6 +250,10 @@ describe('WorkflowExecuteAdditionalData', () => {
waitingExecution: {},
waitingExecutionSource: {},
},
parentExecution: {
executionId: '123',
workflowId: '567',
},
resultData: { runData: {} },
startData: {},
},
Expand Down
10 changes: 10 additions & 0 deletions packages/cli/src/wait-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { InstanceSettings } from 'n8n-core';
import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { Service } from 'typedi';

import { ActiveExecutions } from '@/active-executions';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '@/services/orchestration.service';
Expand All @@ -23,6 +24,7 @@ export class WaitTracker {
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly ownershipService: OwnershipService,
private readonly activeExecutions: ActiveExecutions,
private readonly workflowRunner: WorkflowRunner,
private readonly orchestrationService: OrchestrationService,
private readonly instanceSettings: InstanceSettings,
Expand Down Expand Up @@ -133,6 +135,14 @@ export class WaitTracker {

// Start the execution again
await this.workflowRunner.run(data, false, false, executionId);

const { parentExecution } = fullExecutionData.data;
if (parentExecution) {
// on child execution completion, resume parent execution
void this.activeExecutions.getPostExecutePromise(executionId).then(() => {
void this.startExecution(parentExecution.executionId);
});
}
}

stopTracking() {
Expand Down
21 changes: 16 additions & 5 deletions packages/cli/src/webhooks/webhook-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ import type { Project } from '@/databases/entities/project';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error';
import type { IExecutionDb, IWorkflowDb } from '@/interfaces';
import type { IWorkflowDb } from '@/interfaces';
import { Logger } from '@/logging/logger.service';
import { parseBody } from '@/middlewares';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { WaitTracker } from '@/wait-tracker';
import { createMultiFormDataParser } from '@/webhooks/webhook-form-data';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import * as WorkflowHelpers from '@/workflow-helpers';
Expand Down Expand Up @@ -548,11 +549,21 @@ export async function executeWebhook(
{ executionId },
);

const activeExecutions = Container.get(ActiveExecutions);

// Get a promise which resolves when the workflow did execute and send then response
const executePromise = activeExecutions.getPostExecutePromise(executionId);

const { parentExecution } = runExecutionData;
if (parentExecution) {
// on child execution completion, resume parent execution
void executePromise.then(() => {
const waitTracker = Container.get(WaitTracker);
void waitTracker.startExecution(parentExecution.executionId);
});
}

if (!didSendResponse) {
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = Container.get(ActiveExecutions).getPostExecutePromise(
executionId,
) as Promise<IExecutionDb | undefined>;
executePromise
// eslint-disable-next-line complexity
.then(async (data) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ export async function getRunData(
waitingExecution: {},
waitingExecutionSource: {},
},
parentExecution,
};

return {
Expand Down Expand Up @@ -944,6 +945,7 @@ async function startExecution(
return {
executionId,
data: returnData!.data!.main,
waitTill: data.waitTill,
};
}
activeExecutions.finalizeExecution(executionId, data);
Expand Down
67 changes: 64 additions & 3 deletions packages/core/src/node-execution-context/__tests__/shared-tests.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { captor, mock } from 'jest-mock-extended';
import { captor, mock, type MockProxy } from 'jest-mock-extended';
import type {
IRunExecutionData,
ContextType,
Expand All @@ -9,11 +9,21 @@ import type {
ITaskMetadata,
ISourceData,
IExecuteData,
IWorkflowExecuteAdditionalData,
ExecuteWorkflowData,
RelatedExecution,
IExecuteWorkflowInfo,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers } from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY } from 'n8n-workflow';
import Container from 'typedi';

import { BinaryDataService } from '@/BinaryData/BinaryData.service';

import type { BaseExecuteContext } from '../base-execute-context';

const binaryDataService = mock<BinaryDataService>();
Container.set(BinaryDataService, binaryDataService);

export const describeCommonTests = (
context: BaseExecuteContext,
{
Expand All @@ -31,7 +41,7 @@ export const describeCommonTests = (
},
) => {
// @ts-expect-error `additionalData` is private
const { additionalData } = context;
const additionalData = context.additionalData as MockProxy<IWorkflowExecuteAdditionalData>;

describe('getExecutionCancelSignal', () => {
it('should return the abort signal', () => {
Expand Down Expand Up @@ -178,4 +188,55 @@ export const describeCommonTests = (
resolveSimpleParameterValueSpy.mockRestore();
});
});

describe('putExecutionToWait', () => {
it('should set waitTill and execution status', async () => {
const waitTill = new Date();

await context.putExecutionToWait(waitTill);

expect(runExecutionData.waitTill).toEqual(waitTill);
expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting');
});
});

describe('executeWorkflow', () => {
const data = [[{ json: { test: true } }]];
const executeWorkflowData = mock<ExecuteWorkflowData>();
const workflowInfo = mock<IExecuteWorkflowInfo>();
const parentExecution: RelatedExecution = {
executionId: 'parent_execution_id',
workflowId: 'parent_workflow_id',
};

it('should execute workflow and return data', async () => {
additionalData.executeWorkflow.mockResolvedValue(executeWorkflowData);
binaryDataService.duplicateBinaryData.mockResolvedValue(data);

const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
parentExecution,
});

expect(result.data).toEqual(data);
expect(binaryDataService.duplicateBinaryData).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId,
executeWorkflowData.data,
);
});

it('should put execution to wait if waitTill is returned', async () => {
const waitTill = new Date();
additionalData.executeWorkflow.mockResolvedValue({ ...executeWorkflowData, waitTill });
binaryDataService.duplicateBinaryData.mockResolvedValue(data);

const result = await context.executeWorkflow(workflowInfo, undefined, undefined, {
parentExecution,
});

expect(additionalData.setExecutionStatus).toHaveBeenCalledWith('waiting');
expect(runExecutionData.waitTill).toEqual(WAIT_INDEFINITELY);
expect(result.waitTill).toBe(waitTill);
});
});
};
Loading

0 comments on commit 60b3dcc

Please sign in to comment.