Skip to content

Commit

Permalink
fix(core): Prevent __default__ jobs in scaling mode (#12402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Dec 30, 2024
1 parent 1e60bbc commit 072664b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
4 changes: 3 additions & 1 deletion packages/cli/src/scaling/__tests__/scaling.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { GlobalConfig } from '@n8n/config';
import * as BullModule from 'bull';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import { ApplicationError } from 'n8n-workflow';
import { ApplicationError, ExecutionCancelledError } from 'n8n-workflow';
import Container from 'typedi';

import { mockInstance, mockLogger } from '@test/mocking';
Expand Down Expand Up @@ -287,6 +287,8 @@ describe('ScalingService', () => {
const result = await scalingService.stopJob(job);

expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' });
expect(job.discard).toHaveBeenCalled();
expect(job.moveToFailed).toHaveBeenCalledWith(new ExecutionCancelledError('123'), true);
expect(result).toBe(true);
});

Expand Down
25 changes: 20 additions & 5 deletions packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { GlobalConfig } from '@n8n/config';
import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core';
import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify, ensureError } from 'n8n-workflow';
import {
ApplicationError,
BINARY_ENCODING,
sleep,
jsonStringify,
ensureError,
ExecutionCancelledError,
} from 'n8n-workflow';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { strict } from 'node:assert';
import assert, { strict } from 'node:assert';
import Container, { Service } from 'typedi';

import { ActiveExecutions } from '@/active-executions';
Expand Down Expand Up @@ -206,16 +213,24 @@ export class ScalingService {
try {
if (await job.isActive()) {
await job.progress({ kind: 'abort-job' }); // being processed by worker
this.logger.debug('Stopped active job', props);
await job.discard(); // prevent retries
await job.moveToFailed(new ExecutionCancelledError(job.data.executionId), true); // remove from queue
return true;
}

await job.remove(); // not yet picked up, or waiting for next pickup (stalled)
this.logger.debug('Stopped inactive job', props);
return true;
} catch (error: unknown) {
await job.progress({ kind: 'abort-job' });
this.logger.error('Failed to stop job', { ...props, error });
assert(error instanceof Error);
this.logger.error('Failed to stop job', {
...props,
error: {
message: error.message,
name: error.name,
stack: error.stack,
},
});
return false;
}
}
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ export class WorkflowRunner {
//
// FIXME: This is a quick fix. The proper fix would be to not remove
// the execution from the active executions while it's still running.
if (error instanceof ExecutionNotFoundError || error instanceof ExecutionCancelledError) {
if (
error instanceof ExecutionNotFoundError ||
error instanceof ExecutionCancelledError ||
error.message.includes('cancelled')
) {
return;
}

this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
this.errorReporter.error(error, { executionId });

const isQueueMode = config.getEnv('executions.mode') === 'queue';
Expand Down Expand Up @@ -413,7 +418,6 @@ export class WorkflowRunner {
data.workflowData,
{ retryOf: data.retryOf ? data.retryOf.toString() : undefined },
);
this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
await this.processError(error, new Date(), data.executionMode, executionId, hooks);

reject(error);
Expand Down

0 comments on commit 072664b

Please sign in to comment.