diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 61f3ce67..872b98c1 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -707,12 +707,19 @@ func (c *consumer) purgeTopics(topics []string) { for _, topic := range topics { delete(c.d.using, topic) delete(c.d.reSeen, topic) + delete(c.d.m, topic) } } } // AddConsumeTopics adds new topics to be consumed. This function is a no-op if // the client is configured to consume via regex. +// +// Note that if you are directly consuming and specified ConsumePartitions, +// this function will not add the rest of the partitions for a topic unless the +// topic has been previously purged. That is, if you directly consumed only one +// of five partitions originally, this will not add the other four until the +// entire topic is purged. func (cl *Client) AddConsumeTopics(topics ...string) { c := &cl.consumer if len(topics) == 0 || c.g == nil && c.d == nil || cl.cfg.regex { @@ -728,6 +735,9 @@ func (cl *Client) AddConsumeTopics(topics ...string) { c.g.tps.storeTopics(topics) } else { c.d.tps.storeTopics(topics) + for _, topic := range topics { + c.d.m.addt(topic) + } } cl.triggerUpdateMetadataNow("from AddConsumeTopics") } diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index 830d5b85..41869596 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -2,9 +2,10 @@ package kgo type directConsumer struct { cfg *cfg - tps *topicsPartitions // data for topics that the user assigned - reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not - using map[string]map[int32]struct{} // topics we are currently using (this only grows) + tps *topicsPartitions // data for topics that the user assigned + using mtmps // topics we are currently using + m mtmps // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add + reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not } func (c *consumer) initDirect() { @@ -12,7 +13,8 @@ func (c *consumer) initDirect() { cfg: &c.cl.cfg, tps: newTopicsPartitions(), reSeen: make(map[string]bool), - using: make(map[string]map[int32]struct{}), + using: make(mtmps), + m: make(mtmps), } c.d = d @@ -21,11 +23,15 @@ func (c *consumer) initDirect() { } var topics []string - for topic := range d.cfg.topics { + for topic, partitions := range d.cfg.partitions { topics = append(topics, topic) + for partition := range partitions { + d.m.add(topic, partition) + } } - for topic := range d.cfg.partitions { + for topic := range d.cfg.topics { topics = append(topics, topic) + d.m.addt(topic) } d.tps.storeTopics(topics) // prime topics to load if non-regex (this is of no benefit if regex) } @@ -59,7 +65,7 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { toUse := make(map[string]map[int32]Offset, 10) for topic, topicPartitions := range topics { - useTopic := true + var useTopic bool if d.cfg.regex { want, seen := d.reSeen[topic] if !seen { @@ -75,6 +81,8 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { d.reSeen[topic] = want } useTopic = want + } else { + useTopic = d.m.onlyt(topic) } // If the above detected that we want to keep this topic, we @@ -95,14 +103,17 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { } // Lastly, if this topic has some specific partitions pinned, - // we set those. - for partition, offset := range d.cfg.partitions[topic] { - toUseTopic, exists := toUse[topic] - if !exists { - toUseTopic = make(map[int32]Offset, 10) - toUse[topic] = toUseTopic + // we set those. We only use partitions from topics that have + // not been purged. + for topic := range d.m { + for partition, offset := range d.cfg.partitions[topic] { + toUseTopic, exists := toUse[topic] + if !exists { + toUseTopic = make(map[int32]Offset, 10) + toUse[topic] = toUseTopic + } + toUseTopic[partition] = offset } - toUseTopic[partition] = offset } } diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 02be233e..4011bc2e 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2596,10 +2596,10 @@ func (g *groupConsumer) commitAcrossRebalance( return } - // We retry up to twice (three tries total): cooperative rebalancing + // We retry four times, for five tries total: cooperative rebalancing // uses two back to back rebalances, and the commit could - // pathologically end during both. A third failure is unexpected. - if g.cooperative && tries < 3 { + // pathologically end during both. + if g.cooperative && tries < 5 { origDone := onDone onDone = func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { retry := err == nil @@ -2653,6 +2653,7 @@ func (g *groupConsumer) commitAcrossRebalance( if retry { go func() { g.cl.cfg.logger.Log(LogLevelInfo, "CommitOffsets spanned a rebalance, we are cooperative and did not lose any partition we were trying to commit, recommitting", "err", retryErr) + time.Sleep(10 * time.Millisecond) g.mu.Lock() defer g.mu.Unlock() g.commitAcrossRebalance(ctx, uncommitted, origDone, tries+1) diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 4df09d7a..a310f26d 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -145,8 +145,15 @@ func (cl *Client) triggerUpdateMetadataNow(why string) { } func (cl *Client) blockingMetadataFn(fn func()) { + var wg sync.WaitGroup + wg.Add(1) + waitfn := func() { + defer wg.Done() + fn() + } select { - case cl.blockingMetadataFnCh <- fn: + case cl.blockingMetadataFnCh <- waitfn: + wg.Wait() case <-cl.ctx.Done(): } } diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index cc71853b..096bdbde 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -10,6 +10,10 @@ import ( "github.com/twmb/franz-go/pkg/kerr" ) +///////////// +// HELPERS // -- ugly types to eliminate the toil of nil maps and lookups +///////////// + func dupmsi32(m map[string]int32) map[string]int32 { d := make(map[string]int32, len(m)) for t, ps := range m { @@ -71,6 +75,43 @@ func (m mtps) String() string { return sb.String() } +type mtmps map[string]map[int32]struct{} // map of topics to map of partitions + +func (m *mtmps) add(t string, p int32) { + if *m == nil { + *m = make(mtmps) + } + mps := (*m)[t] + if mps == nil { + mps = make(map[int32]struct{}) + (*m)[t] = mps + } + mps[p] = struct{}{} +} + +func (m *mtmps) addt(t string) { + if *m == nil { + *m = make(mtmps) + } + mps := (*m)[t] + if mps == nil { + mps = make(map[int32]struct{}) + (*m)[t] = mps + } +} + +func (m mtmps) onlyt(t string) bool { + if m == nil { + return false + } + ps, exists := m[t] + return exists && len(ps) == 0 +} + +//////////// +// PAUSED // -- types for pausing topics and partitions +//////////// + type pausedTopics map[string]pausedPartitions type pausedPartitions struct { @@ -174,6 +215,10 @@ func (m pausedTopics) clone() pausedTopics { return dup } +////////// +// GUTS // -- the key types for storing important metadata for topics & partitions +////////// + func newTopicPartitions() *topicPartitions { parts := new(topicPartitions) parts.v.Store(new(topicPartitionsData))