diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 0f835469..13b425e7 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -340,7 +340,9 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er c.sourcesReadyCond.Broadcast() } -func errFetch(err error) Fetches { +// NewErrFetch returns a fake fetch containing a single empty topic with a +// single zero partition with the given error. +func NewErrFetch(err error) Fetches { return []Fetch{{ Topics: []FetchTopic{{ Topic: "", @@ -408,7 +410,7 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { if ctx != nil { select { case <-ctx.Done(): - return errFetch(ctx.Err()) + return NewErrFetch(ctx.Err()) default: } } @@ -511,10 +513,10 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches { select { case <-cl.ctx.Done(): exit() - return errFetch(ErrClientClosed) + return NewErrFetch(ErrClientClosed) case <-ctx.Done(): exit() - return errFetch(ctx.Err()) + return NewErrFetch(ctx.Err()) case <-done: }