diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 4e07caf0..f9a2696d 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -787,7 +787,7 @@ type coordinatorLoad struct { func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordinatorKey) (*broker, error) { cl.coordinatorsMu.Lock() c, ok := cl.coordinators[key] - if !ok { + if reload || !ok { c = &coordinatorLoad{ done: make(chan struct{}), // all requests for the same coordinator get collapsed into one } @@ -795,7 +795,7 @@ func (cl *Client) loadCoordinator(reload bool, ctx context.Context, key coordina if c.err != nil { // if our load fails, we avoid caching the coordinator cl.coordinatorsMu.Lock() delete(cl.coordinators, key) - cl.coordinatorsMu.Lock() + cl.coordinatorsMu.Unlock() } close(c.done) }()