From 3cbaa5f52c5a299c8ba5b00e65b8c1314de26008 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Wed, 3 Nov 2021 11:34:04 -0600
Subject: [PATCH] add more context to metadata reloads on inner partition
 errors

Two cases can trigger metadata loads due to inner topic or partition
errors, and one load can be triggered by many such errors at once.
Previously, we attached only the first error's reason to the reload; we
can do better and attach every error. The messages are larger, but they
now print every error. Identical errors are collapsed as much as
possible, which should be the usual case when the errors are kerr.Error
values.
---
 pkg/kgo/metadata.go | 102 +++++++++++++++++++++++++++++++++++++++++---
 pkg/kgo/source.go   |  11 ++---
 2 files changed, 100 insertions(+), 13 deletions(-)

diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index 3c25fd05..55b7008b 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -160,7 +161,7 @@ func (cl *Client) updateMetadataLoop() {
 			}
 		}
 
-		again, err := cl.updateMetadata()
+		again, err, why := cl.updateMetadata()
 		if again || err != nil {
 			if now && nowTries < 3 {
 				goto start
@@ -168,7 +169,7 @@ func (cl *Client) updateMetadataLoop() {
 			if err != nil {
 				cl.triggerUpdateMetadata(true, fmt.Sprintf("re-updating metadata due to err: %s", err))
 			} else {
-				cl.triggerUpdateMetadata(true, "re-updating metadata due inner topic or partition error")
+				cl.triggerUpdateMetadata(true, why.reason("re-updating due to inner errors"))
 			}
 		}
 		if err == nil {
@@ -195,7 +196,7 @@ func (cl *Client) updateMetadataLoop() {
 // The producer and consumer use different topic maps and underlying
 // topicPartitionsData pointers, but we update those underlying pointers
 // equally.
-func (cl *Client) updateMetadata() (needsRetry bool, err error) {
+func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
 	defer cl.metawait.signal()
 	defer cl.consumer.doOnMetadataUpdate()
 
@@ -235,7 +236,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
 			tpsProducerLoad,
 			err,
 		)
-		return true, err
+		return true, err, nil
 	}
 
 	// If we are consuming with regex and fetched all topics, the metadata
@@ -300,6 +301,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
 				m.isProduce,
 				&reloadOffsets,
 				stopConsumerSession,
+				&why,
 			)
 		}
 	}
@@ -311,7 +313,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) {
 		)
 	}
 
-	return needsRetry, nil
+	return needsRetry, nil, why
 }
 
 // fetchTopicMetadata fetches metadata for all reqTopics and returns new
@@ -457,6 +459,7 @@ func (cl *Client) mergeTopicPartitions(
 	isProduce bool,
 	reloadOffsets *listOrEpochLoads,
 	stopConsumerSession func(),
+	why *multiUpdateWhy,
 ) (needsRetry bool) {
 	lv := *l.load() // copy so our field writes do not collide with reads
 
@@ -483,6 +486,7 @@ func (cl *Client) mergeTopicPartitions(
 				topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
 			}
 		}
+		why.add(topic, -1, r.loadErr)
 		return true
 	}
 
@@ -547,6 +551,7 @@ func (cl *Client) mergeTopicPartitions(
 				newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
 			}
 			needsRetry = true
+			why.add(topic, int32(part), newTP.loadErr)
 			continue
 		}
 
@@ -616,3 +621,90 @@ func (cl *Client) mergeTopicPartitions(
 	}
 	return needsRetry
 }
+
+type multiUpdateWhy map[kerrOrString]map[string]map[int32]struct{}
+
+type kerrOrString struct {
+	k *kerr.Error
+	s string
+}
+
+func (m *multiUpdateWhy) add(t string, p int32, err error) {
+	if err == nil {
+		return
+	}
+
+	if *m == nil {
+		*m = make(map[kerrOrString]map[string]map[int32]struct{})
+	}
+	var ks kerrOrString
+	if ke := (*kerr.Error)(nil); errors.As(err, &ke) {
+		ks = kerrOrString{k: ke}
+	} else {
+		ks = kerrOrString{s: err.Error()}
+	}
+
+	ts := (*m)[ks]
+	if ts == nil {
+		ts = make(map[string]map[int32]struct{})
+		(*m)[ks] = ts
+	}
+
+	ps := ts[t]
+	if ps == nil {
+		ps = make(map[int32]struct{})
+		ts[t] = ps
+	}
+	// -1 signals that the entire topic had an error.
+	if p != -1 {
+		ps[p] = struct{}{}
+	}
+}
+
+// err{topic[1 2 3] topic2[4 5 6]} err2{...}
+func (m multiUpdateWhy) reason(reason string) string {
+	if len(m) == 0 {
+		return ""
+	}
+
+	ksSorted := make([]kerrOrString, 0, len(m))
+	for err := range m {
+		ksSorted = append(ksSorted, err)
+	}
+	sort.Slice(ksSorted, func(i, j int) bool { // order by non-nil kerr's code, otherwise the string
+		l, r := ksSorted[i], ksSorted[j]
+		return l.k != nil && (r.k == nil || l.k.Code < r.k.Code) || r.k == nil && l.s < r.s
+	})
+
+	var errorStrings []string
+	for _, ks := range ksSorted {
+		ts := m[ks]
+		tsSorted := make([]string, 0, len(ts))
+		for t := range ts {
+			tsSorted = append(tsSorted, t)
+		}
+		sort.Strings(tsSorted)
+
+		var topicStrings []string
+		for _, t := range tsSorted {
+			ps := ts[t]
+			if len(ps) == 0 {
+				topicStrings = append(topicStrings, t)
+			} else {
+				psSorted := make([]int32, 0, len(ps))
+				for p := range ps {
+					psSorted = append(psSorted, p)
+				}
+				sort.Slice(psSorted, func(i, j int) bool { return psSorted[i] < psSorted[j] })
+				topicStrings = append(topicStrings, fmt.Sprintf("%s%v", t, psSorted))
+			}
+		}
+
+		if ks.k != nil {
+			errorStrings = append(errorStrings, fmt.Sprintf("%s{%s}", ks.k.Message, strings.Join(topicStrings, " ")))
+		} else {
+			errorStrings = append(errorStrings, fmt.Sprintf("%s{%s}", ks.s, strings.Join(topicStrings, " ")))
+		}
+	}
+	return reason + ": " + strings.Join(errorStrings, " ")
+}
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 58e23ece..6086969c 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -755,12 +755,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 		reloadOffsets listOrEpochLoads
 		preferreds    []cursorOffsetPreferred
 		updateMeta    bool
-		updateWhy     string
-		setUpdateWhy  = func(why string) {
-			if updateWhy == "" {
-				updateWhy = why
-			}
-		}
+		updateWhy     multiUpdateWhy
 		kip320        = s.cl.supportsOffsetForLeaderEpoch()
 	)
 
@@ -808,7 +803,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 			fp := partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks)
 			if fp.Err != nil {
 				updateMeta = true
-				setUpdateWhy(fmt.Sprintf("fetch topic %s partition %d has error %s causing metadata update", topic, partition, fp.Err))
+				updateWhy.add(topic, partition, fp.Err)
 			}
 
 			// We only keep the partition if it has no error, or an
@@ -914,7 +909,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 		}
 	}
 
-	return f, reloadOffsets, preferreds, updateMeta, updateWhy.reason("fetch had inner topic errors")
+	return f, reloadOffsets, preferreds, updateMeta, updateWhy.reason("fetch had inner topic errors")
 }
 
 // processRespPartition processes all records in all potentially compressed
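
For illustration only, and not part of the patch: a self-contained
sketch of the collapsing behavior above. It keys on the error string
rather than the patch's kerrOrString so it runs without the kerr
package; the multiWhy name and the sample topics and errors are
invented for this sketch.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// multiWhy mirrors multiUpdateWhy, keyed err -> topic -> partitions.
type multiWhy map[string]map[string]map[int32]struct{}

// add records that topic t failed with err; p == -1 means the whole
// topic failed rather than one partition.
func (m *multiWhy) add(t string, p int32, err error) {
	if err == nil {
		return
	}
	if *m == nil {
		*m = make(map[string]map[string]map[int32]struct{})
	}
	ts := (*m)[err.Error()]
	if ts == nil {
		ts = make(map[string]map[int32]struct{})
		(*m)[err.Error()] = ts
	}
	ps := ts[t]
	if ps == nil {
		ps = make(map[int32]struct{})
		ts[t] = ps
	}
	if p != -1 {
		ps[p] = struct{}{}
	}
}

// reason renders err{topic[0 1] topic2} err2{...}, sorting everything
// so the message is deterministic.
func (m multiWhy) reason(reason string) string {
	if len(m) == 0 {
		return ""
	}
	errs := make([]string, 0, len(m))
	for e := range m {
		errs = append(errs, e)
	}
	sort.Strings(errs)

	var errorStrings []string
	for _, e := range errs {
		ts := m[e]
		topics := make([]string, 0, len(ts))
		for t := range ts {
			topics = append(topics, t)
		}
		sort.Strings(topics)

		var topicStrings []string
		for _, t := range topics {
			ps := ts[t]
			if len(ps) == 0 {
				topicStrings = append(topicStrings, t) // whole-topic error
				continue
			}
			parts := make([]int32, 0, len(ps))
			for p := range ps {
				parts = append(parts, p)
			}
			sort.Slice(parts, func(i, j int) bool { return parts[i] < parts[j] })
			topicStrings = append(topicStrings, fmt.Sprintf("%s%v", t, parts))
		}
		errorStrings = append(errorStrings, fmt.Sprintf("%s{%s}", e, strings.Join(topicStrings, " ")))
	}
	return reason + ": " + strings.Join(errorStrings, " ")
}

func main() {
	var why multiWhy
	notLeader := fmt.Errorf("NOT_LEADER_FOR_PARTITION")
	unknown := fmt.Errorf("UNKNOWN_TOPIC_OR_PARTITION")

	why.add("foo", 0, notLeader)
	why.add("foo", 1, notLeader)
	why.add("bar", 3, notLeader)
	why.add("baz", -1, unknown) // whole topic failed

	fmt.Println(why.reason("re-updating due to inner errors"))
	// re-updating due to inner errors:
	//   NOT_LEADER_FOR_PARTITION{bar[3] foo[0 1]} UNKNOWN_TOPIC_OR_PARTITION{baz}
}

The patch itself prefers a *kerr.Error key over the error string so
that two errors carrying the same Kafka error code collapse into one
clause without going through string formatting first.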