From bc0add30464852f2188026987f8f8dea862ce351 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 11 Jun 2021 23:26:56 -0600 Subject: [PATCH] consumer: inject ErrClientClosing when polling if the client is closed This allows users to detect the client has closed in a poll loop, and then return, rather than needing to orchestrate even more heavy handed logic to break out of a poll loop. --- pkg/kgo/consumer.go | 10 ++++++++++ pkg/kgo/record_and_fetch.go | 4 +++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index eb9fed7b..5ecef21f 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -238,6 +238,11 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches { // It is important to check all partition errors in the returned fetches. If // any partition has a fatal error and actually had no records, fake fetch will // be injected with the error. +// +// If the client is closing or has closed, a fake fetch will be injected that +// has no topic, a partition of 0, and a partition error of ErrClientClosed. +// This can be used to detect if the client is closing and to break out of a +// poll loop. func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { if maxPollRecords == 0 { maxPollRecords = -1 @@ -330,8 +335,13 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { select { case <-cl.ctx.Done(): + // The client is closed: we inject an error right now, which + // will be drained immediately in the fill call just below, and + // then will be returned with our fetches. + c.addFakeReadyForDraining("", 0, ErrClientClosed) exit() case <-ctx.Done(): + // The user canceled: no need to inject anything; just return. exit() case <-done: } diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index b62e8c5f..d4b5db50 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -244,7 +244,7 @@ type FetchError struct { // Errors returns all errors in a fetch with the topic and partition that // errored. // -// There are three classes of errors possible: +// There are four classes of errors possible: // // 1) a normal kerr.Error; these are usually the non-retriable kerr.Errors, // but theoretically a non-retriable error can be fixed at runtime (auth @@ -261,6 +261,8 @@ type FetchError struct { // restarting is an option, but you may need to manually repair your // partition. // +// 4) an injected ErrClientClosed; this is a fatal informational error that +// is returned from every Poll call if the client has been closed. func (fs Fetches) Errors() []FetchError { var errs []FetchError fs.EachError(func(t string, p int32, err error) {