Skip to content

Commit

Permalink
client: avoid caching invalid coordinators; allow retries
Browse files Browse the repository at this point in the history
If a coordinator was still loading while FindCoordinator was issued,
then it would return a node of -1, and we would cache that. Nothing
would ever remove this from being cached, because we only cleaned up the
cache when handling responses that indicated the coordinator was
invalid. Since we never would issue a request to an invalid broker (-1),
then we would never clean the cache.

Now, we take a multi pronged approach:

- If a broker func fails for the retriable type (that handles retrying
  requests), we now allow retries. Previously, we only retried on
request failures, not on broker func failures.

- We now collapse all duplicate coordinator loads into one request,
  rather than issuing repeated requests. All of these collapsed requests
will use the same node / error as the first.

- We now look at the FindCoordinatorResponse.ErrorCode. I'm not sure why
  I did not before.

- If we load a coordinator but the broker does not exist, we now avoid
  caching that result. We avoid caching on any error.
  • Loading branch information
twmb committed May 13, 2021
1 parent b3753fa commit 528f007
Showing 1 changed file with 45 additions and 18 deletions.
63 changes: 45 additions & 18 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Client struct {
decompressor *decompressor

coordinatorsMu sync.Mutex
coordinators map[coordinatorKey]int32
coordinators map[coordinatorKey]*coordinatorLoad

updateMetadataCh chan struct{}
updateMetadataNowCh chan struct{} // like above, but with high priority
Expand Down Expand Up @@ -166,7 +166,7 @@ func NewClient(opts ...Opt) (*Client, error) {

decompressor: newDecompressor(),

coordinators: make(map[coordinatorKey]int32),
coordinators: make(map[coordinatorKey]*coordinatorLoad),

updateMetadataCh: make(chan struct{}, 1),
updateMetadataNowCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -531,13 +531,13 @@ start:
tries++
br, err := r.br()
r.last = br
if err != nil {
return nil, err
}
resp, err := r.last.waitResp(ctx, req)
var resp kmsg.Response
var retryErr error
if err == nil && r.parseRetryErr != nil {
retryErr = r.parseRetryErr(resp)
if err == nil {
resp, err = r.last.waitResp(ctx, req)
if err == nil && r.parseRetryErr != nil {
retryErr = r.parseRetryErr(resp)
}
}
if err != nil || retryErr != nil {
if retryTimeout == 0 || time.Since(tryStart) <= retryTimeout {
Expand Down Expand Up @@ -776,32 +776,59 @@ type coordinatorKey struct {
typ int8
}

type coordinatorLoad struct {
done chan struct{}
node int32
err error
}

// loadController returns the group/txn coordinator for the given key, retrying
// as necessary. If reload is true, this does not used a cache coordinator.
func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordinatorKey) (*broker, error) {
cl.coordinatorsMu.Lock()
coordinator, ok := cl.coordinators[key]
c, ok := cl.coordinators[key]
if !ok {
c = &coordinatorLoad{
done: make(chan struct{}), // all requests for the same coordinator get collapsed into one
}
defer func() {
if c.err != nil { // if our load fails, we avoid caching the coordinator
cl.coordinatorsMu.Lock()
delete(cl.coordinators, key)
cl.coordinatorsMu.Lock()
}
close(c.done)
}()
cl.coordinators[key] = c
}
cl.coordinatorsMu.Unlock()

if !reload && ok {
return cl.brokerOrErr(nil, coordinator, &errUnknownCoordinator{coordinator, key})
<-c.done
if c.err != nil {
return nil, c.err
}
return cl.brokerOrErr(nil, c.node, &errUnknownCoordinator{c.node, key})
}

resp, err := (&kmsg.FindCoordinatorRequest{
var resp *kmsg.FindCoordinatorResponse
resp, c.err = (&kmsg.FindCoordinatorRequest{
CoordinatorKey: key.name,
CoordinatorType: key.typ,
}).RequestWith(ctx, cl.retriable())

if err != nil {
return nil, err
if c.err != nil {
return nil, c.err
}

coordinator = resp.NodeID
cl.coordinatorsMu.Lock()
cl.coordinators[key] = coordinator
cl.coordinatorsMu.Unlock()
if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil {
return nil, c.err
}

return cl.brokerOrErr(ctx, coordinator, &errUnknownCoordinator{coordinator, key})
c.node = resp.NodeID
var b *broker
b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key})
return b, c.err
}

func (cl *Client) maybeDeleteStaleCoordinator(name string, typ int8, err error) bool {
Expand Down

0 comments on commit 528f007

Please sign in to comment.