From 3a7f35e53ca2d3b12830126ebac16bc27905b031 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 10 Mar 2023 20:06:37 -0700 Subject: [PATCH] kgo.Client: add UpdateFetchMaxBytes One final knob to control how much a user fetches and thus keeps in memory. Closes #375. --- pkg/kgo/atomic_maybe_work.go | 12 ++++++++++++ pkg/kgo/config.go | 8 ++++---- pkg/kgo/consumer.go | 8 ++++++++ pkg/kgo/source.go | 4 ++-- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/atomic_maybe_work.go b/pkg/kgo/atomic_maybe_work.go index 10e51d6e..bfdd3c1d 100644 --- a/pkg/kgo/atomic_maybe_work.go +++ b/pkg/kgo/atomic_maybe_work.go @@ -1,5 +1,7 @@ package kgo +import "sync/atomic" + const ( stateUnstarted = iota stateWorking @@ -62,3 +64,13 @@ func (l *workLoop) maybeFinish(again bool) bool { func (l *workLoop) hardFinish() { l.state.Store(stateUnstarted) } + +// lazyI32 is used in a few places where we want atomics _sometimes_. Some +// uses do not need to be atomic (notably, setup), and we do not want the +// noCopy guard. +// +// Specifically, this is used for a few int32 settings in the config. +type lazyI32 int32 + +func (v *lazyI32) store(s int32) { atomic.StoreInt32((*int32)(v), s) } +func (v *lazyI32) load() int32 { return atomic.LoadInt32((*int32)(v)) } diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index d2c3aa71..4d3e80cb 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -133,8 +133,8 @@ type cfg struct { maxWait int32 minBytes int32 - maxBytes int32 - maxPartBytes int32 + maxBytes lazyI32 + maxPartBytes lazyI32 resetOffset Offset isolationLevel int8 keepControl bool @@ -1132,7 +1132,7 @@ func FetchMaxWait(wait time.Duration) ConsumerOpt { // recommended to set this option so that decompression does not eat all of // your RAM. 
func FetchMaxBytes(b int32) ConsumerOpt { - return consumerOpt{func(cfg *cfg) { cfg.maxBytes = b }} + return consumerOpt{func(cfg *cfg) { cfg.maxBytes = lazyI32(b) }} } // FetchMinBytes sets the minimum amount of bytes a broker will try to send @@ -1154,7 +1154,7 @@ func FetchMinBytes(b int32) ConsumerOpt { // // This corresponds to the Java max.partition.fetch.bytes setting. func FetchMaxPartitionBytes(b int32) ConsumerOpt { - return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = b }} + return consumerOpt{func(cfg *cfg) { cfg.maxPartBytes = lazyI32(b) }} } // MaxConcurrentFetches sets the maximum number of fetch requests to allow in diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 1fd64d6a..abe1e039 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -527,6 +527,14 @@ func (cl *Client) AllowRebalance() { cl.consumer.allowRebalance() } +// UpdateFetchMaxBytes updates the max bytes that a fetch request will ask for +// and the max partition bytes that a fetch request will ask for from each +// partition. +func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) { + cl.cfg.maxBytes.store(maxBytes) + cl.cfg.maxPartBytes.store(maxPartBytes) +} + // PauseFetchTopics sets the client to no longer fetch the given topics and // returns all currently paused topics. Paused topics persist until resumed. // You can call this function with no topics to simply receive the list of diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0f115024..b255026d 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -435,8 +435,8 @@ func (s *source) createReq() *fetchRequest { req := &fetchRequest{ maxWait: s.cl.cfg.maxWait, minBytes: s.cl.cfg.minBytes, - maxBytes: s.cl.cfg.maxBytes, - maxPartBytes: s.cl.cfg.maxPartBytes, + maxBytes: s.cl.cfg.maxBytes.load(), + maxPartBytes: s.cl.cfg.maxPartBytes.load(), rack: s.cl.cfg.rack, isolationLevel: s.cl.cfg.isolationLevel, preferLagFn: s.cl.cfg.preferLagFn,