Skip to content

Commit

Permalink
update debug logging (broker and consumer group)
Browse files Browse the repository at this point in the history
- removes mostly useless debug log about api versions on connect in the
broker (more debug logging is worth addressing here)
- adds verbose committed / uncommitted debug logs, showing the state of
how things change
  • Loading branch information
twmb committed Mar 29, 2021
1 parent 2ef926d commit 04f8e12
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
1 change: 0 additions & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@ start:
}
cxn.versions[key.ApiKey] = key.MaxVersion
}
cxn.cl.cfg.logger.Log(LogLevelDebug, "initialized api versions", "versions", cxn.versions)
return nil
}

Expand Down
51 changes: 51 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kgo

import (
"bytes"
"context"
"fmt"
"regexp"
Expand Down Expand Up @@ -1429,12 +1430,19 @@ type uncommitted map[string]map[int32]uncommit

// updateUncommitted sets the latest uncommitted offset.
func (g *groupConsumer) updateUncommitted(fetches Fetches) {
var b bytes.Buffer

g.mu.Lock()
defer g.mu.Unlock()

for _, fetch := range fetches {
var topicOffsets map[int32]uncommit
for _, topic := range fetch.Topics {

if g.cl.cfg.logger.Level() >= LogLevelDebug {
fmt.Fprintf(&b, "%s[", topic.Topic)
}

for _, partition := range topic.Partitions {
if len(partition.Records) == 0 {
continue
Expand All @@ -1451,18 +1459,37 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
g.uncommitted[topic.Topic] = topicOffsets
}
}

uncommit := topicOffsets[partition.Partition]

// Our new head points just past the final consumed offset,
// that is, if we rejoin, this is the offset to begin at.
newOffset := final.Offset + 1
if g.cl.cfg.logger.Level() >= LogLevelDebug {
fmt.Fprintf(&b, "%d{%d=>%d}, ", partition.Partition, uncommit.head.Offset, newOffset)
}
uncommit.head = EpochOffset{
final.LeaderEpoch, // -1 if old message / unknown
newOffset,
}
topicOffsets[partition.Partition] = uncommit
}

if g.cl.cfg.logger.Level() >= LogLevelDebug {
if bytes.HasSuffix(b.Bytes(), []byte(", ")) {
b.Truncate(b.Len() - 2)
}
b.WriteString("], ")
}
}
}

if g.cl.cfg.logger.Level() >= LogLevelDebug {
update := b.String()
update = strings.TrimSuffix(update, ", ") // trim trailing comma and space after final topic
g.cl.cfg.logger.Log(LogLevelDebug, "updated uncommitted", "to", update)
}

}

// updateCommitted updates the group's uncommitted map. This function triply
Expand Down Expand Up @@ -1491,6 +1518,8 @@ func (g *groupConsumer) updateCommitted(
return resp.Topics[i].Topic < resp.Topics[j].Topic
})

var b bytes.Buffer

for i := range resp.Topics {
reqTopic := &req.Topics[i]
respTopic := &resp.Topics[i]
Expand All @@ -1509,6 +1538,9 @@ func (g *groupConsumer) updateCommitted(
return respTopic.Partitions[i].Partition < respTopic.Partitions[j].Partition
})

if g.cl.cfg.logger.Level() >= LogLevelDebug {
fmt.Fprintf(&b, "%s[", respTopic.Topic)
}
for i := range respTopic.Partitions {
reqPart := &reqTopic.Partitions[i]
respPart := &respTopic.Partitions[i]
Expand All @@ -1525,13 +1557,32 @@ func (g *groupConsumer) updateCommitted(
continue
}

if g.cl.cfg.logger.Level() >= LogLevelDebug {
fmt.Fprintf(&b, "%d{%d=>%d}, ", reqPart.Partition, uncommit.committed.Offset, reqPart.Offset)
}

uncommit.committed = EpochOffset{
reqPart.LeaderEpoch,
reqPart.Offset,
}
topic[respPart.Partition] = uncommit
}

if g.cl.cfg.logger.Level() >= LogLevelDebug {
if bytes.HasSuffix(b.Bytes(), []byte(", ")) {
b.Truncate(b.Len() - 2)
}
b.WriteString("], ")
}

}

if g.cl.cfg.logger.Level() >= LogLevelDebug {
update := b.String()
update = strings.TrimSuffix(update, ", ") // trim trailing comma and space after final topic
g.cl.cfg.logger.Log(LogLevelDebug, "updated committed", "to", update)
}

}

func (g *groupConsumer) loopCommit() {
Expand Down

0 comments on commit 04f8e12

Please sign in to comment.