Skip to content

Commit

Permalink
consumer: inject ErrClientClosing when polling if the client is closed
Browse files Browse the repository at this point in the history
This allows users to detect the client has closed in a poll loop, and
then return, rather than needing to orchestrate even more heavy handed
logic to break out of a poll loop.
  • Loading branch information
twmb committed Jun 12, 2021
1 parent f50b320 commit bc0add3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches {
// It is important to check all partition errors in the returned fetches. If
// any partition has a fatal error and actually had no records, fake fetch will
// be injected with the error.
//
// If the client is closing or has closed, a fake fetch will be injected that
// has no topic, a partition of 0, and a partition error of ErrClientClosed.
// This can be used to detect if the client is closing and to break out of a
// poll loop.
func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
if maxPollRecords == 0 {
maxPollRecords = -1
Expand Down Expand Up @@ -330,8 +335,13 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {

select {
case <-cl.ctx.Done():
// The client is closed: we inject an error right now, which
// will be drained immediately in the fill call just below, and
// then will be returned with our fetches.
c.addFakeReadyForDraining("", 0, ErrClientClosed)
exit()
case <-ctx.Done():
// The user canceled: no need to inject anything; just return.
exit()
case <-done:
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ type FetchError struct {
// Errors returns all errors in a fetch with the topic and partition that
// errored.
//
// There are three classes of errors possible:
// There are four classes of errors possible:
//
// 1) a normal kerr.Error; these are usually the non-retriable kerr.Errors,
// but theoretically a non-retriable error can be fixed at runtime (auth
Expand All @@ -261,6 +261,8 @@ type FetchError struct {
// restarting is an option, but you may need to manually repair your
// partition.
//
// 4) an injected ErrClientClosed; this is a fatal informational error that
// is returned from every Poll call if the client has been closed.
func (fs Fetches) Errors() []FetchError {
var errs []FetchError
fs.EachError(func(t string, p int32, err error) {
Expand Down

0 comments on commit bc0add3

Please sign in to comment.