Skip to content

Commit

Permalink
kgo: add a bit more context to sharded logs, avoid info log on Close
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Apr 13, 2023
1 parent 0d137e7 commit de53fda
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,8 +936,8 @@ func (cl *Client) Close() {
if c.g != nil {
cl.LeaveGroup()
} else if c.d != nil {
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in Close")
c.mu.Lock() // lock for assign
c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "") // we do not use a log message when not in a group
c.mu.Unlock()
}

Expand Down Expand Up @@ -2080,7 +2080,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
issue = func(try reqTry) {
issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr)
if err != nil {
l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err)
l.Log(LogLevelDebug, "unable to shard request", "req", kmsg.Key(try.req.Key()).Name(), "previous_tries", try.tries, "err", err)
addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed
return
}
Expand All @@ -2097,8 +2097,10 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
}

if debug {
var key int16
var brokerAnys []string
for _, issue := range issues {
key = issue.req.Key()
if issue.err != nil {
brokerAnys = append(brokerAnys, "err")
} else if issue.any {
Expand All @@ -2107,7 +2109,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
brokerAnys = append(brokerAnys, fmt.Sprintf("%d", issue.broker))
}
}
l.Log(LogLevelDebug, "sharded request", "destinations", brokerAnys)
l.Log(LogLevelDebug, "sharded request", "req", kmsg.Key(key).Name(), "destinations", brokerAnys)
}

for i := range issues {
Expand Down Expand Up @@ -2158,10 +2160,10 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
// requests where the original request is split to
// dedicated brokers; we do not want to re-shard that.
if !reshardable {
l.Log(LogLevelDebug, "sharded request failed, reissuing without resharding", "time_since_start", time.Since(start), "tries", try.tries, "err", err)
l.Log(LogLevelDebug, "sharded request failed, reissuing without resharding", "req", kmsg.Key(myIssue.req.Key()).Name(), "time_since_start", time.Since(start), "tries", try.tries, "err", err)
goto start
}
l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err)
l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "req", kmsg.Key(myIssue.req.Key()).Name(), "time_since_start", time.Since(start), "tries", try.tries, "err", err)
issue(reqTry{tries, myUnderlyingReq, err})
return
}
Expand Down

0 comments on commit de53fda

Please sign in to comment.