From 71c6109adc01df8868150e3a112b367f8193f2ac Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 8 Apr 2021 10:22:16 -0600 Subject: [PATCH] consumer group bugfix: map needs to be one block lower topicOffsets is meant to track per-topic offsets when updating uncommitted. Thus, we want to have the maps existence be per-topic. The prior location had the map per fetch, and would reuse that map across all topics within a fetch. --- pkg/kgo/consumer_group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index ba2e5f73..6c2a5475 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1433,13 +1433,13 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { defer g.mu.Unlock() for _, fetch := range fetches { - var topicOffsets map[int32]uncommit for _, topic := range fetch.Topics { if g.cl.cfg.logger.Level() >= LogLevelDebug { fmt.Fprintf(&b, "%s[", topic.Topic) } + var topicOffsets map[int32]uncommit for _, partition := range topic.Partitions { if len(partition.Records) == 0 { continue