From 528f007f561ed117126fbb74e7eb7352731a6eed Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Thu, 13 May 2021 16:08:17 -0600
Subject: [PATCH] client: avoid caching invalid coordinators; allow retries

If a coordinator was still loading while FindCoordinator was issued, then
it would return a node of -1, and we would cache that. Nothing would ever
remove this from being cached, because we only cleaned up the cache when
handling responses that indicated the coordinator was invalid. Since we
would never issue a request to an invalid broker (-1), we would never
clean the cache.

Now, we take a multi-pronged approach:

- If a broker func fails for the retriable type (that handles retrying
  requests), we now allow retries. Previously, we only retried on request
  failures, not on broker func failures.

- We now collapse all duplicate coordinator loads into one request, rather
  than issuing repeated requests. All of these collapsed requests will use
  the same node / error as the first.

- We now look at the FindCoordinatorResponse.ErrorCode. I'm not sure why I
  did not before.

- If we load a coordinator but the broker does not exist, we now avoid
  caching that result. We avoid caching on any error.
---
 pkg/kgo/client.go | 63 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 45 insertions(+), 18 deletions(-)

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 76d15157..4e07caf0 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -84,7 +84,7 @@ type Client struct {
 	decompressor *decompressor
 
 	coordinatorsMu sync.Mutex
-	coordinators   map[coordinatorKey]int32
+	coordinators   map[coordinatorKey]*coordinatorLoad
 
 	updateMetadataCh    chan struct{}
 	updateMetadataNowCh chan struct{} // like above, but with high priority
@@ -166,7 +166,7 @@ func NewClient(opts ...Opt) (*Client, error) {
 
 		decompressor: newDecompressor(),
 
-		coordinators: make(map[coordinatorKey]int32),
+		coordinators: make(map[coordinatorKey]*coordinatorLoad),
 
 		updateMetadataCh:    make(chan struct{}, 1),
 		updateMetadataNowCh: make(chan struct{}, 1),
@@ -531,13 +531,13 @@ start:
 	tries++
 	br, err := r.br()
 	r.last = br
-	if err != nil {
-		return nil, err
-	}
-	resp, err := r.last.waitResp(ctx, req)
+	var resp kmsg.Response
 	var retryErr error
-	if err == nil && r.parseRetryErr != nil {
-		retryErr = r.parseRetryErr(resp)
+	if err == nil {
+		resp, err = r.last.waitResp(ctx, req)
+		if err == nil && r.parseRetryErr != nil {
+			retryErr = r.parseRetryErr(resp)
+		}
 	}
 	if err != nil || retryErr != nil {
 		if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
@@ -776,32 +776,59 @@ type coordinatorKey struct {
 	typ  int8
 }
 
+type coordinatorLoad struct {
+	done chan struct{}
+	node int32
+	err  error
+}
+
 // loadController returns the group/txn coordinator for the given key, retrying
 // as necessary. If reload is true, this does not used a cache coordinator.
 func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordinatorKey) (*broker, error) {
 	cl.coordinatorsMu.Lock()
-	coordinator, ok := cl.coordinators[key]
+	c, ok := cl.coordinators[key]
+	if !ok {
+		c = &coordinatorLoad{
+			done: make(chan struct{}), // all requests for the same coordinator get collapsed into one
+		}
+		defer func() {
+			if c.err != nil { // if our load fails, we avoid caching the coordinator
+				cl.coordinatorsMu.Lock()
+				delete(cl.coordinators, key)
+				cl.coordinatorsMu.Unlock()
+			}
+			close(c.done)
+		}()
+		cl.coordinators[key] = c
+	}
 	cl.coordinatorsMu.Unlock()
 
 	if !reload && ok {
-		return cl.brokerOrErr(nil, coordinator, &errUnknownCoordinator{coordinator, key})
+		<-c.done
+		if c.err != nil {
+			return nil, c.err
+		}
+		return cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key})
 	}
 
-	resp, err := (&kmsg.FindCoordinatorRequest{
+	var resp *kmsg.FindCoordinatorResponse
+	resp, c.err = (&kmsg.FindCoordinatorRequest{
 		CoordinatorKey:  key.name,
 		CoordinatorType: key.typ,
 	}).RequestWith(ctx, cl.retriable())
 
-	if err != nil {
-		return nil, err
+	if c.err != nil {
+		return nil, c.err
 	}
 
-	coordinator = resp.NodeID
-	cl.coordinatorsMu.Lock()
-	cl.coordinators[key] = coordinator
-	cl.coordinatorsMu.Unlock()
+	if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil {
+		return nil, c.err
+	}
 
-	return cl.brokerOrErr(ctx, coordinator, &errUnknownCoordinator{coordinator, key})
+	c.node = resp.NodeID
+	var b *broker
+	b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key})
+	return b, c.err
 }
 
 func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) bool {
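
As a standalone illustration of the collapsing behavior described in the commit
message, the following sketch isolates the pattern the patch applies inside
loadCoordinator: concurrent lookups for the same key share one in-flight load,
and failed loads are not cached. The cache, load, and fetch names here are
invented for illustration and are not part of the kgo API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// load is one in-flight (or completed) lookup. Waiters block on done and then
// read node/err, so every concurrent caller shares the first caller's result.
type load struct {
	done chan struct{}
	node int32
	err  error
}

type cache struct {
	mu    sync.Mutex
	loads map[string]*load
}

// get collapses concurrent lookups for the same key into a single call to
// fetch, and drops the cache entry if that call fails so a later lookup can
// retry instead of reusing an invalid result.
func (c *cache) get(key string, fetch func() (int32, error)) (int32, error) {
	c.mu.Lock()
	l, ok := c.loads[key]
	if !ok {
		l = &load{done: make(chan struct{})}
		c.loads[key] = l
	}
	c.mu.Unlock()

	if ok { // someone else is loading or has loaded; wait for their result
		<-l.done
		return l.node, l.err
	}

	l.node, l.err = fetch()
	if l.err != nil { // do not cache failures
		c.mu.Lock()
		delete(c.loads, key)
		c.mu.Unlock()
	}
	close(l.done)
	return l.node, l.err
}

func main() {
	c := &cache{loads: make(map[string]*load)}
	var calls int
	fetch := func() (int32, error) {
		calls++
		time.Sleep(10 * time.Millisecond) // stand-in for a FindCoordinator round trip
		return 3, nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			node, err := c.get("my-group", fetch)
			fmt.Println(node, err)
		}()
	}
	wg.Wait()
	fmt.Println("fetch calls:", calls) // 1, not 5: duplicate loads were collapsed
}

Waiters block on the done channel rather than holding the mutex, so a slow
lookup never blocks unrelated keys. Callers that joined a failing load all see
its error, while the next caller that misses the cache starts a fresh load,
mirroring the patch's "avoid caching on any error" behavior.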