Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Chat triggers don't work with the new partial execution flow #11952

203 changes: 150 additions & 53 deletions packages/cli/src/__tests__/workflow-runner.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';
import { DirectedGraph, WorkflowExecute } from 'n8n-core';
import * as core from 'n8n-core';
import type {
IExecuteData,
INode,
IRun,
ITaskData,
IWaitingForExecution,
IWaitingForExecutionSource,
IWorkflowExecutionDataProcess,
StartNodeData,
} from 'n8n-workflow';
import {
Workflow,
WorkflowHooks,
type ExecutionError,
type IWorkflowExecuteHooks,
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import Container from 'typedi';

import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { User } from '@/databases/entities/user';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { Telemetry } from '@/telemetry';
import { PermissionChecker } from '@/user-management/permission-checker';
import { WorkflowRunner } from '@/workflow-runner';
import { mockInstance } from '@test/mocking';
import { createExecution } from '@test-integration/db/executions';
Expand Down Expand Up @@ -43,61 +63,138 @@ afterAll(() => {

beforeEach(async () => {
await testDb.truncate(['Workflow', 'SharedWorkflow']);
jest.clearAllMocks();
});

test('processError should return early in Bull stalled edge case', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'queue');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});
describe('processError', () => {
test('processError should return early in Bull stalled edge case', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'queue');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});

test('processError should return early if the error is `ExecutionNotFoundError`', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution({ status: 'success', finished: true }, workflow);
await runner.processError(
new ExecutionNotFoundError(execution.id),
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});

test('processError should return early if the error is `ExecutionNotFoundError`', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution({ status: 'success', finished: true }, workflow);
await runner.processError(
new ExecutionNotFoundError(execution.id),
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
test('processError should process error', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
await Container.get(ActiveExecutions).add(
{ executionMode: 'webhook', workflowData: workflow },
execution.id,
);
config.set('executions.mode', 'regular');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
});
});

test('processError should process error', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
await Container.get(ActiveExecutions).add(
{ executionMode: 'webhook', workflowData: workflow },
execution.id,
);
config.set('executions.mode', 'regular');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
describe('run', () => {
it('uses recreateNodeExecutionStack to create a partial execution if a triggerToStartFrom with data is sent', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
const permissionChecker = Container.get(PermissionChecker);
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();

jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce(
new PCancelable(() => {
return mock<IRun>();
}),
);

jest.spyOn(Workflow.prototype, 'getNode').mockReturnValueOnce(mock<INode>());
jest.spyOn(DirectedGraph, 'fromWorkflow').mockReturnValueOnce(new DirectedGraph());
const recreateNodeExecutionStackSpy = jest
.spyOn(core, 'recreateNodeExecutionStack')
.mockReturnValueOnce({
nodeExecutionStack: mock<IExecuteData[]>(),
waitingExecution: mock<IWaitingForExecution>(),
waitingExecutionSource: mock<IWaitingForExecutionSource>(),
});

const data = mock<IWorkflowExecutionDataProcess>({
triggerToStartFrom: { name: 'trigger', data: mock<ITaskData>() },

workflowData: { nodes: [] },
executionData: undefined,
startNodes: [mock<StartNodeData>()],
destinationNode: undefined,
});

// ACT
await runner.run(data);

// ASSERT
expect(recreateNodeExecutionStackSpy).toHaveBeenCalled();
});

it('does not use recreateNodeExecutionStack to create a partial execution if a triggerToStartFrom without data is sent', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValueOnce();
const permissionChecker = Container.get(PermissionChecker);
jest.spyOn(permissionChecker, 'check').mockResolvedValueOnce();

jest.spyOn(WorkflowExecute.prototype, 'processRunExecutionData').mockReturnValueOnce(
new PCancelable(() => {
return mock<IRun>();
}),
);

const recreateNodeExecutionStackSpy = jest.spyOn(core, 'recreateNodeExecutionStack');

const data = mock<IWorkflowExecutionDataProcess>({
triggerToStartFrom: { name: 'trigger', data: undefined },

workflowData: { nodes: [] },
executionData: undefined,
startNodes: [mock<StartNodeData>()],
destinationNode: undefined,
});

// ACT
await runner.run(data);

// ASSERT
expect(recreateNodeExecutionStackSpy).not.toHaveBeenCalled();
});
});
68 changes: 61 additions & 7 deletions packages/cli/src/webhooks/__tests__/test-webhooks.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type * as express from 'express';
import { mock } from 'jest-mock-extended';
import type { IWebhookData, IWorkflowExecuteAdditionalData, Workflow } from 'n8n-workflow';
import type { ITaskData } from 'n8n-workflow';
import {
type IWebhookData,
type IWorkflowExecuteAdditionalData,
type Workflow,
} from 'n8n-workflow';
import { v4 as uuid } from 'uuid';

import { generateNanoId } from '@/databases/utils/generators';
Expand Down Expand Up @@ -43,20 +48,24 @@ describe('TestWebhooks', () => {
jest.useFakeTimers();
});

beforeEach(() => {
jest.clearAllMocks();
});

describe('needsWebhook()', () => {
const args: Parameters<typeof testWebhooks.needsWebhook> = [
const args: Parameters<typeof testWebhooks.needsWebhook>[0] = {
userId,
workflowEntity,
mock<IWorkflowExecuteAdditionalData>(),
];
additionalData: mock<IWorkflowExecuteAdditionalData>(),
};

test('if webhook is needed, should register then create webhook and return true', async () => {
const workflow = mock<Workflow>();

jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);

const needsWebhook = await testWebhooks.needsWebhook(...args);
const needsWebhook = await testWebhooks.needsWebhook(args);

const [registerOrder] = registrations.register.mock.invocationCallOrder;
const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder;
Expand All @@ -72,7 +81,7 @@ describe('TestWebhooks', () => {
jest.spyOn(registrations, 'register').mockRejectedValueOnce(new Error(msg));
registrations.getAllRegistrations.mockResolvedValue([]);

const needsWebhook = testWebhooks.needsWebhook(...args);
const needsWebhook = testWebhooks.needsWebhook(args);

await expect(needsWebhook).rejects.toThrowError(msg);
});
Expand All @@ -81,10 +90,55 @@ describe('TestWebhooks', () => {
webhook.webhookDescription.restartWebhook = true;
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);

const result = await testWebhooks.needsWebhook(...args);
const result = await testWebhooks.needsWebhook(args);

expect(result).toBe(false);
});

test('returns false if a triggerToStartFrom with triggerData is given', async () => {
const workflow = mock<Workflow>();
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);

const needsWebhook = await testWebhooks.needsWebhook({
...args,
triggerToStartFrom: {
name: 'trigger',
data: mock<ITaskData>(),
},
});

expect(needsWebhook).toBe(false);
});

test('returns true, registers and then creates webhook if triggerToStartFrom is given with no triggerData', async () => {
// ARRANGE
const workflow = mock<Workflow>();
const webhook2 = mock<IWebhookData>({
node: 'trigger',
httpMethod,
path,
workflowId: workflowEntity.id,
userId,
});
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook, webhook2]);

// ACT
const needsWebhook = await testWebhooks.needsWebhook({
...args,
triggerToStartFrom: { name: 'trigger' },
});

// ASSERT
const [registerOrder] = registrations.register.mock.invocationCallOrder;
const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder;

expect(registerOrder).toBeLessThan(createOrder);
expect(registrations.register.mock.calls[0][0].webhook.node).toBe(webhook2.node);
expect(workflow.createWebhookIfNotExists.mock.calls[0][0].node).toBe(webhook2.node);
expect(needsWebhook).toBe(true);
});
});

describe('executeWebhook()', () => {
Expand Down
44 changes: 34 additions & 10 deletions packages/cli/src/webhooks/test-webhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrati
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
import * as WebhookHelpers from '@/webhooks/webhook-helpers';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import type { WorkflowRequest } from '@/workflows/workflow.request';

import type {
IWebhookResponseCallbackData,
Expand Down Expand Up @@ -218,26 +219,49 @@ export class TestWebhooks implements IWebhookManager {
* Return whether activating a workflow requires listening for webhook calls.
* For every webhook call to listen for, also activate the webhook.
*/
async needsWebhook(
userId: string,
workflowEntity: IWorkflowDb,
additionalData: IWorkflowExecuteAdditionalData,
runData?: IRunData,
pushRef?: string,
destinationNode?: string,
) {
async needsWebhook(options: {
userId: string;
workflowEntity: IWorkflowDb;
additionalData: IWorkflowExecuteAdditionalData;
runData?: IRunData;
pushRef?: string;
destinationNode?: string;
triggerToStartFrom?: WorkflowRequest.ManualRunPayload['triggerToStartFrom'];
}) {
const {
userId,
workflowEntity,
additionalData,
runData,
pushRef,
destinationNode,
triggerToStartFrom,
} = options;

if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity);

const workflow = this.toWorkflow(workflowEntity);

const webhooks = WebhookHelpers.getWorkflowWebhooks(
let webhooks = WebhookHelpers.getWorkflowWebhooks(
workflow,
additionalData,
destinationNode,
true,
);

if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) {
// If we have a preferred trigger with data, we don't have to listen for a
// webhook.
if (triggerToStartFrom?.data) {
return false;
}

// If we have a preferred trigger without data we only want to listen for
// that trigger, not the other ones.
if (triggerToStartFrom) {
webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an else if with the next check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow.

This check is removing all but one webhook from the list if the preferred trigger was passed.

The next check is checking if the the list contains at least one trigger that is restartable (I think 🤔 )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the question is, can it ever happen that a trigger to start triggers the restartable check? What exactly is w.webhookDescription.restartWebhook?

But if not worth it let's leave it :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly is w.webhookDescription.restartWebhook?

I don't know either tbh. Jan added this here: 5a179cd#diff-b386248ff00977749c873ed85821c241b773e9740d7e7adf94e05b73b350ed74

Which is a huge commit.

It seems this is for the Form, Wait, Slack and Gmail nodes.

I guess the question is, can it ever happen that a trigger to start triggers the restartable check?

I guess that would be fine still. I'm assuming this is to not listen for webhook for workflows that have been put to into waiting?

Which means you can't actually pick them as a trigger to start from.

At least for the the wait and form nodes this is only interesting for workflows that wait, probably for slack too, e.g. if you render a dialog and wait for the user to click a button. I'm not sure why the gmail node offers this though.

I guess this can be left until the split button is implemented.

cc @cstuncsik @CharlieKolb


if (webhooks.some((w) => w.webhookDescription.restartWebhook ?? false)) {
return false; // no webhooks found to start a workflow
}

Expand Down
Loading
Loading