Skip to content

Commit

Permalink
producer: allow configurable max inflight when disabling idempotency
Browse files Browse the repository at this point in the history
This allows for higher throughput even if you want to disable
idempotency, at the increased risk of more duplicates.
  • Loading branch information
twmb committed Jun 16, 2021
1 parent 5047b31 commit 229f9da
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
31 changes: 26 additions & 5 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type cfg struct {
txnTimeout time.Duration
acks Acks
disableIdempotency bool
maxProduceInflight int // if idempotency is disabled, we allow a configurable max inflight
compression []CompressionCodec // order of preference

defaultProduceTopic string
Expand Down Expand Up @@ -184,11 +185,17 @@ func (cfg *cfg) validate() error {
cfg.maxPartBytes = cfg.maxBytes
}

if cfg.disableIdempotency && cfg.txnID != nil {
return errors.New("cannot both disable idempotent writes and use transactional IDs")
}
if !cfg.disableIdempotency && cfg.acks.val != -1 {
return errors.New("idempotency requires acks=all")
if cfg.disableIdempotency {
if cfg.txnID != nil {
return errors.New("cannot both disable idempotent writes and use transactional IDs")
}
if cfg.maxProduceInflight <= 0 {
return fmt.Errorf("invalid max produce inflight %d with idempotency disabled", cfg.maxProduceInflight)
}
} else {
if cfg.acks.val != -1 {
return errors.New("idempotency requires acks=all")
}
}

for _, limit := range []struct {
Expand Down Expand Up @@ -407,6 +414,7 @@ func defaultCfg() cfg {

txnTimeout: 40 * time.Second,
acks: AllISRAcks(),
maxProduceInflight: 1,
compression: []CompressionCodec{SnappyCompression(), NoCompression()},
maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
maxBufferedRecords: math.MaxInt64,
Expand Down Expand Up @@ -733,6 +741,19 @@ func DisableIdempotentWrite() ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.disableIdempotency = true }}
}

// MaxProduceInflight changes the default number of maximum allowed produce
// requests in flight.
//
// This option has no effect if using idempotency. If using idempotency, the
// maximum in flight is 1 for Kafka v0.11 to v1, and then 5 from v1 onward.
//
// By default, if idempotency is disabled, the max inflight is set to 1, so
// using this option allows you to increase the max inflight, but also
// increases the risk of duplicates if there are connection issues.
func MaxProduceInflight(n int) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.maxProduceInflight = n }}
}

// BatchCompression sets the compression codec to use for producing records.
//
// Compression is chosen in the order preferred based on broker support.
Expand Down
11 changes: 9 additions & 2 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func (cl *Client) newSink(nodeID int32) *sink {
nodeID: nodeID,
produceVersion: -1,
}
s.inflightSem.Store(make(chan struct{}, 1))
maxInflight := 1
if cl.cfg.disableIdempotency {
maxInflight = cl.cfg.maxProduceInflight
}
s.inflightSem.Store(make(chan struct{}, maxInflight))
return s
}

Expand Down Expand Up @@ -482,7 +486,10 @@ func (s *sink) requeueUnattemptedReq(req *produceRequest, err error) {
func (s *sink) firstRespCheck(version int16) {
if s.produceVersion < 0 { // this is the only place this can be checked non-atomically
atomic.StoreInt32(&s.produceVersion, int32(version))
if version >= 4 {
// If idempotency is disabled, we initialize inflightSem with
// the number of inflight requests the user wants, so we do not
// override that preference here.
if version >= 4 && !s.cl.cfg.disableIdempotency {
s.inflightSem.Store(make(chan struct{}, 4))
}
}
Expand Down

0 comments on commit 229f9da

Please sign in to comment.