From de53fdabcdd09d72e28574a6aef1adf07b91a0c2 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 13 Apr 2023 11:42:29 -0600 Subject: [PATCH] kgo: add a bit more context to sharded logs, avoid info log on Close --- pkg/kgo/client.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 99c33fba..05f395f7 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -936,8 +936,8 @@ func (cl *Client) Close() { if c.g != nil { cl.LeaveGroup() } else if c.d != nil { - c.mu.Lock() // lock for assign - c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in Close") + c.mu.Lock() // lock for assign + c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "") // we do not use a log message when not in a group c.mu.Unlock() } @@ -2080,7 +2080,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res issue = func(try reqTry) { issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr) if err != nil { - l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err) + l.Log(LogLevelDebug, "unable to shard request", "req", kmsg.Key(try.req.Key()).Name(), "previous_tries", try.tries, "err", err) addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed return } @@ -2097,8 +2097,10 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res } if debug { + var key int16 var brokerAnys []string for _, issue := range issues { + key = issue.req.Key() if issue.err != nil { brokerAnys = append(brokerAnys, "err") } else if issue.any { @@ -2107,7 +2109,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res brokerAnys = append(brokerAnys, fmt.Sprintf("%d", issue.broker)) } } - l.Log(LogLevelDebug, "sharded request", "destinations", brokerAnys) + l.Log(LogLevelDebug, "sharded request", "req", kmsg.Key(key).Name(), "destinations", brokerAnys) } for i := range issues { @@ -2158,10 +2160,10 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // requests where the original request is split to // dedicated brokers; we do not want to re-shard that. if !reshardable { - l.Log(LogLevelDebug, "sharded request failed, reissuing without resharding", "time_since_start", time.Since(start), "tries", try.tries, "err", err) + l.Log(LogLevelDebug, "sharded request failed, reissuing without resharding", "req", kmsg.Key(myIssue.req.Key()).Name(), "time_since_start", time.Since(start), "tries", try.tries, "err", err) goto start } - l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err) + l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "req", kmsg.Key(myIssue.req.Key()).Name(), "time_since_start", time.Since(start), "tries", try.tries, "err", err) issue(reqTry{tries, myUnderlyingReq, err}) return }