Skip to content

Commit

Permalink
consuming: improve error messages
Browse files Browse the repository at this point in the history
For the basic logger, this gets rid of `map` in the output. We also
remove one log around fetching, because the same thing is logged
both previously (added) and later (when fetched).
  • Loading branch information
twmb committed Nov 12, 2021
1 parent 101d6bd commit db90100
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
38 changes: 34 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kgo
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -29,11 +30,11 @@ func (o Offset) MarshalJSON() ([]byte, error) {
// String returns the offset as a string; the purpose of this is for logs.
func (o Offset) String() string {
if o.relative == 0 {
return fmt.Sprintf("{%d.%d~%d}", o.at, o.epoch, o.currentEpoch)
return fmt.Sprintf("{%d.%d %d}", o.at, o.epoch, o.currentEpoch)
} else if o.relative > 0 {
return fmt.Sprintf("{%d+%d.%d~%d}", o.at, o.relative, o.epoch, o.currentEpoch)
return fmt.Sprintf("{%d+%d.%d %d}", o.at, o.relative, o.epoch, o.currentEpoch)
} else {
return fmt.Sprintf("{%d-%d.%d~%d}", o.at, o.relative, o.epoch, o.currentEpoch)
return fmt.Sprintf("{%d-%d.%d %d}", o.at, o.relative, o.epoch, o.currentEpoch)
}
}

Expand Down Expand Up @@ -502,6 +503,35 @@ func (h assignHow) String() string {
return ""
}

type fmtAssignment map[string]map[int32]Offset

func (f fmtAssignment) String() string {
var sb strings.Builder

var topicsWritten int
for topic, partitions := range f {
topicsWritten++
sb.WriteString(topic)
sb.WriteString("[")

var partitionsWritten int
for partition, offset := range partitions {
fmt.Fprintf(&sb, "%d%s", partition, offset)
partitionsWritten++
if partitionsWritten < len(partitions) {
sb.WriteString(" ")
}
}

sb.WriteString("]")
if topicsWritten < len(f) {
sb.WriteString(", ")
}
}

return sb.String()
}

// assignPartitions, called under the consumer's mu, is used to set new
// cursors or add to the existing cursors.
func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how assignHow, tps *topicsPartitions, why string) {
Expand All @@ -512,7 +542,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
c.cl.cfg.logger.Log(LogLevelInfo, "assigning partitions",
"why", why,
"how", how,
"input", assignments,
"input", fmtAssignment(assignments),
)
}
var session *consumerSession
Expand Down
10 changes: 2 additions & 8 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {

s := newAssignRevokeSession()
added, lost := g.diffAssigned()
g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", added, "lost", lost)
g.cfg.logger.Log(LogLevelInfo, "new group session begun", "group", g.cfg.group, "added", tpsFmt(added), "lost", tpsFmt(lost))
s.prerevoke(g, lost) // for cooperative consumers

// Since we have joined the group, we immediately begin heartbeating.
Expand All @@ -676,7 +676,6 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() error {
go func() {
defer close(fetchDone)
defer close(fetchErrCh)
g.cfg.logger.Log(LogLevelInfo, "fetching offsets for added partitions", "group", g.cfg.group, "added", added)
fetchErrCh <- g.fetchOffsets(ctx, added, lost)
}()
} else {
Expand Down Expand Up @@ -1040,12 +1039,7 @@ func (g *groupConsumer) handleSyncResp(protocol string, resp *kmsg.SyncGroupResp
return err
}

var sb strings.Builder
for topic, partitions := range assigned {
fmt.Fprintf(&sb, "%s%v", topic, partitions)
sb.WriteString(", ")
}
g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", strings.TrimSuffix(sb.String(), ", "))
g.cfg.logger.Log(LogLevelInfo, "synced", "group", g.cfg.group, "assigned", tpsFmt(assigned))

// Past this point, we will fall into the setupAssigned prerevoke code,
// meaning for cooperative, we will revoke what we need to.
Expand Down
17 changes: 17 additions & 0 deletions pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
package kgo

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/twmb/franz-go/pkg/kerr"
)

type tpsFmt map[string][]int32

func (f tpsFmt) String() string {
var sb strings.Builder
var topicsWritten int
for topic, partitions := range f {
topicsWritten++
fmt.Fprintf(&sb, "%s%v", topic, partitions)
if topicsWritten < len(f) {
sb.WriteString(", ")
}
}
return sb.String()
}

type pausedTopics map[string]pausedPartitions

type pausedPartitions struct {
Expand Down

0 comments on commit db90100

Please sign in to comment.