diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index febb23ff..966f942f 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -854,6 +854,14 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe keep = true } + case kerr.UnknownTopicID: + // We need to keep UnknownTopicID even though it is + // retryable, because encountering this error means + // the topic has been recreated and we will never + // consume the topic again anymore. This is an error + // worth bubbling up. + keep = true + case kerr.OffsetOutOfRange: // If we are out of range, we reset to what we can. // With Kafka >= 2.1, we should only get offset out