From 20e0f66f894fc7eaa597a77817198aa8cbded5c7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 6 Jun 2021 01:43:07 -0600 Subject: [PATCH] Fetches: add EachError, clarifying documentation I like the Each. --- pkg/kgo/record_and_fetch.go | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 27c56a96..baeb75b3 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -241,22 +241,47 @@ type FetchError struct { // Errors returns all errors in a fetch with the topic and partition that // errored. +// +// There are three classes of errors possible: +// +// 1) a normal kerr.Error; these are usually the non-retriable kerr.Errors, +// but theoretically a non-retriable error can be fixed at runtime (auth +// error? fix auth). It is worth restarting the client for these errors if +// you do not intend to fix this problem at runtime. +// +// 2) an injected *ErrDataLoss; these are informational, the client +// automatically resets consuming to where it should and resumes. This +// error is worth logging and investigating, but not worth restarting the +// client for. +// +// 3) an untyped batch parse failure; these are usually unrecoverable by +// restarts, and it may be best to just let the client continue. However, +// restarting is an option, but you may need to manually repair your +// partition. +// func (fs Fetches) Errors() []FetchError { var errs []FetchError + fs.EachError(func(t string, p int32, err error) { + errs = append(errs, FetchError{t, p, err}) + }) + return errs +} + +// EachError calls fn for every partition that had a fetch error with the +// topic, partition, and error. +// +// This function has the same semantics as the Errors function; refer to the +// documentation on that function for what types of errors are possible. +func (fs Fetches) EachError(fn func(string, int32, error)) { for _, f := range fs { for _, ft := range f.Topics { for _, fp := range ft.Partitions { if fp.Err != nil { - errs = append(errs, FetchError{ - Topic: ft.Topic, - Partition: fp.Partition, - Err: fp.Err, - }) + fn(ft.Topic, fp.Partition, fp.Err) } } } } - return errs } // RecordIter returns an iterator over all records in a fetch.