Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

n8n-3667-bug-kafka-node-crashing-service - N8N-3667 #3600

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,36 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to allow sending message to a previously non exisiting topic',
},
{
displayName: 'Auto Commit Threshold',
name: 'autoCommitThreshold',
type: 'number',
default: 0,
description: 'The consumer will commit offsets after resolving a given number of messages',
},
{
displayName: 'Auto Commit Interval',
name: 'autoCommitInterval',
type: 'number',
default: 0,
description: 'The consumer will commit offsets after a given period, for example, five seconds',
hint: 'Value in milliseconds',
},
{
displayName: 'Heartbeat Interval',
name: 'heartbeatInterval',
type: 'number',
default: 3000,
description: 'Heartbeats are used to ensure that the consumer\'s session stays active',
hint: 'The value must be set lower than Session Timeout',
},
{
displayName: 'Max Number of Requests',
name: 'maxInFlightRequests',
type: 'number',
default: 0,
description: 'Max number of requests that may be in progress at any time. If falsey then no limit.',
},
{
displayName: 'Read Messages From Beginning',
name: 'fromBeginning',
Expand Down Expand Up @@ -122,20 +152,21 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to return only the message property',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
},
{
displayName: 'Return Headers',
name: 'returnHeaders',
type: 'boolean',
default: false,
description: 'Whether to return the headers received from Kafka',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
hint: 'Value in milliseconds',
},
],
},
],
Expand Down Expand Up @@ -175,7 +206,12 @@ export class KafkaTrigger implements INodeType {

const kafka = new apacheKafka(config);

const consumer = kafka.consumer({ groupId });
const consumer = kafka.consumer({
groupId,
maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number,
sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});

await consumer.connect();

Expand All @@ -191,6 +227,8 @@ export class KafkaTrigger implements INodeType {

const startConsumer = async () => {
await consumer.run({
autoCommitInterval: options.autoCommitInterval as number || null,
autoCommitThreshold: options.autoCommitThreshold as number || null,
eachMessage: async ({ topic, message }) => {

let data: IDataObject = {};
Expand Down