From 938651e4cea7ef30581683062b8890c86ed9ab97 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 14 Mar 2021 15:18:26 -0600 Subject: [PATCH] consumer & consumer group: small redux, bug fixes ATOMIC STORES ============= This commit switches the consumer type to be stored in an atomic value, rather than a uint8 type that specifies which pointer to use guarded by a mutex. This switch fundamentally arises from trying to unblock metadata updates while a group consumer is leaving the group. Previously, we had to grab the consumer lock to check the consumer type to check if we were consuming with regex. We avoid that now. This actually makes a bunch of other areas simpler as well -- many places needed the group consumer to do some logic on the group consumer directly. Previously, we had to grab the consumer lock, and for simplicity we held it through the function. Holding it was unnecessary, and now we avoid grabbing the lock at all. Anything that sets the consumer value grabs a new dedicated assignMu. The dedicated assignMu allows us to unblock a clean group leave, which may (in a shortly incoming commit) grab the consumer mu to assign partitions on revoke. We do not have to worry about TOCTOU: the guarantee is things work in order. If a person concurrently modifies something, they may change the outcome of stuff that was set into sequence by original events, but the outcome is still sound according to our client. Particularly, a later metadata update will trigger the right sequence for the new assignment. Same type of logic with offset setting, but people should not be doing that concurrently with assigning and whatnot. UPDATES & LOCK ORDERING FIXES ============================= This is the bulk of this commit that mostly fixes some lock orderings and missing locks. This should fix the panic in #24 by at least logging on when it would be detected and continuing, however the bug itself is still a mystery. The debug logs about what the balance results were should help, though, if this crops up again. There are a few lock ordering fixes in here which are now documented extensively. Notably, PollFetches needs the consumer mu, and there is a huge reason as to why. The prerevoke and revoke logic, and how we ensure things are done before returning sometimes, is all more extensively documented. Lastly, all instances of assignPartitions is now properly guarded by the consumer mutex. Prior, some instances were not. --- pkg/kgo/atomic_maybe_work.go | 19 ++ pkg/kgo/client.go | 20 +- pkg/kgo/consumer.go | 200 +++++++++--- pkg/kgo/consumer_direct.go | 13 +- pkg/kgo/consumer_group.go | 568 ++++++++++++++++++++--------------- pkg/kgo/metadata.go | 11 +- pkg/kgo/txn.go | 31 +- 7 files changed, 533 insertions(+), 329 deletions(-) diff --git a/pkg/kgo/atomic_maybe_work.go b/pkg/kgo/atomic_maybe_work.go index 15fddef5..d52c302d 100644 --- a/pkg/kgo/atomic_maybe_work.go +++ b/pkg/kgo/atomic_maybe_work.go @@ -2,6 +2,25 @@ package kgo import "sync/atomic" +// a helper type for some places +type atomicBool uint32 + +func (b *atomicBool) set(v bool) { + if v { + atomic.StoreUint32((*uint32)(b), 1) + } else { + atomic.StoreUint32((*uint32)(b), 0) + } +} + +func (b *atomicBool) get() bool { + v := atomic.LoadUint32((*uint32)(b)) + if v == 1 { + return true + } + return false +} + const ( stateUnstarted = iota stateWorking diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 1023ddbd..acee2e30 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -180,8 +180,7 @@ func NewClient(opts ...Opt) (*Client, error) { metadone: make(chan struct{}), } cl.producer.init() - cl.consumer.cl = cl - cl.consumer.sourcesReadyCond = sync.NewCond(&cl.consumer.sourcesReadyMu) + cl.consumer.init(cl) cl.topics.Store(make(map[string]*topicPartitions)) cl.metawait.init() @@ -403,19 +402,12 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) { // Close leaves any group and closes all connections and goroutines. func (cl *Client) Close() { - // First, kill the consumer. Setting dead to true and then assigning - // nothing will - // 1) invalidate active fetches - // 2) ensure consumptions are unassigned, stopping all source filling - // 3) ensures no more assigns can happen - cl.consumer.mu.Lock() - if cl.consumer.dead { // client already closed - cl.consumer.mu.Unlock() - return + // First, kill the consumer. This waits for the consumer to unset + // gracefully, ensuring we leave groups properly, and then stores the + // dead consumer, meaning no more assigns can happen. + if wasDead := cl.consumer.kill(); wasDead { + return // client was already closed } - cl.consumer.dead = true - cl.consumer.mu.Unlock() - cl.AssignPartitions() // Now we kill the client context and all brokers, ensuring all // requests fail. This will finish all producer callbacks and diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index e03a479a..92e3fcec 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -77,22 +77,33 @@ func (o Offset) At(at int64) Offset { return o } -type consumerType uint8 - -const ( - consumerTypeUnset consumerType = iota - consumerTypeDirect - consumerTypeGroup -) - type consumer struct { cl *Client - // mu guards this block specifically - mu sync.Mutex - group *groupConsumer - direct *directConsumer - typ consumerType + // assignMu is grabbed when setting v (AssignGroup, AssignDirect, or Close) + // mu is grabbed when + // - polling fetches, for quickly draining sources / updating group uncommitted + // - calling assignPartitions (group / direct updates) + // + // v is atomic for non-locking reads in a few instances where that + // is preferrable / allowed. + assignMu sync.Mutex + mu sync.Mutex + v atomic.Value // *consumerValue + + // On metadata update, if the consumer is set (direct or group), the + // client begins a goroutine that updates the consumer kind's + // assignments. + // + // This is done in a goroutine to not block the metadata loop, because + // the update **could** wait on a group consumer leaving if a + // concurrent AssignGroup is called (very low risk vector). + // + // The update realistically should be instantaneous, but if it is slow, + // some metadata updates could pile up. We loop with our atomic work + // loop, which collapses repeated updates into one extra update, so we + // loop as little as necessary. + outstandingMetadataUpdates workLoop // sessionChangeMu is grabbed when a session is stopped and held through // when a session can be started again. The sole purpose is to block an @@ -122,16 +133,79 @@ func (u *usedCursors) use(c *cursor) { (*u)[c] = struct{}{} } -// unset, called under the consumer mu, transitions the group to the unset +var consumerUnsetSentinel = new(consumerValue) +var consumerDeadSentinel = new(consumerValue) + +func init() { + consumerUnsetSentinel.v = &consumerUnsetSentinel + consumerDeadSentinel.v = &consumerDeadSentinel +} + +type consumerValue struct { + // Options: + // - consumerUnsetSentinel + // - *directConsumer + // - *groupConsumer + // - consumerDeadSentinel + v interface{} +} + +func (c *consumer) init(cl *Client) { + c.cl = cl + c.sourcesReadyCond = sync.NewCond(&c.sourcesReadyMu) + c.v.Store(consumerUnsetSentinel) +} + +func (c *consumer) loadKind() interface{} { return c.v.Load().(*consumerValue).v } +func (c *consumer) loadGroup() (*groupConsumer, bool) { + g, ok := c.loadKind().(*groupConsumer) + return g, ok +} +func (c *consumer) loadDirect() (*directConsumer, bool) { + d, ok := c.loadKind().(*directConsumer) + return d, ok +} + +func (c *consumer) storeDirect(d *directConsumer) { c.v.Store(&consumerValue{v: d}) } // while locked +func (c *consumer) storeGroup(g *groupConsumer) { c.v.Store(&consumerValue{v: g}) } // while locked + +func (c *consumer) kill() (wasDead bool) { + c.assignMu.Lock() + wasDead, wait := c.unset() + c.v.Store(consumerDeadSentinel) + c.assignMu.Unlock() + + wait() + return wasDead +} + +func (c *consumer) unsetAndWait() (wasDead bool) { + wasDead, wait := c.unset() + wait() + return wasDead +} + +// unset, called under the assign mu, transitions the group to the unset // state, invalidating old assignments and leaving a group if it was in one. -func (c *consumer) unset() { +// +// This returns a function to wait for a group to be left, if in one. +func (c *consumer) unset() (wasDead bool, wait func()) { + c.mu.Lock() + defer c.mu.Unlock() + c.assignPartitions(nil, assignInvalidateAll) - if c.typ == consumerTypeGroup { - c.group.leave() + + prior := c.loadKind() + wasDead = prior == consumerDeadSentinel + if !wasDead { + c.v.Store(consumerUnsetSentinel) + } + + wait = func() {} + if g, ok := prior.(*groupConsumer); ok { + wait = g.leave() } - c.typ = consumerTypeUnset - c.direct = nil - c.group = nil + return wasDead, wait } // addSourceReadyForDraining tracks that a source needs its buffered fetch @@ -171,12 +245,34 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches { var fetches Fetches fill := func() { + // A group can grab the consumer lock then the group mu and + // assign partitions. The group mu is grabbed to update its + // uncommitted map. Assigning partitions clears sources ready + // for draining. + // + // We need to grab the consumer mu to ensure proper lock + // ordering and prevent lock inversion. Polling fetches also + // updates the group's uncommitted map; if we do not grab the + // consumer mu at the top, we have a problem: without the lock, + // we could have grabbed some sources, then a group assigned, + // and after the assign, we update uncommitted with fetches + // from the old assignment + c.mu.Lock() + defer c.mu.Unlock() + c.sourcesReadyMu.Lock() - defer c.sourcesReadyMu.Unlock() for _, ready := range c.sourcesReadyForDraining { fetches = append(fetches, ready.takeBuffered()) } c.sourcesReadyForDraining = nil + realFetches := fetches + fetches = append(fetches, c.fakeReadyForDraining...) + c.fakeReadyForDraining = nil + c.sourcesReadyMu.Unlock() + + if len(realFetches) == 0 { + return + } // Before returning, we want to update our uncommitted. If we // updated after, then we could end up with weird interactions @@ -187,17 +283,9 @@ func (cl *Client) PollFetches(ctx context.Context) Fetches { // session to start. If we returned stale fetches that did not // have their uncommitted offset tracked, then we would allow // duplicates. - // - // We grab the consumer mu because a concurrent client close - // could happen. - c.mu.Lock() - if c.typ == consumerTypeGroup && len(fetches) > 0 { - c.group.updateUncommitted(fetches) + if g, ok := c.loadGroup(); ok { + g.updateUncommitted(realFetches) } - c.mu.Unlock() - - fetches = append(fetches, c.fakeReadyForDraining...) - c.fakeReadyForDraining = nil } fill() @@ -354,7 +442,12 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how clientTopics := c.cl.loadTopics() for topic, partitions := range assignments { - topicParts := clientTopics[topic].load() // must be non-nil, which is ensured in Assign<> or in metadata when consuming as regex + topicPartitions := clientTopics[topic] // should be non-nil + if topicPartitions == nil { + c.cl.cfg.logger.Log(LogLevelError, "BUG! consumer was assigned topic that we did not ask for in AssignGroup nor AssignDirect, skipping!", "topic", topic) + continue + } + topicParts := topicPartitions.load() for partition, offset := range partitions { // First, if the request is exact, get rid of the relative @@ -410,23 +503,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how } func (c *consumer) doOnMetadataUpdate() { - c.mu.Lock() - defer c.mu.Unlock() - - switch c.typ { - case consumerTypeUnset: + switch c.loadKind().(type) { + case *directConsumer: + case *groupConsumer: + default: return - case consumerTypeDirect: - c.assignPartitions(c.direct.findNewAssignments(c.cl.loadTopics()), assignWithoutInvalidating) - case consumerTypeGroup: - c.group.findNewAssignments(c.cl.loadTopics()) } - go c.loadSession().doOnMetadataUpdate() + // See the comment on the outstandingMetadataUpdates field for why this + // block below. + if c.outstandingMetadataUpdates.maybeBegin() { + doUpdate := func() { + switch t := c.loadKind().(type) { + case *directConsumer: + if new := t.findNewAssignments(c.cl.loadTopics()); len(new) > 0 { + c.mu.Lock() + c.assignPartitions(new, assignWithoutInvalidating) + c.mu.Unlock() + } + case *groupConsumer: + t.findNewAssignments(c.cl.loadTopics()) + } + + go c.loadSession().doOnMetadataUpdate() + } + + go func() { + again := true + for again { + doUpdate() + again = c.outstandingMetadataUpdates.maybeFinish(false) + } + }() + } + } func (s *consumerSession) doOnMetadataUpdate() { - if s == nil { // no session started yet + if s == nil || s == noConsumerSession { // no session started yet return } diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index 52458ab5..e76cbd11 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -62,10 +62,13 @@ type directConsumer struct { // This takes ownership of any assignments. func (cl *Client) AssignPartitions(opts ...DirectConsumeOpt) { c := &cl.consumer - c.mu.Lock() - defer c.mu.Unlock() - c.unset() + c.assignMu.Lock() + defer c.assignMu.Unlock() + + if wasDead := c.unsetAndWait(); wasDead { + return + } d := &directConsumer{ topics: make(map[string]Offset), @@ -80,8 +83,8 @@ func (cl *Client) AssignPartitions(opts ...DirectConsumeOpt) { if len(d.topics) == 0 && len(d.partitions) == 0 || c.dead { return } - c.typ = consumerTypeDirect - c.direct = d + + c.storeDirect(d) defer cl.triggerUpdateMetadata() diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 997e6db3..2c71ba78 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2,7 +2,7 @@ package kgo import ( "context" - "errors" + "fmt" "regexp" "sort" "sync" @@ -12,8 +12,6 @@ import ( "github.com/twmb/franz-go/pkg/kmsg" ) -var errLeftGroup = errors.New("left group or client closed") - // GroupOpt is an option to configure group consuming. type GroupOpt interface { apply(*groupConsumer) @@ -200,38 +198,94 @@ func InstanceID(id string) GroupOpt { return groupOpt{func(cfg *groupConsumer) { cfg.instanceID = &id }} } +// GroupProtocol sets the group's join protocol, overriding the default value +// "consumer". The only reason to override this is if you are implementing +// custom join and sync group logic. +func GroupProtocol(protocol string) GroupOpt { + return groupOpt{func(cfg *groupConsumer) { cfg.protocol = protocol }} +} + type groupConsumer struct { c *consumer // used to change consumer state; generally c.mu is grabbed on access cl *Client // used for running requests / adding to topics map ctx context.Context cancel func() - manageDone chan struct{} - dying bool + manageDone chan struct{} // closed once when the manage goroutine quits + + ///////////////////////// + // configuration block // + ///////////////////////// id string // group we are in + instanceID *string // optional, our instance ID topics map[string]struct{} // topics we are interested in balancers []GroupBalancer // balancers we can use + protocol string // "consumer" by default, expected to never be overridden cooperative bool // whether all balancers are cooperative + sessionTimeout time.Duration + rebalanceTimeout time.Duration + heartbeatInterval time.Duration + requireStable bool + + onAssigned func(context.Context, map[string][]int32) + onRevoked func(context.Context, map[string][]int32) + onLost func(context.Context, map[string][]int32) + + autocommitDisable bool // true if autocommit was disabled or we are transactional + autocommitInterval time.Duration + + /////////////////////// + // configuration end // + /////////////////////// + + // regexTopics is configuration, but used exclusively with reSeen, + // which is updated in findNewAssignments. If our assignment is for + // regular expressions, then we put every topic that we have passed + // against all our regex into reSeen. This avoids us re-evaluating + // topics in our regex on future metadata assignments. + regexTopics bool + reSeen map[string]struct{} + + // Full lock grabbed in BlockingCommitOffsets, read lock grabbed in + // CommitOffsets, this lock ensures that only one blocking commit can + // happen at once, and if it is happening, no other commit can be + // happening. + blockingCommitMu sync.RWMutex + + rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex) + + // The following two are only updated in the manager / join&sync loop + lastAssigned map[string][]int32 // only updated in join&sync loop + nowAssigned map[string][]int32 // only updated in join&sync loop + + groupExtraTopics map[string]struct{} // TODO TODO TODO + + // leader is whether we are the leader right now. This is set to false + // + // - set to false at the beginning of a join group session + // - set to true if join group response indicates we are leader + // - read on metadata updates in findNewAssignments + leader atomicBool + + // Set to true when ending a transaction committing transaction + // offsets, and then set to false immediately after before calling + // EndTransaction. + offsetsAddedToTxn bool + ////////////// // mu block // ////////////// mu sync.Mutex - // leader is whether we are the leader right now. This is set to false - // at the beginning of a join group session, and updated if we are - // chosen to be the leader. This is read on metadata updates when - // finding new assignments. - leader bool - // using is updated when finding new assignments, we always add to this // if we want to consume a topic (or see there are more potential // partitions). Only the leader can trigger a new group session if there // are simply more partitions for existing topics. // // This is read when joining a group or leaving a group. - using map[string]int // topics we are currently using => # partitions known in that topic + using map[string]int // topics *we* are currently using => # partitions known in that topic // uncommitted is read and updated all over: // - updated before PollFetches returns @@ -253,38 +307,19 @@ type groupConsumer struct { memberID string generation int32 - //////////// - // mu end // - //////////// - + // commitCancel and commitDone are set under mu before firing off an + // async commit request. If another commit happens, it cancels the + // prior commit, waits for the prior to be done, and then starts its + // own. commitCancel func() commitDone chan struct{} - blockingCommitMu sync.RWMutex - - rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex) - - regexTopics bool - reSeen map[string]struct{} - - instanceID *string - lastAssigned map[string][]int32 // only updated in join&sync loop - nowAssigned map[string][]int32 // only updated in join&sync loop - - sessionTimeout time.Duration - rebalanceTimeout time.Duration - heartbeatInterval time.Duration - requireStable bool - - onAssigned func(context.Context, map[string][]int32) - onRevoked func(context.Context, map[string][]int32) - onLost func(context.Context, map[string][]int32) - - blockAuto bool - autocommitDisable bool - autocommitInterval time.Duration + // blockAuto is set and cleared in {,Blocking}CommitOffsets to block + // autocommitting if autocommitting is active. This ensures that an + // autocommit does not cancel the user's manual commit. + blockAuto bool - offsetsAddedToTxn bool + dying bool // set when closing, read in findNewAssignments } // LeaveGroup leaves a group if in one. Calling the client's Close function @@ -297,7 +332,11 @@ type groupConsumer struct { // manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka // scripts or kcl). func (cl *Client) LeaveGroup() { - cl.AssignPartitions() + c := &cl.consumer + c.assignMu.Lock() + _, wait := cl.consumer.unset() + c.assignMu.Unlock() + wait() } // AssignGroup assigns a group to consume from, overriding any prior @@ -311,10 +350,13 @@ func (cl *Client) LeaveGroup() { // It is recommended to do one final blocking commit before leaving a group. func (cl *Client) AssignGroup(group string, opts ...GroupOpt) { c := &cl.consumer - c.mu.Lock() - defer c.mu.Unlock() - c.unset() + c.assignMu.Lock() + defer c.assignMu.Unlock() + + if wasDead := c.unsetAndWait(); wasDead { + return + } ctx, cancel := context.WithCancel(cl.ctx) g := &groupConsumer{ @@ -330,6 +372,7 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) { balancers: []GroupBalancer{ CooperativeStickyBalancer(), }, + protocol: "consumer", cooperative: true, // default yes, potentially canceled below by our balancers using: make(map[string]int), @@ -356,8 +399,8 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) { for _, balancer := range g.balancers { g.cooperative = g.cooperative && balancer.isCooperative() } - c.typ = consumerTypeGroup - c.group = g + + c.storeGroup(g) // Ensure all topics exist so that we will fetch their metadata. if !g.regexTopics { @@ -376,6 +419,12 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) { cl.triggerUpdateMetadata() } +// Manages the group consumer's join / sync / heartbeat / fetch offset flow. +// +// Once a group is assigned, we fire a metadata request for all topics the +// assignment specified interest in. Only after we finally have some topic +// metadata do we join the group, and once joined, this management runs in a +// dedicated goroutine until the group is left. func (g *groupConsumer) manage() { defer close(g.manageDone) g.cl.cfg.logger.Log(LogLevelInfo, "beginning to manage the group lifecycle") @@ -401,34 +450,39 @@ func (g *groupConsumer) manage() { g.onRevoked(g.ctx, g.nowAssigned) } - // If we are eager, we should have invalidated - // everything before getting here, but we do so doubly - // just in case. + // If we are eager, we should have invalidated everything + // before getting here, but we do so doubly just in case. // - // If we are cooperative, the join and sync could have - // failed during the cooperative rebalance where we - // were still consuming. - // - // We need to invalidate everything. - g.c.assignPartitions(nil, assignInvalidateAll) - g.nowAssigned = nil + // If we are cooperative, the join and sync could have failed + // during the cooperative rebalance where we were still + // consuming. We need to invalidate everything. + { + g.c.mu.Lock() + g.c.assignPartitions(nil, assignInvalidateAll) + g.mu.Lock() // before allowing poll to touch uncommitted, lock the group + g.c.mu.Unlock() // now part of poll can continue + g.uncommitted = nil + g.mu.Unlock() - // TODO check if this lock && assign is necessary. - g.mu.Lock() - g.uncommitted = nil - g.mu.Unlock() + g.nowAssigned = nil + g.lastAssigned = nil + + g.leader.set(false) + } + + if err == context.Canceled { // context was canceled, quit now + return + } // Waiting for the backoff is a good time to update our // metadata; maybe the error is from stale metadata. consecutiveErrors++ backoff := g.cl.cfg.retryBackoff(consecutiveErrors) - if err != errLeftGroup && err != context.Canceled { // if we left the group we return below - g.cl.cfg.logger.Log(LogLevelError, "join and sync loop errored", - "err", err, - "consecutive_errors", consecutiveErrors, - "backoff", backoff, - ) - } + g.cl.cfg.logger.Log(LogLevelError, "join and sync loop errored", + "err", err, + "consecutive_errors", consecutiveErrors, + "backoff", backoff, + ) deadline := time.Now().Add(backoff) g.cl.waitmeta(g.ctx, backoff) after := time.NewTimer(time.Until(deadline)) @@ -441,7 +495,7 @@ func (g *groupConsumer) manage() { } } -func (g *groupConsumer) leave() { +func (g *groupConsumer) leave() (wait func()) { g.cancel() // If g.using is nonzero before this check, then a manage goroutine has @@ -450,39 +504,38 @@ func (g *groupConsumer) leave() { g.dying = true wasManaging := len(g.using) > 0 g.mu.Unlock() - if wasManaging { - // Leaving a group waits for the managing goroutine to be done, - // which can block in a users onAssign/onRevoke/onLost. This - // can block a metadata update from completing, so we unlock - // the consumer mu to ensure that does not happen. - // - // TODO: fix this, since unlocking the consumer mu means - // another asignment can happen. This is a low risk vector, - // since it is unlikely that multiple assignments will be - // happening for a client, instead we expect one assignment to - // consume and one to leave. - g.c.mu.Unlock() - <-g.manageDone - g.c.mu.Lock() - } - if g.instanceID == nil { - g.cl.cfg.logger.Log(LogLevelInfo, - "leaving group", - "group", g.id, - "memberID", g.memberID, // lock not needed now since nothing can change it (manageDone) - ) - (&kmsg.LeaveGroupRequest{ - Group: g.id, - MemberID: g.memberID, - Members: []kmsg.LeaveGroupRequestMember{{ + done := make(chan struct{}) + go func() { + defer close(done) + + if wasManaging { + // We want to wait for the manage goroutine to be done + // so that we call the user's on{Assign,RevokeLost}. + <-g.manageDone + } + + if g.instanceID == nil { + g.cl.cfg.logger.Log(LogLevelInfo, + "leaving group", + "group", g.id, + "memberID", g.memberID, // lock not needed now since nothing can change it (manageDone) + ) + (&kmsg.LeaveGroupRequest{ + Group: g.id, MemberID: g.memberID, - // no instance ID - }}, - }).RequestWith(g.cl.ctx, g.cl) - } + Members: []kmsg.LeaveGroupRequestMember{{ + MemberID: g.memberID, + // no instance ID + }}, + }).RequestWith(g.cl.ctx, g.cl) + } + }() + + return func() { <-done } } +// returns the difference of g.nowAssigned and g.lastAssigned. func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { if g.lastAssigned == nil { return g.nowAssigned, nil @@ -491,8 +544,9 @@ func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { added = make(map[string][]int32, len(g.nowAssigned)) lost = make(map[string][]int32, len(g.nowAssigned)) - // First we loop over lastAssigned to find what was lost, or what was - // added to topics we were working on. + // First, we diff lasts: any topic in last but not now is lost, + // otherwise, (1) new partitions are added, (2) common partitions are + // ignored, and (3) partitions no longer in now are lost. lasts := make(map[int32]struct{}, 100) for topic, lastPartitions := range g.lastAssigned { nowPartitions, exists := g.nowAssigned[topic] @@ -523,7 +577,7 @@ func (g *groupConsumer) diffAssigned() (added, lost map[string][]int32) { } } - // We loop again over nowAssigned to add entirely new topics to added. + // Finally, any new topics in now assigned are strictly added. for topic, nowPartitions := range g.nowAssigned { if _, exists := g.lastAssigned[topic]; !exists { added[topic] = nowPartitions @@ -565,9 +619,12 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32) { g.nowAssigned = nil // We are setting uncommitted to nil _after_ the heartbeat loop - // already invalidated everything. After setting this here, - // nothing should be able to recreate uncommitted until a - // future fetch after the group is rejoined. + // already called assignPartitions(nil, assignInvalidateAll). + // After nilling uncommitted here, nothing should recreate + // uncommitted until a future fetch after the group is + // rejoined. This _can_ be broken with a manual SetOffsets or + // with {,Blocking}CommitOffsets but we explicitly document not + // to do that outside the context of a live group session. g.mu.Lock() g.uncommitted = nil g.mu.Unlock() @@ -593,9 +650,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32) { // // We want to invalidate buffered fetches since they may // contain partitions that we lost, and we do not want a future - // poll to return those fetches. We could be smarter and knife - // out only partitions we lost, but it is simpler to just drop - // everything. + // poll to return those fetches. lostOffsets := make(map[string]map[int32]Offset, len(lost)) for lostTopic, lostPartitions := range lost { @@ -606,14 +661,17 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32) { lostOffsets[lostTopic] = lostPartitionOffsets } - // We must invalidate before calling onRevoke, because we want - // to allow commits in onRevoke to be the FINAL offsets; we do - // not want to allow new fetches for revoked partitions after a - // call to revoke before we invalidate those partitions. + // We must invalidate before revoking and before updating + // uncommitted, because we want any commits in onRevoke to be + // for the final polled offsets. We do not want to allow the + // logical race of allowing fetches for revoked partitions + // after a revoke but before an invalidation. + g.c.mu.Lock() g.c.assignPartitions(lostOffsets, assignInvalidateMatching) + g.c.mu.Unlock() } - if len(lost) != 0 || stage == revokeThisSession { + if len(lost) > 0 || stage == revokeThisSession { if len(lost) == 0 { g.cl.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke at the end of a session even though no partitions were lost") } else { @@ -628,14 +686,8 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32) { return } - // cooperative consumers need to rejoin after they revoke what they - // lost. - defer g.rejoin() + defer g.rejoin() // cooperative consumers rejoin after they revoking what they lost - // If committing, users should be waiting for the commit to finish in - // onRevoke, which would complete updating the uncommitted map. But, if - // they are not, we avoid racing on g.uncommitted. - // // The block below deletes everything lost from our uncommitted map. // All commits should be **completed** by the time this runs. An async // commit can undo what we do below. The default revoke runs a blocking @@ -678,6 +730,9 @@ func newAssignRevokeSession() *assignRevokeSession { } } +// For cooperative consumers, the first thing a cooperative consumer does is to +// diff its last assignment and its new assignment and revoke anything lost. +// We call this a "prerevoke". func (s *assignRevokeSession) prerevoke(g *groupConsumer, lost map[string][]int32) <-chan struct{} { go func() { defer close(s.prerevokeDone) @@ -702,6 +757,15 @@ func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][] return s.assignDone } +// At the end of a group session, before we leave the heartbeat loop, we call +// revoke. For non-cooperative consumers, this revokes everything in the +// current session, and before revoking, we invalidate all partitions. For the +// cooperative consumer, this does nothing but does notify the client that a +// revoke has begun / the group session is ending. +// +// This may not run before returning from the heartbeat loop: if we encounter a +// fatal error, we return before revoking so that we can instead call onLost in +// the manage loop. func (s *assignRevokeSession) revoke(g *groupConsumer) <-chan struct{} { go func() { defer close(s.revokeDone) @@ -713,14 +777,23 @@ func (s *assignRevokeSession) revoke(g *groupConsumer) <-chan struct{} { return s.revokeDone } +// This chunk of code "pre" revokes lost partitions for the cooperative +// consumer and then begins heartbeating while fetching offsets. This returns +// when heartbeating errors (or if fetch offsets errors). +// +// Before returning, this function ensures that +// - onAssigned is complete +// - which ensures that pre revoking is complete +// - fetching is complete +// - heartbeating is complete func (g *groupConsumer) setupAssignedAndHeartbeat() error { hbErrCh := make(chan error, 1) fetchErrCh := make(chan error, 1) s := newAssignRevokeSession() added, lost := g.diffAssigned() - g.cl.cfg.logger.Log(LogLevelInfo, "new group session begun", "assigned", added, "lost", lost) - s.prerevoke(g, lost) + g.cl.cfg.logger.Log(LogLevelInfo, "new group session begun", "added", added, "lost", lost) + s.prerevoke(g, lost) // for cooperative consumers // Since we have joined the group, we immediately begin heartbeating. // This will continue until the heartbeat errors, the group is killed, @@ -734,7 +807,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error { // We immediately begin fetching offsets. We want to wait until the // fetch function returns, since it assumes within it that another - // assign has not happened (it assigns partitions itself). Returning + // assign cannot happen (it assigns partitions itself). Returning // before the fetch completes would be not good. // // The difference between fetchDone and fetchErrCh is that fetchErrCh @@ -760,10 +833,14 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error { // Ensuring assigning is done ensures two things: // // * that we wait for for prerevoking to be done, which updates the - // uncommitted field. Waiting for that ensures that a rejoin and poll - // doesn't have weird concurrent interaction. + // uncommitted field. Waiting for that ensures that a rejoin and poll + // does not have weird concurrent interaction. // // * that our onLost will not be concurrent with onAssign + // + // We especially need to wait here because heartbeating may not + // necessarily run onRevoke before returning (because of a fatal + // error). s.assign(g, added) defer func() { <-s.assignDone }() @@ -786,6 +863,9 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio ticker := time.NewTicker(g.heartbeatInterval) defer ticker.Stop() + // We issue one heartbeat quickly if we are cooperative because + // cooperative consumers rejoin the group immediately, and we want to + // detect that in 500ms rather than 3s. var cooperativeFastCheck <-chan time.Time if g.cooperative { cooperativeFastCheck = time.After(500 * time.Millisecond) @@ -823,7 +903,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio // manage goroutine will race with us setting // nowAssigned. ctxCh = nil - err = errLeftGroup + err = context.Canceled } if heartbeat { @@ -841,6 +921,10 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio g.cl.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "err", err) } + // The first error either triggers a clean revoke and metadata + // update or it returns immediately. If we triggered the + // revoke, we wait for it to complete regardless of any future + // error. if didMetadone && didRevoke { g.cl.cfg.logger.Log(LogLevelInfo, "heartbeat loop complete", "err", lastErr) return lastErr @@ -852,6 +936,8 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio if lastErr == nil { g.cl.cfg.logger.Log(LogLevelInfo, "heartbeat errored", "err", err) + } else { + g.cl.cfg.logger.Log(LogLevelInfo, "heartbeat errored again while waiting for user revoke to finish", "err", err) } // Since we errored, we must revoke. @@ -859,18 +945,21 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio // If we are an eager consumer, we stop fetching all of // our current partitions as we will be revoking them. if !g.cooperative { + g.c.mu.Lock() g.c.assignPartitions(nil, assignInvalidateAll) + g.c.mu.Unlock() } // If our error is not from rebalancing, then we - // encountered IllegalGeneration or UnknownMemberID, - // both of which are unexpected and unrecoverable. + // encountered IllegalGeneration or UnknownMemberID or + // our context closed all of which are unexpected and + // unrecoverable. // // We return early rather than revoking and updating // metadata; the groupConsumer's manage function will // call onLost with all partitions. // - // The caller still wait for the session's onAssigned + // setupAssignedAndHeartbeat still waits for onAssigned // to be done so that we avoid calling onLost // concurrently. if err != kerr.RebalanceInProgress && revoked == nil { @@ -901,29 +990,9 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio } } -// We need to lock to set the leader due to the potential for a concurrent -// findNewAssignments. -func (g *groupConsumer) setLeader() { - g.mu.Lock() - defer g.mu.Unlock() - g.leader = true -} - -// prejoin, called at the beginning of joinAndSync, ensures we are no longer -// the leader and that the rejoinCh is drained. -func (g *groupConsumer) prejoin() { - g.mu.Lock() - defer g.mu.Unlock() - g.leader = false - - select { - case <-g.rejoinCh: - default: - } -} - -// rejoin is called if we are leader: this ensures the heartbeat loop will -// see we need to rejoin. +// rejoin is called after a cooperative member revokes what it lost at the +// beginning of a session, or if we are leader and detect new partitions to +// consume. func (g *groupConsumer) rejoin() { select { case g.rejoinCh <- struct{}{}: @@ -931,21 +1000,24 @@ func (g *groupConsumer) rejoin() { } } -var clientGroupProtocol = "consumer" // in the Java API, the standard client is the "consumer" protocol; `var` so we can take the address TODO make client configurable? - // Joins and then syncs, issuing the two slow requests in goroutines to allow // for group cancelation to return early. func (g *groupConsumer) joinAndSync() error { g.cl.cfg.logger.Log(LogLevelInfo, "joining group") - g.prejoin() + g.leader.set(false) start: + select { + case <-g.rejoinCh: // drain to avoid unnecessary rejoins + default: + } + var ( joinReq = &kmsg.JoinGroupRequest{ Group: g.id, SessionTimeoutMillis: int32(g.sessionTimeout.Milliseconds()), RebalanceTimeoutMillis: int32(g.rebalanceTimeout.Milliseconds()), - ProtocolType: clientGroupProtocol, + ProtocolType: g.protocol, MemberID: g.memberID, InstanceID: g.instanceID, Protocols: g.joinGroupProtocols(), @@ -985,7 +1057,7 @@ start: Generation: g.generation, MemberID: g.memberID, InstanceID: g.instanceID, - ProtocolType: &clientGroupProtocol, + ProtocolType: &g.protocol, Protocol: &protocol, GroupAssignment: plan.intoAssignment(), // nil unless we are the leader } @@ -994,7 +1066,7 @@ start: synced = make(chan struct{}) ) - g.cl.cfg.logger.Log(LogLevelInfo, "syncing", "protocol_type", clientGroupProtocol, "protocol", protocol) + g.cl.cfg.logger.Log(LogLevelInfo, "syncing", "protocol_type", g.protocol, "protocol", protocol) go func() { defer close(synced) syncResp, err = syncReq.RequestWith(g.ctx, g.cl) @@ -1014,7 +1086,7 @@ start: g.cl.cfg.logger.Log(LogLevelInfo, "sync failed with RebalanceInProgress, rejoining") goto start } - g.cl.cfg.logger.Log(LogLevelWarn, "sync group failed", err) + g.cl.cfg.logger.Log(LogLevelWarn, "sync group failed", "err", err) return err } @@ -1053,6 +1125,7 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo leader := resp.LeaderID == resp.MemberID if leader { + g.leader.set(true) g.cl.cfg.logger.Log(LogLevelInfo, "joined, balancing group", "memberID", g.memberID, "instanceID", g.instanceID, @@ -1066,7 +1139,6 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo if err != nil { return } - g.setLeader() } else { g.cl.cfg.logger.Log(LogLevelInfo, "joined", @@ -1194,8 +1266,17 @@ start: } } - // Grab the group lock before assigning so that we can update the - // uncommitted map before a Poll/Commit by the user. + clientTopics := g.c.cl.loadTopics() + for fetchedTopic := range offsets { + if _, exists := clientTopics[fetchedTopic]; !exists { + delete(offsets, fetchedTopic) + g.cl.cfg.logger.Log(LogLevelError, "BUG! member was assigned topic that we did not ask for in AssignGroup! skipping assigning this topic!", "topic", fetchedTopic) + } + } + + // Lock for assign and then updating uncommitted. + g.c.mu.Lock() + defer g.c.mu.Unlock() g.mu.Lock() defer g.mu.Unlock() @@ -1237,42 +1318,33 @@ start: return nil } -// findNewAssignments is called under the consumer lock at the end of a -// metadata update, updating the topics the group wants to use and other -// metadata. +// findNewAssignments updates topics the group wants to use and other metadata. +// We only grab the group mu at the end if we need to. // // This joins the group if // - the group has never been joined // - new topics are found for consuming (changing this consumer's join metadata) // // Additionally, if the member is the leader, this rejoins the group if the -// leader notices new partitions in an existing topic. This only focuses on -// topics the leader itself owns; it can be added in the future to focus on all -// topics, which would support groups that consume disparate topics. Ideally, -// this is uncommon. This does not rejoin if the leader notices a partition is -// lost, which is finicky. +// leader notices new partitions in an existing topic. +// +// This does not rejoin if the leader notices a partition is lost, which is +// finicky. func (g *groupConsumer) findNewAssignments(topics map[string]*topicPartitions) { - g.mu.Lock() - defer g.mu.Unlock() - - if g.dying { - return - } - type change struct { isNew bool delta int } - var numNew int + var numNewTopics int toChange := make(map[string]change, len(topics)) for topic, topicPartitions := range topics { numPartitions := len(topicPartitions.load().partitions) // If we are already using this topic, add that it changed if // there are more partitions than we were using prior. if used, exists := g.using[topic]; exists { - if numPartitions-used > 0 { - toChange[topic] = change{delta: numPartitions - used} + if added := numPartitions - used; added > 0 { + toChange[topic] = change{delta: added} } continue } @@ -1301,7 +1373,7 @@ func (g *groupConsumer) findNewAssignments(topics map[string]*topicPartitions) { continue } toChange[topic] = change{isNew: true, delta: numPartitions} - numNew++ + numNewTopics++ } } @@ -1310,6 +1382,13 @@ func (g *groupConsumer) findNewAssignments(topics map[string]*topicPartitions) { return } + g.mu.Lock() + defer g.mu.Unlock() + + if g.dying { + return + } + wasManaging := len(g.using) != 0 for topic, change := range toChange { g.using[topic] += change.delta @@ -1317,9 +1396,10 @@ func (g *groupConsumer) findNewAssignments(topics map[string]*topicPartitions) { if !wasManaging { go g.manage() + return } - if numNew > 0 || g.leader { + if numNewTopics > 0 || g.leader.get() { g.rejoin() } } @@ -1394,6 +1474,7 @@ func (g *groupConsumer) updateCommitted( } if g.uncommitted == nil || // just in case len(req.Topics) != len(resp.Topics) { // bad kafka + g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Num topics in request: %d, in reply: %d, we cannot handle this!", len(req.Topics), len(resp.Topics))) return } @@ -1411,6 +1492,7 @@ func (g *groupConsumer) updateCommitted( if topic == nil || // just in case reqTopic.Topic != respTopic.Topic || // bad kafka len(reqTopic.Partitions) != len(respTopic.Partitions) { // same + g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic at request index %d: %s, reply at index: %s; num partitions on request topic: %d, in reply: %d, we cannot handle this!", i, reqTopic.Topic, respTopic.Topic, len(reqTopic.Partitions), len(respTopic.Partitions))) continue } @@ -1425,9 +1507,15 @@ func (g *groupConsumer) updateCommitted( reqPart := &reqTopic.Partitions[i] respPart := &respTopic.Partitions[i] uncommit, exists := topic[respPart.Partition] - if !exists || // just in case - respPart.ErrorCode != 0 || // bad commit - reqPart.Partition != respPart.Partition { // bad kafka + if !exists { // just in case + continue + } + if reqPart.Partition != respPart.Partition { // bad kafka + g.cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("Kafka replied to our OffsetCommitRequest incorrectly! Topic %s partition %d != resp partition %d", reqTopic.Topic, reqPart.Partition, respPart.Partition)) + continue + } + if respPart.ErrorCode != 0 { + g.cl.cfg.logger.Log(LogLevelWarn, "unable to commit offset for topic partition", "topic", reqTopic.Topic, "partition", reqPart.Partition, "error_code", respPart.ErrorCode) continue } @@ -1478,25 +1566,32 @@ func (g *groupConsumer) loopCommit() { } } -// SetOffsets sets any matching offsets in setOffsets to the given -// epoch/offset. Partitions that are not specified are not set. +// SetOffsets, for consumer groups, sets any matching offsets in setOffsets to +// the given epoch/offset. Partitions that are not specified are not set. It is +// invalid to set topics that were not yet returned from a PollFetches. // // If using transactions, it is advised to just use a GroupTransactSession and // avoid this function entirely. +// +// It is strongly recommended to use this function outside of the context of a +// PollFetches loop and only when you know the group is not revoked (i.e., +// block any concurrent revoke while issuing this call). Any other usage is +// prone to odd interactions. func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { if len(setOffsets) == 0 { return } + // We assignPartitions before returning, so we grab the consumer lock + // first to preserve consumer mu => group mu ordering. c := &cl.consumer c.mu.Lock() defer c.mu.Unlock() - if c.typ != consumerTypeGroup { + g, ok := c.loadGroup() + if !ok { return } - - g := c.group g.mu.Lock() defer g.mu.Unlock() @@ -1505,7 +1600,7 @@ func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { // The gist of what follows: // // We need to set uncommitted.committed; that is the guarantee of this - // function. However, if, for everything we are setting the head equals + // function. However, if, for everything we are setting, the head equals // the commit, then we do not need to actually invalidate our current // assignments or buffered fetches. // @@ -1574,12 +1669,11 @@ func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { // If using a cooperative balancer, commits while consuming during rebalancing // may fail with REBALANCE_IN_PROGRESS. func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset { - cl.consumer.mu.Lock() - defer cl.consumer.mu.Unlock() - if cl.consumer.typ != consumerTypeGroup { + g, ok := cl.consumer.loadGroup() + if !ok { return nil } - return cl.consumer.group.getUncommitted() + return g.getUncommitted() } // CommittedOffsets returns the latest committed offsets. Committed offsets are @@ -1587,16 +1681,10 @@ func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset { // // If there are no committed offsets, this returns nil. func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset { - c := &cl.consumer - - c.mu.Lock() - defer c.mu.Unlock() - - if c.typ != consumerTypeGroup { + g, ok := cl.consumer.loadGroup() + if !ok { return nil } - - g := c.group g.mu.Lock() defer g.mu.Unlock() @@ -1618,7 +1706,7 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo for topic, partitions := range g.uncommitted { var topicUncommitted map[int32]EpochOffset for partition, uncommit := range partitions { - if uncommit.head == uncommit.committed { + if head && uncommit.head == uncommit.committed { continue } if topicUncommitted == nil { @@ -1665,51 +1753,46 @@ func (cl *Client) BlockingCommitOffsets( done := make(chan struct{}) defer func() { <-done }() - func() { // anonymous func called immediately for the defers - cl.cfg.logger.Log(LogLevelDebug, "in BlockingCommitOffsets", "with", uncommitted) - defer cl.cfg.logger.Log(LogLevelDebug, "left BlockingCommitOffsets") + cl.cfg.logger.Log(LogLevelDebug, "in BlockingCommitOffsets", "with", uncommitted) + defer cl.cfg.logger.Log(LogLevelDebug, "left BlockingCommitOffsets") - if onDone == nil { - onDone = func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, _ error) {} - } + if onDone == nil { + onDone = func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, _ error) {} + } - cl.consumer.mu.Lock() - defer cl.consumer.mu.Unlock() + g, ok := cl.consumer.loadGroup() + if !ok { + onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup) + close(done) + return + } + if len(uncommitted) == 0 { + onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil) + close(done) + return + } - if cl.consumer.typ != consumerTypeGroup { - onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup) - close(done) - return - } - if len(uncommitted) == 0 { - onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), nil) - close(done) - return - } + g.blockingCommitMu.Lock() // block all other concurrent commits until our OnDone is done. - g := cl.consumer.group - g.blockingCommitMu.Lock() // block all other concurrent commits until our OnDone is done. + unblockCommits := func(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { + defer close(done) + defer g.blockingCommitMu.Unlock() + onDone(req, resp, err) + } - unblockCommits := func(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - defer close(done) - defer g.blockingCommitMu.Unlock() - onDone(req, resp, err) - } + g.mu.Lock() + go func() { + defer g.mu.Unlock() - g.mu.Lock() - go func() { + g.blockAuto = true + unblockAuto := func(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { + unblockCommits(req, resp, err) + g.mu.Lock() defer g.mu.Unlock() + g.blockAuto = false + } - g.blockAuto = true - unblockAuto := func(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - unblockCommits(req, resp, err) - g.mu.Lock() - defer g.mu.Unlock() - g.blockAuto = false - } - - g.commit(ctx, uncommitted, unblockAuto) - }() + g.commit(ctx, uncommitted, unblockAuto) }() } @@ -1756,9 +1839,9 @@ func (cl *Client) CommitOffsets( if onDone == nil { onDone = func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, _ error) {} } - cl.consumer.mu.Lock() - defer cl.consumer.mu.Unlock() - if cl.consumer.typ != consumerTypeGroup { + + g, ok := cl.consumer.loadGroup() + if !ok { onDone(new(kmsg.OffsetCommitRequest), new(kmsg.OffsetCommitResponse), ErrNotGroup) return } @@ -1767,7 +1850,6 @@ func (cl *Client) CommitOffsets( return } - g := cl.consumer.group g.blockingCommitMu.RLock() // block BlockingCommit, but allow other concurrent Commit to cancel us unblockSyncCommit := func(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index d370c6f5..e8ecba63 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -218,10 +218,13 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error) { // fetchTopicMetadata fetches metadata for all reqTopics and returns new // topicPartitionsData for each topic. func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicPartitionsData, bool, error) { - cl.consumer.mu.Lock() - all := cl.consumer.typ == consumerTypeDirect && cl.consumer.direct.regexTopics || - cl.consumer.typ == consumerTypeGroup && cl.consumer.group.regexTopics - cl.consumer.mu.Unlock() + var all bool + switch v := cl.consumer.loadKind().(type) { + case *groupConsumer: + all = v.regexTopics + case *directConsumer: + all = v.regexTopics + } _, meta, err := cl.fetchMetadataForTopics(cl.ctx, all, reqTopics) if err != nil { return nil, all, err diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 07078453..7359449f 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -63,15 +63,10 @@ func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *Gr cl: cl, } - c := &cl.consumer - c.mu.Lock() - defer c.mu.Unlock() - - if c.typ != consumerTypeGroup { - return nil // invalid, but we will let the caller handle this + g, ok := cl.consumer.loadGroup() + if !ok { + return nil // concurrent Assign; users should not do this } - - g := c.group g.mu.Lock() defer g.mu.Unlock() @@ -328,10 +323,8 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) atomic.StoreUint32(&cl.producer.producingTxn, 0) // forbid any new produces while ending txn defer func() { - cl.consumer.mu.Lock() - defer cl.consumer.mu.Unlock() - if cl.consumer.typ == consumerTypeGroup { - cl.consumer.group.offsetsAddedToTxn = false + if g, ok := cl.consumer.loadGroup(); ok { + g.offsetsAddedToTxn = false } }() @@ -457,11 +450,10 @@ func (cl *Client) commitTransactionOffsets( cl.producer.txnMu.Unlock() return } - cl.consumer.mu.Lock() cl.producer.txnMu.Unlock() - defer cl.consumer.mu.Unlock() - if cl.consumer.typ != consumerTypeGroup { + g, ok := cl.consumer.loadGroup() + if !ok { onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), ErrNotGroup) return } @@ -470,7 +462,6 @@ func (cl *Client) commitTransactionOffsets( return } - g := cl.consumer.group g.mu.Lock() defer g.mu.Unlock() @@ -515,7 +506,8 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error { } // commitTxn is ALMOST EXACTLY THE SAME as commit, but changed for txn types -// and we avoid updateCommitted. +// and we avoid updateCommitted. We avoid updating because we manually +// SetOffsets when ending the transaction. func (g *groupConsumer) commitTxn( ctx context.Context, uncommitted map[string]map[int32]EpochOffset, @@ -547,14 +539,13 @@ func (g *groupConsumer) commitTxn( // The id must have been set at least once by this point because of // addOffsetsToTxn. id, epoch, _ := g.cl.producerID() - memberID := g.memberID req := &kmsg.TxnOffsetCommitRequest{ TransactionalID: *g.cl.cfg.txnID, Group: g.id, ProducerID: id, ProducerEpoch: epoch, Generation: g.generation, - MemberID: memberID, + MemberID: g.memberID, InstanceID: g.instanceID, } @@ -592,7 +583,7 @@ func (g *groupConsumer) commitTxn( Partition: partition, Offset: eo.Offset, LeaderEpoch: eo.Epoch, - Metadata: &memberID, + Metadata: &req.MemberID, }) } }