From 356722dac4a2ad8ad6ee60d64d7a42394698e911 Mon Sep 17 00:00:00 2001 From: Michael Kret Date: Fri, 24 Jun 2022 16:48:01 +0300 Subject: [PATCH 1/3] :hammer: additional options to kafka trigger --- .../nodes/Kafka/KafkaTrigger.node.ts | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 22c26a163a2ea..f0bc5599cac31 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -94,6 +94,29 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Whether to allow sending message to a previously non exisiting topic', }, + { + displayName: 'Auto Commit Interval', + name: 'autoCommitInterval', + type: 'number', + default: 0, + description: 'The consumer will commit offsets after a given period, for example, five seconds', + hint: 'Value in milliseconds', + }, + { + displayName: 'Auto Commit Threshold', + name: 'autoCommitThreshold', + type: 'number', + default: 0, + description: 'The consumer will commit offsets after resolving a given number of messages', + }, + { + displayName: 'Heartbeat Interval', + name: 'heartbeatInterval', + type: 'number', + default: 3000, + description: 'Heartbeats are used to ensure that the consumer\'s session stays active', + hint: 'The value must be set lower than Session Timeout', + }, { displayName: 'Read Messages From Beginning', name: 'fromBeginning', @@ -128,6 +151,7 @@ export class KafkaTrigger implements INodeType { type: 'number', default: 30000, description: 'The time to await a response in ms', + hint: 'Value in milliseconds', }, { displayName: 'Return Headers', @@ -175,7 +199,11 @@ export class KafkaTrigger implements INodeType { const kafka = new apacheKafka(config); - const consumer = kafka.consumer({ groupId }); + const consumer = kafka.consumer({ + groupId, + sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number, + heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number, + }); await consumer.connect(); @@ -191,6 +219,8 @@ export class KafkaTrigger implements INodeType { const startConsumer = async () => { await consumer.run({ + autoCommitInterval: options.autoCommitInterval as number || null, + autoCommitThreshold: options.autoCommitThreshold as number || null, eachMessage: async ({ topic, message }) => { let data: IDataObject = {}; From 19e4d14888aba8b430000cc79f212eb06036dd2b Mon Sep 17 00:00:00 2001 From: Michael Kret Date: Tue, 19 Jul 2022 11:59:44 +0300 Subject: [PATCH 2/3] :zap: option for maxInFlightRequests --- packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index f0bc5599cac31..5851168458bbf 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -117,6 +117,13 @@ export class KafkaTrigger implements INodeType { description: 'Heartbeats are used to ensure that the consumer\'s session stays active', hint: 'The value must be set lower than Session Timeout', }, + { + displayName: 'Max Number of Requests', + name: 'maxInFlightRequests', + type: 'number', + default: 0, + description: 'Max number of requests that may be in progress at any time. If falsey then no limit.', + }, { displayName: 'Read Messages From Beginning', name: 'fromBeginning', @@ -201,6 +208,7 @@ export class KafkaTrigger implements INodeType { const consumer = kafka.consumer({ groupId, + maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number, sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number, heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number, }); From 7d7b1f3f3ea749bb6d9b61865a6bdeaa0dba5241 Mon Sep 17 00:00:00 2001 From: ricardo Date: Thu, 21 Jul 2022 18:36:51 -0400 Subject: [PATCH 3/3] :zap: Small change --- .../nodes/Kafka/KafkaTrigger.node.ts | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 5851168458bbf..a1bea2f47ef2a 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -95,19 +95,19 @@ export class KafkaTrigger implements INodeType { description: 'Whether to allow sending message to a previously non exisiting topic', }, { - displayName: 'Auto Commit Interval', - name: 'autoCommitInterval', + displayName: 'Auto Commit Threshold', + name: 'autoCommitThreshold', type: 'number', default: 0, - description: 'The consumer will commit offsets after a given period, for example, five seconds', - hint: 'Value in milliseconds', + description: 'The consumer will commit offsets after resolving a given number of messages', }, { - displayName: 'Auto Commit Threshold', - name: 'autoCommitThreshold', + displayName: 'Auto Commit Interval', + name: 'autoCommitInterval', type: 'number', default: 0, - description: 'The consumer will commit offsets after resolving a given number of messages', + description: 'The consumer will commit offsets after a given period, for example, five seconds', + hint: 'Value in milliseconds', }, { displayName: 'Heartbeat Interval', @@ -152,6 +152,13 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Whether to return only the message property', }, + { + displayName: 'Return Headers', + name: 'returnHeaders', + type: 'boolean', + default: false, + description: 'Whether to return the headers received from Kafka', + }, { displayName: 'Session Timeout', name: 'sessionTimeout', @@ -160,13 +167,6 @@ export class KafkaTrigger implements INodeType { description: 'The time to await a response in ms', hint: 'Value in milliseconds', }, - { - displayName: 'Return Headers', - name: 'returnHeaders', - type: 'boolean', - default: false, - description: 'Whether to return the headers received from Kafka', - }, ], }, ],