-
-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
config: add producer option to configure transactions backoff duration #133
Conversation
pkg/kgo/config.go
Outdated
@@ -1037,6 +1039,12 @@ func TransactionTimeout(timeout time.Duration) ProducerOpt { | |||
return producerOpt{func(cfg *cfg) { cfg.txnTimeout = timeout }} | |||
} | |||
|
|||
// ConcurrentTransactionBackoff sets the backoff interval to use during | |||
// transactional requests in case we encounter CONCURRENT_TRANSACTIONS error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this documentation be expanded to,
// ConcurrentTransactionBackoff sets the backoff interval to use during
// transactional requests in case we encounter CONCURRENT_TRANSACTIONS error,
// overriding the default 20ms.
//
// Sometimes, when a client begins a transaction quickly enough after finishing
// a previous one, Kafka will return a CONCURRENT_TRANSACTIONS error. Clients
// are expected to backoff slightly and retry the operation. Lower backoffs may
// increase load on the brokers, while higher backoffs may increase transaction
// latency in clients.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
pkg/kgo/config.go
Outdated
@@ -1037,6 +1039,12 @@ func TransactionTimeout(timeout time.Duration) ProducerOpt { | |||
return producerOpt{func(cfg *cfg) { cfg.txnTimeout = timeout }} | |||
} | |||
|
|||
// ConcurrentTransactionBackoff sets the backoff interval to use during | |||
// transactional requests in case we encounter CONCURRENT_TRANSACTIONS error. | |||
func ConcurrentTransactionBackoff(backoff time.Duration) ProducerOpt { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be moved below into a ConsumerOpt? Or perhaps above to just a plain Opt, since transactions kinda encompass both producing & consuming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Looks great, thank you! |
And lower the default from 100ms to 20ms in line with Java Kafka client