Skip to content

Commit

Permalink
feat(core): Cancel runner task on timeout in external mode (#12101)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Dec 10, 2024
1 parent a63f0e8 commit addb4fa
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 34 deletions.
1 change: 1 addition & 0 deletions docker/images/n8n/n8n-task-runners.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"N8N_RUNNERS_TASK_BROKER_URI",
"N8N_RUNNERS_MAX_PAYLOAD",
"N8N_RUNNERS_MAX_CONCURRENCY",
"N8N_RUNNERS_TASK_TIMEOUT",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST",
"N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT",
Expand Down
4 changes: 2 additions & 2 deletions packages/@n8n/config/src/configs/runners.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ export class TaskRunnersConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;

/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted. (In internal mode, the runner will also be restarted.) Must be greater than 0. */
@Env('N8N_RUNNERS_TASK_TIMEOUT')
taskTimeout: number = 60;

/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */
/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted. (In internal mode, the runner will also be restarted.) Must be greater than 0. */
@Env('N8N_RUNNERS_HEARTBEAT_INTERVAL')
heartbeatInterval: number = 30;
}
3 changes: 3 additions & 0 deletions packages/@n8n/task-runner/src/config/base-runner-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export class BaseRunnerConfig {
@Env('GENERIC_TIMEZONE')
timezone: string = 'America/New_York';

@Env('N8N_RUNNERS_TASK_TIMEOUT')
taskTimeout: number = 60;

@Nested
healthcheckServer!: HealthcheckServerConfig;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import { setGlobalState, type CodeExecutionMode, type IDataObject } from 'n8n-workflow';
import fs from 'node:fs';
Expand Down Expand Up @@ -61,7 +62,7 @@ describe('JsTaskRunner', () => {
runner?: JsTaskRunner;
}) => {
jest.spyOn(runner, 'requestData').mockResolvedValue(taskData);
return await runner.executeTask(task);
return await runner.executeTask(task, mock<AbortSignal>());
};

afterEach(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe('TestRunner', () => {
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
Expand Down Expand Up @@ -48,6 +49,7 @@ describe('TestRunner', () => {
maxPayloadSize: 1024,
taskBrokerUri: 'https://example.com:3000/path',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
Expand Down Expand Up @@ -77,6 +79,7 @@ describe('TestRunner', () => {
maxPayloadSize: 1024,
taskBrokerUri: 'not-a-valid-uri',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
Expand All @@ -86,4 +89,65 @@ describe('TestRunner', () => {
).toThrowError(/Invalid URL/);
});
});

describe('taskCancelled', () => {
it('should reject pending requests when task is cancelled', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});

const taskId = 'test-task';
runner.runningTasks.set(taskId, {
taskId,
active: false,
cancelled: false,
});

const dataRequestReject = jest.fn();
const nodeTypesRequestReject = jest.fn();

runner.dataRequests.set('data-req', {
taskId,
requestId: 'data-req',
resolve: jest.fn(),
reject: dataRequestReject,
});

runner.nodeTypesRequests.set('node-req', {
taskId,
requestId: 'node-req',
resolve: jest.fn(),
reject: nodeTypesRequestReject,
});

runner.taskCancelled(taskId, 'test-reason');

expect(dataRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);

expect(nodeTypesRequestReject).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Task cancelled: test-reason',
}),
);

expect(runner.dataRequests.size).toBe(0);
expect(runner.nodeTypesRequests.size).toBe(0);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';

export class TaskCancelledError extends ApplicationError {
constructor(reason: string) {
super(`Task cancelled: ${reason}`, { level: 'warning' });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { ApplicationError } from 'n8n-workflow';

export class TimeoutError extends ApplicationError {
description: string;

constructor(taskTimeout: number) {
super(
`Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`,
);

const subtitle = 'The task runner was taking too long on this task, so the task was aborted.';

const fixes = {
optimizeScript:
'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.',
ensureTermination:
'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.',
};

const suggestions = [fixes.optimizeScript, fixes.ensureTermination];

const suggestionsText = suggestions
.map((suggestion, index) => `${index + 1}. ${suggestion}`)
.join('<br/>');

const description = `${subtitle} You can try the following:<br/><br/>${suggestionsText}`;

this.description = description;
}
}
57 changes: 46 additions & 11 deletions packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state';
import { isErrorLike } from './errors/error-like';
import { ExecutionError } from './errors/execution-error';
import { makeSerializable } from './errors/serializable-error';
import { TimeoutError } from './errors/timeout-error';
import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
Expand Down Expand Up @@ -94,7 +95,7 @@ export class JsTaskRunner extends TaskRunner {
});
}

async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
async executeTask(task: Task<JSExecSettings>, signal: AbortSignal): Promise<TaskResultData> {
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');

Expand Down Expand Up @@ -133,8 +134,8 @@ export class JsTaskRunner extends TaskRunner {

const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole);
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole, signal);

return {
result,
Expand Down Expand Up @@ -183,6 +184,7 @@ export class JsTaskRunner extends TaskRunner {
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(data, workflow, data.itemIndex);
const inputItems = data.connectionInputData;
Expand All @@ -199,10 +201,26 @@ export class JsTaskRunner extends TaskRunner {
};

try {
const result = (await runInNewContext(
`globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
)) as TaskResultData['result'];
const result = await new Promise<TaskResultData['result']>((resolve, reject) => {
const abortHandler = () => {
reject(new TimeoutError(this.taskTimeout));
};

signal.addEventListener('abort', abortHandler, { once: true });

const taskResult = runInNewContext(
`globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
{ timeout: this.taskTimeout * 1000 },
) as Promise<TaskResultData['result']>;

void taskResult
.then(resolve)
.catch(reject)
.finally(() => {
signal.removeEventListener('abort', abortHandler);
});
});

if (result === null) {
return [];
Expand Down Expand Up @@ -230,6 +248,7 @@ export class JsTaskRunner extends TaskRunner {
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const inputItems = data.connectionInputData;
const returnData: INodeExecutionData[] = [];
Expand All @@ -255,10 +274,26 @@ export class JsTaskRunner extends TaskRunner {
};

try {
let result = (await runInNewContext(
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
)) as INodeExecutionData | undefined;
let result = await new Promise<INodeExecutionData | undefined>((resolve, reject) => {
const abortHandler = () => {
reject(new TimeoutError(this.taskTimeout));
};

signal.addEventListener('abort', abortHandler);

const taskResult = runInNewContext(
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context,
{ timeout: this.taskTimeout * 1000 },
) as Promise<INodeExecutionData>;

void taskResult
.then(resolve)
.catch(reject)
.finally(() => {
signal.removeEventListener('abort', abortHandler);
});
});

// Filter out null values
if (result === null) {
Expand Down
Loading

0 comments on commit addb4fa

Please sign in to comment.