Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(RabbitMQ Trigger Node): Make message acknowledgement and concurrent processing configurable #3385

Merged
merged 8 commits into from
May 30, 2022
25 changes: 23 additions & 2 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
IGetExecuteTriggerFunctions,
INode,
INodeExecutionData,
IRun,
IRunExecutionData,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
NodeHelpers,
Expand Down Expand Up @@ -52,6 +53,9 @@ import config from '../config';
import { User } from './databases/entities/User';
import { whereClause } from './WorkflowHelpers';
import { WorkflowEntity } from './databases/entities/WorkflowEntity';
import * as ActiveExecutions from './ActiveExecutions';

const activeExecutions = ActiveExecutions.getInstance();

const WEBHOOK_PROD_UNREGISTERED_HINT = `The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)`;

Expand Down Expand Up @@ -673,14 +677,31 @@ export class ActiveWorkflowRunner {
returnFunctions.emit = (
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun | undefined>,
): void => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
Logger.debug(`Received trigger for workflow "${workflow.name}"`);
WorkflowHelpers.saveStaticData(workflow);
// eslint-disable-next-line id-denylist
this.runWorkflow(workflowData, node, data, additionalData, mode, responsePromise).catch(
(error) => console.error(error),
const executePromise = this.runWorkflow(
workflowData,
node,
data,
additionalData,
mode,
responsePromise,
);

if (donePromise) {
executePromise.then((executionId) => {
activeExecutions
.getPostExecutePromise(executionId)
.then(donePromise.resolve)
.catch(donePromise.reject);
});
} else {
executePromise.catch(console.error);
}
};
returnFunctions.emitError = async (error: Error): Promise<void> => {
await this.activeWorkflows?.remove(workflowData.id.toString());
Expand Down
43 changes: 41 additions & 2 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ export class WorkflowExecute {

let currentExecutionTry = '';
let lastExecutionTry = '';
let closeFunction: Promise<void> | undefined;

return new PCancelable(async (resolve, reject, onCancel) => {
let gotCancel = false;
Expand Down Expand Up @@ -811,7 +812,7 @@ export class WorkflowExecute {
node: executionNode.name,
workflowId: workflow.id,
});
nodeSuccessData = await workflow.runNode(
const runNodeData = await workflow.runNode(
executionData.node,
executionData.data,
this.runExecutionData,
Expand All @@ -820,6 +821,14 @@ export class WorkflowExecute {
NodeExecuteFunctions,
this.mode,
);
nodeSuccessData = runNodeData.data;

if (runNodeData.closeFunction) {
// Explanation why we do this can be found in n8n-workflow/Workflow.ts -> runNode
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
closeFunction = runNodeData.closeFunction();
}

Logger.debug(`Running node "${executionNode.name}" finished successfully`, {
node: executionNode.name,
workflowId: workflow.id,
Expand Down Expand Up @@ -1033,9 +1042,10 @@ export class WorkflowExecute {
startedAt,
workflow,
new WorkflowOperationError('Workflow has been canceled or timed out!'),
closeFunction,
);
}
return this.processSuccessExecution(startedAt, workflow, executionError);
return this.processSuccessExecution(startedAt, workflow, executionError, closeFunction);
})
.catch(async (error) => {
const fullRunData = this.getFullRunData(startedAt);
Expand All @@ -1061,6 +1071,20 @@ export class WorkflowExecute {
},
);

if (closeFunction) {
try {
await closeFunction;
} catch (errorClose) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/restrict-template-expressions
`There was a problem deactivating trigger of workflow "${workflow.id}": "${errorClose.message}"`,
{
workflowId: workflow.id,
},
);
}
}

return fullRunData;
});

Expand All @@ -1072,6 +1096,7 @@ export class WorkflowExecute {
startedAt: Date,
workflow: Workflow,
executionError?: ExecutionError,
closeFunction?: Promise<void>,
): Promise<IRun> {
const fullRunData = this.getFullRunData(startedAt);

Expand Down Expand Up @@ -1106,6 +1131,20 @@ export class WorkflowExecute {

await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);

if (closeFunction) {
try {
await closeFunction;
} catch (error) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`There was a problem deactivating trigger of workflow "${workflow.id}": "${error.message}"`,
{
workflowId: workflow.id,
},
);
}
}

return fullRunData;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export const rabbitDefaultOptions: Array<INodePropertyOptions | INodeProperties
],
},
{
displayName: 'Auto Delete',
displayName: 'Auto Delete Queue',
name: 'autoDelete',
type: 'boolean',
default: false,
Expand Down
67 changes: 61 additions & 6 deletions packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import {
ITriggerFunctions,
} from 'n8n-workflow';

const amqplib = require('amqplib');
import * as amqplib from 'amqplib';

export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
declare module 'amqplib' {
interface Channel {
connection: amqplib.Connection;
}
}

export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise<amqplib.Channel> {
const credentials = await this.getCredentials('rabbitmq');

const credentialKeys = [
Expand Down Expand Up @@ -44,7 +50,7 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction
reject(error);
});

const channel = await connection.createChannel().catch(console.warn);
const channel = await connection.createChannel().catch(console.warn) as amqplib.Channel;

if (options.arguments && ((options.arguments as IDataObject).argument! as IDataObject[]).length) {
const additionalArguments: IDataObject = {};
Expand All @@ -54,15 +60,14 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction
options.arguments = additionalArguments;
}


resolve(channel);
} catch (error) {
reject(error);
}
});
}

export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);

return new Promise(async (resolve, reject) => {
Expand All @@ -75,7 +80,7 @@ export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFun
});
}

export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise<any> { // tslint:disable-line:no-any
export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);

return new Promise(async (resolve, reject) => {
Expand All @@ -87,3 +92,53 @@ export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITrigger
}
});
}

export class MessageTracker {
messages: number[] = [];
isClosing = false;

received(message: amqplib.ConsumeMessage) {
this.messages.push(message.fields.deliveryTag);
}

answered(message: amqplib.ConsumeMessage) {
if (this.messages.length === 0) {
return;
}

const index = this.messages.findIndex(value => value !== message.fields.deliveryTag);
this.messages.splice(index);
}

unansweredMessages() {
return this.messages.length;
}

async closeChannel(channel: amqplib.Channel, consumerTag: string) {
if (this.isClosing) {
return;
}
this.isClosing = true;

// Do not accept any new messages
await channel.cancel(consumerTag);

let count = 0;
let unansweredMessages = this.unansweredMessages();

// Give currently executing messages max. 5 minutes to finish before
// the channel gets closed. If we would not do that, it would not be possible
// to acknowledge messages anymore for which the executions were already running
// when for example a new version of the workflow got saved. That would lead to
// them getting delivered and processed again.
while (unansweredMessages !== 0 && count++ <= 300) {
await new Promise((resolve) => {
setTimeout(resolve, 1000);
});
unansweredMessages = this.unansweredMessages();
}

await channel.close();
await channel.connection.close();
}
}
2 changes: 1 addition & 1 deletion packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ export class RabbitMQ implements INodeType {
],
},
{
displayName: 'Auto Delete',
displayName: 'Auto Delete Queue',
name: 'autoDelete',
type: 'boolean',
default: false,
Expand Down
Loading