Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebuild RabbitMQ node and trigger #3244

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b69a754
Rebuild RabbitMQ node and trigger
Ken-Michalak Mar 2, 2022
f759064
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency May 4, 2022
3ba9381
:zap: ui and linter fixes
michael-radency May 5, 2022
98a0704
:zap: breaking-changes update
michael-radency May 5, 2022
533dd49
:zap: nack on error option
michael-radency May 5, 2022
e748a1c
:zap: renamed option
michael-radency May 6, 2022
25c88e0
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency May 6, 2022
1647a6d
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency May 10, 2022
33dc9a2
:zap: merge fix
michael-radency Jun 6, 2022
b265f6e
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency Jun 6, 2022
b615bfd
:hammer: update
michael-radency Jun 6, 2022
5d28f43
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency Jun 7, 2022
c77645d
:hammer: amqp-connection-manager setup
michael-radency Jun 7, 2022
a46890c
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency Jun 7, 2022
407ff88
:hammer: closing connection if PRECONDITION-FAILED
michael-radency Jun 8, 2022
dff75ab
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency Jun 8, 2022
ebf5b94
:closing connection:
michael-radency Jun 8, 2022
aeeba0b
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency Jun 9, 2022
c2cd0d3
:hammer: error handling fix
michael-radency Jun 9, 2022
f882088
Merge branch 'master' of https://github.com/n8n-io/n8n into rabbitmq-…
michael-radency Jun 10, 2022
6720e44
:hammer: clean up and linter fixes
michael-radency Jun 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 0 additions & 91 deletions packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts

This file was deleted.

186 changes: 119 additions & 67 deletions packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,96 +1,125 @@
import {
ICredentialDataDecryptedObject,
IDataObject,
IExecuteFunctions,
ITriggerFunctions,
} from 'n8n-workflow';

import * as amqplib from 'amqplib';
import amqp, { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';

declare module 'amqplib' {
interface Channel {
connection: amqplib.Connection;
}
}
// maintain and auto-reconnect one connection per set of credentials
const allConnections = new Map<string, AmqpConnectionManager>();

export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise<amqplib.Channel> {
const credentials = await this.getCredentials('rabbitmq');
export async function rabbitmqConnect(
self: IExecuteFunctions | ITriggerFunctions,
setup: (this: ChannelWrapper, channel: amqplib.Channel) => Promise<void>,
): Promise<ChannelWrapper> {
const credentials = await self.getCredentials('rabbitmq');

const credentialKeys = [
'hostname',
'port',
'username',
'password',
'vhost',
];
return new Promise(async (resolve, reject) => {
try {
const connection = getConnection(credentials);
const channel = connection.createChannel({ setup });

const credentialData: IDataObject = {};
credentialKeys.forEach(key => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
});
const mode = 'getActivationMode' in self && self.getActivationMode();

const optsData: IDataObject = {};
if (credentials.ssl === true) {
credentialData.protocol = 'amqps';
// this is manual execution, so don't just silently retry
if (mode && ['manual', 'activate'].includes(mode)) {
await manualExecutionWaitOrFail(channel);
}

optsData.ca = credentials.ca === '' ? undefined : [Buffer.from(credentials.ca as string)];
if (credentials.passwordless === true) {
optsData.cert = credentials.cert === '' ? undefined : Buffer.from(credentials.cert as string);
optsData.key = credentials.key === '' ? undefined : Buffer.from(credentials.key as string);
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
resolve(channel);
} catch (error) {
reject(error);
}
});
}

/** If the channel doesn't connect successfully, close the channel and error out. */
async function manualExecutionWaitOrFail(channel: ChannelWrapper, timeout = 20000): Promise<void> {
const failPromise = new Promise((resolve, reject) => {
setTimeout(reject, 20000, new Error('Timeout while waiting for RabbitMQ channel'));
channel.once('error', reject);
});

try {
await Promise.race([channel.waitForConnect(), failPromise]);
} catch (error) {
channel.close();
throw error;
}
}

// Get or create a new managed connection that automatically reconnects
function getConnection(credentials: ICredentialDataDecryptedObject | undefined): AmqpConnectionManager {
if (!credentials) {
throw new Error('RabbitMQ credentials required to connect');
}

return new Promise(async (resolve, reject) => {
try {
const connection = await amqplib.connect(credentialData, optsData);
const connectionKey = JSON.stringify(credentials);

connection.on('error', (error: Error) => {
reject(error);
});
let connection = allConnections.get(connectionKey);

const channel = await connection.createChannel().catch(console.warn) as amqplib.Channel;
if (!connection) {
connection = createConnection(credentials);
allConnections.set(connectionKey, connection);
}

return connection;
}

if (options.arguments && ((options.arguments as IDataObject).argument! as IDataObject[]).length) {
const additionalArguments: IDataObject = {};
((options.arguments as IDataObject).argument as IDataObject[]).forEach((argument: IDataObject) => {
additionalArguments[argument.key as string] = argument.value;
});
options.arguments = additionalArguments;
export function getAllConnections() {
return allConnections;
}

// Create connection manager with the default options (5-second heartbeat and retry)
function createConnection(credentials: ICredentialDataDecryptedObject): AmqpConnectionManager {
const [credentialData, connectionOptions] = getConnectionArguments(credentials);
const name = `${credentialData.hostname}:${credentialData.port}`;

const connection = amqp
.connect(credentialData, { connectionOptions })
.on('error', (err) => {
console.warn(`RabbitMQ: Connection error for ${name}: ${err.message}`);
})
.on('blocked', ({ reason }) => {
console.warn(`RabbitMQ: Connection blocked for ${name}: ${reason}`);
})
.on('disconnect', ({ err }) => {
console.log(`RabbitMQ: Connection closed for ${name}: ${err.message}`);
if (err.message.includes('PRECONDITION-FAILED')) {
connection.close();
}
});

resolve(channel);
} catch (error) {
reject(error);
}
});
console.log(`RabbitMQ: Created managed connection for ${name}`);

return connection;
}

export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);
function getConnectionArguments(credentials: IDataObject) {
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'];

return new Promise(async (resolve, reject) => {
try {
await channel.assertQueue(queue, options);
resolve(channel);
} catch (error) {
reject(error);
}
const credentialData: IDataObject = {};

credentialKeys.forEach((key) => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
});
}

export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);
const optsData: IDataObject = {};
if (credentials.ssl === true) {
credentialData.protocol = 'amqps';

return new Promise(async (resolve, reject) => {
try {
await channel.assertExchange(exchange, type, options);
resolve(channel);
} catch (error) {
reject(error);
optsData.ca = credentials.ca === '' ? undefined : [Buffer.from(credentials.ca as string)];
if (credentials.passwordless === true) {
optsData.cert = credentials.cert === '' ? undefined : Buffer.from(credentials.cert as string);
optsData.key = credentials.key === '' ? undefined : Buffer.from(credentials.key as string);
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
}
});
}
return [credentialData, optsData];
}

export class MessageTracker {
Expand All @@ -114,14 +143,14 @@ export class MessageTracker {
return this.messages.length;
}

async closeChannel(channel: amqplib.Channel, consumerTag: string) {
async closeChannel(channel: ChannelWrapper) {
if (this.isClosing) {
return;
}
this.isClosing = true;

// Do not accept any new messages
await channel.cancel(consumerTag);
await channel.cancelAll();

let count = 0;
let unansweredMessages = this.unansweredMessages();
Expand All @@ -139,6 +168,29 @@ export class MessageTracker {
}

await channel.close();
await channel.connection.close();
const connections = getAllConnections();
connections.forEach(connection => connection.close());
}
}

export function fixOptions(data: IDataObject) {
const options = data;

if (options.arguments) {
const args = (options?.arguments as IDataObject)?.argumet as IDataObject[] || [];
options.arguments = args.reduce((acc, argument) => {
acc[argument.key as string] = argument.value;
return acc;
}, {});
}

if (options.headers) {
const headers = (options?.headers as IDataObject)?.header as IDataObject[] || [];
options.headers = headers.reduce((acc, header) => {
acc[header.key as string] = header.value;
return acc;
}, {});
}

return options;
}
Loading