diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index b6e9563dd0750..f023e39e15de9 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -6,6 +6,9 @@ import { INodeTypeDescription, ITriggerFunctions, ITriggerResponse, + IDeferredPromise, + IExecuteResponsePromiseData, + createDeferredPromise, } from 'n8n-workflow'; import { @@ -44,7 +47,38 @@ export class RabbitMQTrigger implements INodeType { placeholder: 'queue-name', description: 'Name of the queue to publish to.', }, - + { + displayName: 'Acknowledge', + name: 'acknowledgeMode', + type: 'options', + options: [ + { + name: 'Immediately', + value: 'onReceived', + description: 'As soon as this node executes', + }, + { + name: 'Using \'Respond to Trigger\' node', + value: 'acknowledgeNode', + description: 'Acknowledge when acknowledge node executes', + }, + ], + default: 'onReceived', + description: 'When and how to acknowledge to the trigger.', + }, + { + displayName: 'Insert a \'Respond to Trigger\' node to control when and how you acknowledge. More details', + name: 'triggerNotice', + type: 'notice', + displayOptions: { + show: { + acknowledgeMode: [ + 'acknowledgeNode', + ], + }, + }, + default: '', + }, { displayName: 'Options', name: 'options', @@ -101,6 +135,7 @@ export class RabbitMQTrigger implements INodeType { async trigger(this: ITriggerFunctions): Promise { const queue = this.getNodeParameter('queue') as string; const options = this.getNodeParameter('options', {}) as IDataObject; + const acknowledgeMode = this.getNodeParameter('acknowledgeMode') as string; const channel = await rabbitmqConnectQueue.call(this, queue, options); @@ -134,12 +169,36 @@ export class RabbitMQTrigger implements INodeType { } } - self.emit([ - [ - item, - ], - ]); - channel.ack(message); + if (acknowledgeMode == 'onReceived') { + self.emit([ + [ + item, + ], + ]); + + channel.ack(message); + } else { + let acknowledgePromise = await createDeferredPromise(); + acknowledgePromise + .promise() + .then((response: IExecuteResponsePromiseData) => { + let acknowledgeResponse = response as IDataObject; + if (acknowledgeResponse.ack) { + channel.ack(message); + } else { + channel.nack(message); + } + }) + .catch(async (error) => { + channel.nack(message); + }); + + self.emit([ + [ + item, + ], + ], acknowledgePromise); + } } }); }; diff --git a/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.json b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.json new file mode 100644 index 0000000000000..4dd9319b60ec2 --- /dev/null +++ b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.json @@ -0,0 +1,19 @@ +{ + "node": "n8n-nodes-base.respondToTrigger", + "nodeVersion": "1.0", + "codexVersion": "1.0", + "categories": [ + "Core Nodes", + "Utility" + ], + "resources": { + "primaryDocumentation": [ + { + "url": "https://docs.n8n.io/nodes/n8n-nodes-base.respondToTrigger/" + } + ] + }, + "subcategories": { + "Core Nodes":["Flow"] + } +} diff --git a/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.ts b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.ts new file mode 100644 index 0000000000000..3b352c76a9481 --- /dev/null +++ b/packages/nodes-base/nodes/RespondToTrigger/RespondToTrigger.node.ts @@ -0,0 +1,50 @@ +import { + IExecuteFunctions, +} from 'n8n-core'; + +import { + IDataObject, + IN8nHttpFullResponse, + IN8nHttpResponse, + INodeExecutionData, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +export class RespondToTrigger implements INodeType { + description: INodeTypeDescription = { + displayName: 'Respond to Trigger', + icon: 'file:trigger.svg', + name: 'respondToTrigger', + group: ['transform'], + version: 1, + description: 'Returns data for Trigger', + defaults: { + name: 'Respond to Trigger', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + ], + properties: [ + { + displayName: 'Acknowledge', + name: 'acknowledge', + type: 'boolean', + default: true, + description: 'Set \'true\' for acknowledge or \'false\' for not acknowledge.', + }, + ], + }; + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + + const acknowledge = this.getNodeParameter('acknowledge', 0, false) as boolean; + const response = {ack: acknowledge} as IDataObject; + + this.sendResponse(response); + + return this.prepareOutputData(items); + } +} diff --git a/packages/nodes-base/nodes/RespondToTrigger/trigger.svg b/packages/nodes-base/nodes/RespondToTrigger/trigger.svg new file mode 100644 index 0000000000000..43111d80a2f77 --- /dev/null +++ b/packages/nodes-base/nodes/RespondToTrigger/trigger.svg @@ -0,0 +1 @@ + diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 1c9b15b45d748..674ef12425d68 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -565,6 +565,7 @@ "dist/nodes/Reddit/Reddit.node.js", "dist/nodes/Redis/Redis.node.js", "dist/nodes/RenameKeys/RenameKeys.node.js", + "dist/nodes/RespondToTrigger/RespondToTrigger.node.js", "dist/nodes/RespondToWebhook/RespondToWebhook.node.js", "dist/nodes/Rocketchat/Rocketchat.node.js", "dist/nodes/RssFeedRead/RssFeedRead.node.js",