From b6759bc75398c51912ccbfd3f573a97c8a68f904 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 11 Nov 2021 18:36:58 -0700 Subject: [PATCH] consumer: allow disabling fetch sessions with a config opt Sometimes users may not want these. --- pkg/kgo/config.go | 30 ++++++++++++++++++++++++++++++ pkg/kgo/source.go | 3 +++ 2 files changed, 33 insertions(+) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 4c3a6dda..fb177605 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -135,6 +135,7 @@ type cfg struct { rack string maxConcurrentFetches int + disableFetchSessions bool topics map[string]*regexp.Regexp // topics to consume; if regex is true, values are compiled regular expressions partitions map[string]map[int32]Offset // partitions to directly consume from @@ -1193,6 +1194,35 @@ func ConsumeRegex() ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.regex = true }} } +// DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+). +// +// A "fetch session" is is a way to reduce bandwidth for fetch requests & +// responses, and to potentially reduce the amount of work that brokers have to +// do to handle fetch requests. A fetch session opts in to the broker tracking +// some state of what the client is interested in. For example, say that you +// are interested in thousands of topics, and most of these topics are +// receiving data only rarely. A fetch session allows the client to register +// that it is interested in those thousands of topics on the first request. On +// future requests, if the offsets for these topics have not changed, those +// topics will be elided from the request. The broker knows to reply with the +// extra topics if any new data is available, otherwise the topics are also +// elided from the response. This massively reduces the amount of information +// that needs to be included in requests or responses. +// +// Using fetch sessions means more state is stored on brokers. Maintaining this +// state eats some memory. If you have thousands of consumers, you may not want +// fetch sessions to be used for everything. Brokers intelligently handle this +// by not creating sessions if they are at their configured limit, but you may +// consider disabling sessions if they are generally not useful to you. Brokers +// have metrics for the number of fetch sessions active, so you can monitor +// that to determine whether enabling or disabling sessions is beneficial or +// not. +// +// For more details on fetch sessions, see KIP-227. +func DisableFetchSessions() ConsumerOpt { + return consumerOpt{func(cfg *cfg) { cfg.disableFetchSessions = true }} +} + ////////////////////////////////// // CONSUMER GROUP CONFIGURATION // ////////////////////////////////// diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 4dae4090..4b9a8f20 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -49,6 +49,9 @@ func (cl *Client) newSource(nodeID int32) *source { nodeID: nodeID, sem: make(chan struct{}), } + if cl.cfg.disableFetchSessions { + s.session.kill() + } close(s.sem) return s }