Skip to content

Commit

Permalink
add more context to metadata reloads on inner partition errors
Browse files Browse the repository at this point in the history
Two cases can trigger loads of metadata due to inner errors, of which
there can be many. We added the first error's reason for fetching, but
we can do better and add every error.

This gives us larger messages that now print every error. We collapse
errors as much as possible, which should be the usual case for
kerr.Error.
  • Loading branch information
twmb committed Nov 3, 2021
1 parent 1bc1156 commit 3cbaa5f
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 13 deletions.
102 changes: 97 additions & 5 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -160,15 +161,15 @@ func (cl *Client) updateMetadataLoop() {
}
}

again, err := cl.updateMetadata()
again, err, why := cl.updateMetadata()
if again || err != nil {
if now && nowTries < 3 {
goto start
}
if err != nil {
cl.triggerUpdateMetadata(true, fmt.Sprintf("re-updating metadata due to err: %s", err))
} else {
cl.triggerUpdateMetadata(true, "re-updating metadata due inner topic or partition error")
cl.triggerUpdateMetadata(true, why.reason("re-updating due to inner errors"))
}
}
if err == nil {
Expand All @@ -195,7 +196,7 @@ func (cl *Client) updateMetadataLoop() {
// The producer and consumer use different topic maps and underlying
// topicPartitionsData pointers, but we update those underlying pointers
// equally.
func (cl *Client) updateMetadata() (needsRetry bool, err error) {
func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
defer cl.metawait.signal()
defer cl.consumer.doOnMetadataUpdate()

Expand Down Expand Up @@ -235,7 +236,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
tpsProducerLoad,
err,
)
return true, err
return true, err, nil
}

// If we are consuming with regex and fetched all topics, the metadata
Expand Down Expand Up @@ -300,6 +301,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
m.isProduce,
&reloadOffsets,
stopConsumerSession,
&why,
)
}
}
Expand All @@ -311,7 +313,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
)
}

return needsRetry, nil
return needsRetry, nil, why
}

// fetchTopicMetadata fetches metadata for all reqTopics and returns new
Expand Down Expand Up @@ -457,6 +459,7 @@ func (cl *Client) mergeTopicPartitions(
isProduce bool,
reloadOffsets *listOrEpochLoads,
stopConsumerSession func(),
why *multiUpdateWhy,
) (needsRetry bool) {
lv := *l.load() // copy so our field writes do not collide with reads

Expand All @@ -483,6 +486,7 @@ func (cl *Client) mergeTopicPartitions(
topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
}
}
why.add(topic, -1, r.loadErr)
return true
}

Expand Down Expand Up @@ -547,6 +551,7 @@ func (cl *Client) mergeTopicPartitions(
newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
}
needsRetry = true
why.add(topic, int32(part), newTP.loadErr)
continue
}

Expand Down Expand Up @@ -616,3 +621,90 @@ func (cl *Client) mergeTopicPartitions(
}
return needsRetry
}

type multiUpdateWhy map[kerrOrString]map[string]map[int32]struct{}

type kerrOrString struct {
k *kerr.Error
s string
}

func (m *multiUpdateWhy) add(t string, p int32, err error) {
if err == nil {
return
}

if *m == nil {
*m = make(map[kerrOrString]map[string]map[int32]struct{})
}
var ks kerrOrString
if ke := (*kerr.Error)(nil); errors.As(err, &ke) {
ks = kerrOrString{k: ke}
} else {
ks = kerrOrString{s: err.Error()}
}

ts := (*m)[ks]
if ts == nil {
ts = make(map[string]map[int32]struct{})
(*m)[ks] = ts
}

ps := ts[t]
if ps == nil {
ps = make(map[int32]struct{})
ts[t] = ps
}
// -1 signals that the entire topic had an error.
if p != -1 {
ps[p] = struct{}{}
}
}

// err{topic[1 2 3] topic2[4 5 6]} err2{...}
func (m multiUpdateWhy) reason(reason string) string {
if len(m) == 0 {
return ""
}

ksSorted := make([]kerrOrString, 0, len(m))
for err := range m {
ksSorted = append(ksSorted, err)
}
sort.Slice(ksSorted, func(i, j int) bool { // order by non-nil kerr's code, otherwise the string
l, r := ksSorted[i], ksSorted[j]
return l.k != nil && (r.k == nil || l.k.Code < r.k.Code) || r.k == nil && l.s < r.s
})

var errorStrings []string
for _, ks := range ksSorted {
ts := m[ks]
tsSorted := make([]string, 0, len(ts))
for t := range ts {
tsSorted = append(tsSorted, t)
}
sort.Strings(tsSorted)

var topicStrings []string
for _, t := range tsSorted {
ps := ts[t]
if len(ps) == 0 {
topicStrings = append(topicStrings, t)
} else {
psSorted := make([]int32, 0, len(ps))
for p := range ps {
psSorted = append(psSorted, p)
}
sort.Slice(psSorted, func(i, j int) bool { return psSorted[i] < psSorted[j] })
topicStrings = append(topicStrings, fmt.Sprintf("%s%v", t, psSorted))
}
}

if ks.k != nil {
errorStrings = append(errorStrings, fmt.Sprintf("%s{%s}", ks.k.Message, strings.Join(topicStrings, " ")))
} else {
errorStrings = append(errorStrings, fmt.Sprintf("%s{%s}", ks.s, strings.Join(topicStrings, " ")))
}
}
return reason + ": " + strings.Join(errorStrings, " ")
}
11 changes: 3 additions & 8 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,12 +755,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
reloadOffsets listOrEpochLoads
preferreds []cursorOffsetPreferred
updateMeta bool
updateWhy string
setUpdateWhy = func(why string) {
if updateWhy == "" {
updateWhy = why
}
}
updateWhy multiUpdateWhy

kip320 = s.cl.supportsOffsetForLeaderEpoch()
)
Expand Down Expand Up @@ -808,7 +803,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
fp := partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks)
if fp.Err != nil {
updateMeta = true
setUpdateWhy(fmt.Sprintf("fetch topic %s partition %d has error %s causing metadata update", topic, partition, fp.Err))
updateWhy.add(topic, partition, fp.Err)
}

// We only keep the partition if it has no error, or an
Expand Down Expand Up @@ -914,7 +909,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
}
}

return f, reloadOffsets, preferreds, updateMeta, updateWhy
return f, reloadOffsets, preferreds, updateMeta, updateWhy.reason("fetch had inner topic errors")
}

// processRespPartition processes all records in all potentially compressed
Expand Down

0 comments on commit 3cbaa5f

Please sign in to comment.