diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index ae262b6c..ea2240cd 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -575,7 +575,11 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct return } - if resp.SessionID > 0 { + if resp.Version < 7 { + // If the version is less than 7, we cannot use fetch sessions, + // so we kill them on the first response. + s.session.kill() + } else if resp.SessionID > 0 { s.session.bumpEpoch(resp.SessionID) }