From c7dbafb496ef07e8aea89faaaecd83a3f442b82e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 12 Feb 2021 12:14:55 -0700 Subject: [PATCH] kgo source: kill the session if FetchResponse is version < 7 If we did not kill the session, we would still use sessions even though kafka does not support it. If kafka replied with a partition error, we would stop requesting the partition and be unable to make progress. --- pkg/kgo/source.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index ae262b6c..ea2240cd 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -575,7 +575,11 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct return } - if resp.SessionID > 0 { + if resp.Version < 7 { + // If the version is less than 7, we cannot use fetch sessions, + // so we kill them on the first response. + s.session.kill() + } else if resp.SessionID > 0 { s.session.bumpEpoch(resp.SessionID) }