diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts
index b6e9563dd0750..f023e39e15de9 100644
--- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts
+++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts
@@ -6,6 +6,9 @@ import {
INodeTypeDescription,
ITriggerFunctions,
ITriggerResponse,
+ IDeferredPromise,
+ IExecuteResponsePromiseData,
+ createDeferredPromise,
} from 'n8n-workflow';
import {
@@ -44,7 +47,38 @@ export class RabbitMQTrigger implements INodeType {
placeholder: 'queue-name',
description: 'Name of the queue to publish to.',
},
-
+ {
+ displayName: 'Acknowledge',
+ name: 'acknowledgeMode',
+ type: 'options',
+ options: [
+ {
+ name: 'Immediately',
+ value: 'onReceived',
+ description: 'As soon as this node executes',
+ },
+ {
+ name: 'Using \'Respond to Trigger\' node',
+ value: 'acknowledgeNode',
+ description: 'Acknowledge when acknowledge node executes',
+ },
+ ],
+ default: 'onReceived',
+ description: 'When and how to acknowledge to the trigger.',
+ },
+ {
+ displayName: 'Insert a \'Respond to Trigger\' node to control when and how you acknowledge. More details',
+ name: 'triggerNotice',
+ type: 'notice',
+ displayOptions: {
+ show: {
+ acknowledgeMode: [
+ 'acknowledgeNode',
+ ],
+ },
+ },
+ default: '',
+ },
{
displayName: 'Options',
name: 'options',
@@ -101,6 +135,7 @@ export class RabbitMQTrigger implements INodeType {
async trigger(this: ITriggerFunctions): Promise {
const queue = this.getNodeParameter('queue') as string;
const options = this.getNodeParameter('options', {}) as IDataObject;
+ const acknowledgeMode = this.getNodeParameter('acknowledgeMode') as string;
const channel = await rabbitmqConnectQueue.call(this, queue, options);
@@ -134,12 +169,36 @@ export class RabbitMQTrigger implements INodeType {
}
}
- self.emit([
- [
- item,
- ],
- ]);
- channel.ack(message);
+ if (acknowledgeMode == 'onReceived') {
+ self.emit([
+ [
+ item,
+ ],
+ ]);
+
+ channel.ack(message);
+ } else {
+ let acknowledgePromise = await createDeferredPromise();
+ acknowledgePromise
+ .promise()
+ .then((response: IExecuteResponsePromiseData) => {
+ let acknowledgeResponse = response as IDataObject;
+ if (acknowledgeResponse.ack) {
+ channel.ack(message);
+ } else {
+ channel.nack(message);
+ }
+ })
+ .catch(async (error) => {
+ channel.nack(message);
+ });
+
+ self.emit([
+ [
+ item,
+ ],
+ ], acknowledgePromise);
+ }
}
});
};
diff --git a/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.json b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.json
new file mode 100644
index 0000000000000..4dd9319b60ec2
--- /dev/null
+++ b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.json
@@ -0,0 +1,19 @@
+{
+ "node": "n8n-nodes-base.respondToTrigger",
+ "nodeVersion": "1.0",
+ "codexVersion": "1.0",
+ "categories": [
+ "Core Nodes",
+ "Utility"
+ ],
+ "resources": {
+ "primaryDocumentation": [
+ {
+ "url": "https://docs.n8n.io/nodes/n8n-nodes-base.respondToTrigger/"
+ }
+ ]
+ },
+ "subcategories": {
+ "Core Nodes":["Flow"]
+ }
+}
diff --git a/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.ts b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.ts
new file mode 100644
index 0000000000000..3b352c76a9481
--- /dev/null
+++ b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.ts
@@ -0,0 +1,50 @@
+import {
+ IExecuteFunctions,
+} from 'n8n-core';
+
+import {
+ IDataObject,
+ IN8nHttpFullResponse,
+ IN8nHttpResponse,
+ INodeExecutionData,
+ INodeType,
+ INodeTypeDescription,
+} from 'n8n-workflow';
+
+export class RespondToTrigger implements INodeType {
+ description: INodeTypeDescription = {
+ displayName: 'Respond to Trigger',
+ icon: 'file:trigger.svg',
+ name: 'respondToTrigger',
+ group: ['transform'],
+ version: 1,
+ description: 'Returns data for Trigger',
+ defaults: {
+ name: 'Respond to Trigger',
+ },
+ inputs: ['main'],
+ outputs: ['main'],
+ credentials: [
+ ],
+ properties: [
+ {
+ displayName: 'Acknowledge',
+ name: 'acknowledge',
+ type: 'boolean',
+ default: true,
+ description: 'Set \'true\' for acknowledge or \'false\' for not acknowledge.',
+ },
+ ],
+ };
+
+ async execute(this: IExecuteFunctions): Promise {
+ const items = this.getInputData();
+
+ const acknowledge = this.getNodeParameter('acknowledge', 0, false) as boolean;
+ const response = {ack: acknowledge} as IDataObject;
+
+ this.sendResponse(response);
+
+ return this.prepareOutputData(items);
+ }
+}
diff --git a/packages/nodes-base/nodes/RespondToTrigger/trigger.svg b/packages/nodes-base/nodes/RespondToTrigger/trigger.svg
new file mode 100644
index 0000000000000..43111d80a2f77
--- /dev/null
+++ b/packages/nodes-base/nodes/RespondToTrigger/trigger.svg
@@ -0,0 +1 @@
+
diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json
index 1c9b15b45d748..674ef12425d68 100644
--- a/packages/nodes-base/package.json
+++ b/packages/nodes-base/package.json
@@ -565,6 +565,7 @@
"dist/nodes/Reddit/Reddit.node.js",
"dist/nodes/Redis/Redis.node.js",
"dist/nodes/RenameKeys/RenameKeys.node.js",
+ "dist/nodes/RespondToTrigger/RespondToTrigger.node.js",
"dist/nodes/RespondToWebhook/RespondToWebhook.node.js",
"dist/nodes/Rocketchat/Rocketchat.node.js",
"dist/nodes/RssFeedRead/RssFeedRead.node.js",