diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index f876f9ed..c7cc0aa1 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -555,7 +555,6 @@ start: } cxn.versions[key.ApiKey] = key.MaxVersion } - cxn.cl.cfg.logger.Log(LogLevelDebug, "initialized api versions", "versions", cxn.versions) return nil } diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 00aeb2da..d83e42b8 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1,6 +1,7 @@ package kgo import ( + "bytes" "context" "fmt" "regexp" @@ -1429,12 +1430,19 @@ type uncommitted map[string]map[int32]uncommit // updateUncommitted sets the latest uncommitted offset. func (g *groupConsumer) updateUncommitted(fetches Fetches) { + var b bytes.Buffer + g.mu.Lock() defer g.mu.Unlock() for _, fetch := range fetches { var topicOffsets map[int32]uncommit for _, topic := range fetch.Topics { + + if g.cl.cfg.logger.Level() >= LogLevelDebug { + fmt.Fprintf(&b, "%s[", topic.Topic) + } + for _, partition := range topic.Partitions { if len(partition.Records) == 0 { continue @@ -1451,18 +1459,37 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) { g.uncommitted[topic.Topic] = topicOffsets } } + uncommit := topicOffsets[partition.Partition] + // Our new head points just past the final consumed offset, // that is, if we rejoin, this is the offset to begin at. newOffset := final.Offset + 1 + if g.cl.cfg.logger.Level() >= LogLevelDebug { + fmt.Fprintf(&b, "%d{%d=>%d}, ", partition.Partition, uncommit.head.Offset, newOffset) + } uncommit.head = EpochOffset{ final.LeaderEpoch, // -1 if old message / unknown newOffset, } topicOffsets[partition.Partition] = uncommit } + + if g.cl.cfg.logger.Level() >= LogLevelDebug { + if bytes.HasSuffix(b.Bytes(), []byte(", ")) { + b.Truncate(b.Len() - 2) + } + b.WriteString("], ") + } } } + + if g.cl.cfg.logger.Level() >= LogLevelDebug { + update := b.String() + update = strings.TrimSuffix(update, ", ") // trim trailing comma and space after final topic + g.cl.cfg.logger.Log(LogLevelDebug, "updated uncommitted", "to", update) + } + } // updateCommitted updates the group's uncommitted map. This function triply @@ -1491,6 +1518,8 @@ func (g *groupConsumer) updateCommitted( return resp.Topics[i].Topic < resp.Topics[j].Topic }) + var b bytes.Buffer + for i := range resp.Topics { reqTopic := &req.Topics[i] respTopic := &resp.Topics[i] @@ -1509,6 +1538,9 @@ func (g *groupConsumer) updateCommitted( return respTopic.Partitions[i].Partition < respTopic.Partitions[j].Partition }) + if g.cl.cfg.logger.Level() >= LogLevelDebug { + fmt.Fprintf(&b, "%s[", respTopic.Topic) + } for i := range respTopic.Partitions { reqPart := &reqTopic.Partitions[i] respPart := &respTopic.Partitions[i] @@ -1525,13 +1557,32 @@ func (g *groupConsumer) updateCommitted( continue } + if g.cl.cfg.logger.Level() >= LogLevelDebug { + fmt.Fprintf(&b, "%d{%d=>%d}, ", reqPart.Partition, uncommit.committed.Offset, reqPart.Offset) + } + uncommit.committed = EpochOffset{ reqPart.LeaderEpoch, reqPart.Offset, } topic[respPart.Partition] = uncommit } + + if g.cl.cfg.logger.Level() >= LogLevelDebug { + if bytes.HasSuffix(b.Bytes(), []byte(", ")) { + b.Truncate(b.Len() - 2) + } + b.WriteString("], ") + } + + } + + if g.cl.cfg.logger.Level() >= LogLevelDebug { + update := b.String() + update = strings.TrimSuffix(update, ", ") // trim trailing comma and space after final topic + g.cl.cfg.logger.Log(LogLevelDebug, "updated committed", "to", update) } + } func (g *groupConsumer) loopCommit() {