Skip to content

Commit

Permalink
Fetches: add EachError, clarifying documentation
Browse files Browse the repository at this point in the history
I like the Each.
  • Loading branch information
twmb committed Jun 6, 2021
1 parent 085ad30 commit 20e0f66
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,22 +241,47 @@ type FetchError struct {

// Errors returns all errors in a fetch with the topic and partition that
// errored.
//
// There are three classes of errors possible:
//
// 1) a normal kerr.Error; these are usually the non-retriable kerr.Errors,
// but theoretically a non-retriable error can be fixed at runtime (auth
// error? fix auth). It is worth restarting the client for these errors if
// you do not intend to fix this problem at runtime.
//
// 2) an injected *ErrDataLoss; these are informational, the client
// automatically resets consuming to where it should and resumes. This
// error is worth logging and investigating, but not worth restarting the
// client for.
//
// 3) an untyped batch parse failure; these are usually unrecoverable by
// restarts, and it may be best to just let the client continue. However,
// restarting is an option, but you may need to manually repair your
// partition.
//
func (fs Fetches) Errors() []FetchError {
var errs []FetchError
fs.EachError(func(t string, p int32, err error) {
errs = append(errs, FetchError{t, p, err})
})
return errs
}

// EachError calls fn for every partition that had a fetch error with the
// topic, partition, and error.
//
// This function has the same semantics as the Errors function; refer to the
// documentation on that function for what types of errors are possible.
func (fs Fetches) EachError(fn func(string, int32, error)) {
for _, f := range fs {
for _, ft := range f.Topics {
for _, fp := range ft.Partitions {
if fp.Err != nil {
errs = append(errs, FetchError{
Topic: ft.Topic,
Partition: fp.Partition,
Err: fp.Err,
})
fn(ft.Topic, fp.Partition, fp.Err)
}
}
}
}
return errs
}

// RecordIter returns an iterator over all records in a fetch.
Expand Down

0 comments on commit 20e0f66

Please sign in to comment.