From 229f9da2906dce9813c4ad9d93385712fd95bd01 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 15 Jun 2021 18:43:57 -0600 Subject: [PATCH] producer: allow configurable max inflight when disabling idempotency This allows for higher throughput even if you want to disable idempotency, at the increased risk of more duplicates. --- pkg/kgo/config.go | 31 ++++++++++++++++++++++++++----- pkg/kgo/sink.go | 11 +++++++++-- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index bfc92362..00a5192b 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -101,6 +101,7 @@ type cfg struct { txnTimeout time.Duration acks Acks disableIdempotency bool + maxProduceInflight int // if idempotency is disabled, we allow a configurable max inflight compression []CompressionCodec // order of preference defaultProduceTopic string @@ -184,11 +185,17 @@ func (cfg *cfg) validate() error { cfg.maxPartBytes = cfg.maxBytes } - if cfg.disableIdempotency && cfg.txnID != nil { - return errors.New("cannot both disable idempotent writes and use transactional IDs") - } - if !cfg.disableIdempotency && cfg.acks.val != -1 { - return errors.New("idempotency requires acks=all") + if cfg.disableIdempotency { + if cfg.txnID != nil { + return errors.New("cannot both disable idempotent writes and use transactional IDs") + } + if cfg.maxProduceInflight <= 0 { + return fmt.Errorf("invalid max produce inflight %d with idempotency disabled", cfg.maxProduceInflight) + } + } else { + if cfg.acks.val != -1 { + return errors.New("idempotency requires acks=all") + } } for _, limit := range []struct { @@ -407,6 +414,7 @@ func defaultCfg() cfg { txnTimeout: 40 * time.Second, acks: AllISRAcks(), + maxProduceInflight: 1, compression: []CompressionCodec{SnappyCompression(), NoCompression()}, maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012 maxBufferedRecords: math.MaxInt64, @@ -733,6 +741,19 @@ func DisableIdempotentWrite() ProducerOpt { 
return producerOpt{func(cfg *cfg) { cfg.disableIdempotency = true }} } +// MaxProduceInflight sets the maximum number of produce requests allowed in +// flight. +// +// This option has no effect if using idempotency. If using idempotency, the +// maximum in flight is 1 for Kafka v0.11 to v1, and then 5 from v1 onward. +// +// By default, if idempotency is disabled, the max inflight is set to 1, so +// using this option allows you to increase the max inflight, but also +// increases the risk of duplicates if there are connection issues. +func MaxProduceInflight(n int) ProducerOpt { + return producerOpt{func(cfg *cfg) { cfg.maxProduceInflight = n }} +} + // BatchCompression sets the compression codec to use for producing records. // // Compression is chosen in the order preferred based on broker support. diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 287aef4a..0b55309f 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -63,7 +63,11 @@ func (cl *Client) newSink(nodeID int32) *sink { nodeID: nodeID, produceVersion: -1, } - s.inflightSem.Store(make(chan struct{}, 1)) + maxInflight := 1 + if cl.cfg.disableIdempotency { + maxInflight = cl.cfg.maxProduceInflight + } + s.inflightSem.Store(make(chan struct{}, maxInflight)) return s } @@ -482,7 +486,10 @@ func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) { func (s *sink) firstRespCheck(version int16) { if s.produceVersion < 0 { // this is the only place this can be checked non-atomically atomic.StoreInt32(&s.produceVersion, int32(version)) - if version >= 4 { + // If idempotency is disabled, we initialize inflightSem with + // the number of inflight requests the user wants, so we do not + // override that preference here. + if version >= 4 && !s.cl.cfg.disableIdempotency { s.inflightSem.Store(make(chan struct{}, 4)) } }