Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Cancel runner task on timeout in external mode #12101

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/images/n8n/n8n-task-runners.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"N8N_RUNNERS_TASK_BROKER_URI",
"N8N_RUNNERS_MAX_PAYLOAD",
"N8N_RUNNERS_MAX_CONCURRENCY",
"N8N_RUNNERS_TASK_TIMEOUT",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT",
Expand Down
4 changes: 2 additions & 2 deletions packages/@n8n/config/src/configs/runners.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ export class TaskRunnersConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;

/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted. (In internal mode, the runner will also be restarted.) Must be greater than 0. */
@Env('N8N_RUNNERS_TASK_TIMEOUT')
taskTimeout: number = 60;

/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */
/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted. (In internal mode, the runner will also be restarted.) Must be greater than 0. */
@Env('N8N_RUNNERS_HEARTBEAT_INTERVAL')
heartbeatInterval: number = 30;
}
3 changes: 3 additions & 0 deletions packages/@n8n/task-runner/src/config/base-runner-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export class BaseRunnerConfig {
@Env('GENERIC_TIMEZONE')
timezone: string = 'America/New_York';

@Env('N8N_RUNNERS_TASK_TIMEOUT')
taskTimeout: number = 60;

@Nested
healthcheckServer!: HealthcheckServerConfig;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import { setGlobalState, type CodeExecutionMode, type IDataObject } from 'n8n-workflow';
import fs from 'node:fs';
Expand Down Expand Up @@ -61,7 +62,7 @@ describe('JsTaskRunner', () => {
runner?: JsTaskRunner;
}) => {
jest.spyOn(runner, 'requestData').mockResolvedValue(taskData);
return await runner.executeTask(task);
return await runner.executeTask(task, mock<AbortSignal>());
};

afterEach(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe('TestRunner', () => {
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
Expand Down Expand Up @@ -48,6 +49,7 @@ describe('TestRunner', () => {
maxPayloadSize: 1024,
taskBrokerUri: 'https://example.com:3000/path',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
Expand Down Expand Up @@ -77,6 +79,7 @@ describe('TestRunner', () => {
maxPayloadSize: 1024,
taskBrokerUri: 'not-a-valid-uri',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
Expand All @@ -86,4 +89,65 @@ describe('TestRunner', () => {
).toThrowError(/Invalid URL/);
});
});

describe('taskCancelled', () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

it('should reject pending requests when task is cancelled', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});

const taskId = 'test-task';
runner.runningTasks.set(taskId, {
taskId,
active: false,
cancelled: false,
});

const dataRequestReject = jest.fn();
const nodeTypesRequestReject = jest.fn();

runner.dataRequests.set('data-req', {
taskId,
requestId: 'data-req',
resolve: jest.fn(),
reject: dataRequestReject,
});

runner.nodeTypesRequests.set('node-req', {
taskId,
requestId: 'node-req',
resolve: jest.fn(),
reject: nodeTypesRequestReject,
});

runner.taskCancelled(taskId, 'test-reason');

expect(dataRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);

expect(nodeTypesRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);

expect(runner.dataRequests.size).toBe(0);
expect(runner.nodeTypesRequests.size).toBe(0);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';

export class TaskCancelledError extends ApplicationError {
constructor(reason: string) {
super(`Task cancelled: ${reason}`, { level: 'warning' });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { ApplicationError } from 'n8n-workflow';

export class TimeoutError extends ApplicationError {
description: string;

constructor(taskTimeout: number) {
super(
`Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`,
);

const subtitle = 'The task runner was taking too long on this task, so the task was aborted.';

const fixes = {
optimizeScript:
'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.',
ensureTermination:
'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.',
};

const suggestions = [fixes.optimizeScript, fixes.ensureTermination];

const suggestionsText = suggestions
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
.join('<br/>');

const description = `${subtitle} You can try the following:<br/><br/>${suggestionsText}`;

this.description = description;
}
}
57 changes: 46 additions & 11 deletions packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state';
import { isErrorLike } from './errors/error-like';
import { ExecutionError } from './errors/execution-error';
import { makeSerializable } from './errors/serializable-error';
import { TimeoutError } from './errors/timeout-error';
import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
Expand Down Expand Up @@ -94,7 +95,7 @@ export class JsTaskRunner extends TaskRunner {
});
}

async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
async executeTask(task: Task<JSExecSettings>, signal: AbortSignal): Promise<TaskResultData> {
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');

Expand Down Expand Up @@ -133,8 +134,8 @@ export class JsTaskRunner extends TaskRunner {

const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole);
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole, signal);

return {
result,
Expand Down Expand Up @@ -183,6 +184,7 @@ export class JsTaskRunner extends TaskRunner {
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(data, workflow, data.itemIndex);
const inputItems = data.connectionInputData;
Expand All @@ -199,10 +201,26 @@ export class JsTaskRunner extends TaskRunner {
};

try {
const result = (await runInNewContext(
`globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
)) as TaskResultData['result'];
const result = await new Promise<TaskResultData['result']>((resolve, reject) => {
const abortHandler = () => {
reject(new TimeoutError(this.taskTimeout));
};

signal.addEventListener('abort', abortHandler, { once: true });

const taskResult = runInNewContext(
`globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
{ timeout: this.taskTimeout * 1000 },
) as Promise<TaskResultData['result']>;

void taskResult
.then(resolve)
.catch(reject)
.finally(() => {
signal.removeEventListener('abort', abortHandler);
});
});

if (result === null) {
return [];
Expand Down Expand Up @@ -230,6 +248,7 @@ export class JsTaskRunner extends TaskRunner {
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const inputItems = data.connectionInputData;
const returnData: INodeExecutionData[] = [];
Expand All @@ -255,10 +274,26 @@ export class JsTaskRunner extends TaskRunner {
};

try {
let result = (await runInNewContext(
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
)) as INodeExecutionData | undefined;
let result = await new Promise<INodeExecutionData | undefined>((resolve, reject) => {
const abortHandler = () => {
reject(new TimeoutError(this.taskTimeout));
};

signal.addEventListener('abort', abortHandler);

const taskResult = runInNewContext(
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
{ timeout: this.taskTimeout * 1000 },
) as Promise<INodeExecutionData>;

void taskResult
.then(resolve)
.catch(reject)
.finally(() => {
signal.removeEventListener('abort', abortHandler);
});
});

// Filter out null values
if (result === null) {
Expand Down
Loading
Loading