diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 4aa928b2..d5d09752 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -323,7 +323,12 @@ func (cl *Client) fetchBrokerMetadata(ctx context.Context) error { cl.fetchingBrokers = wait cl.fetchingBrokersMu.Unlock() - defer close(wait.done) + defer func() { + cl.fetchingBrokersMu.Lock() + defer cl.fetchingBrokersMu.Unlock() + cl.fetchingBrokers = nil + close(wait.done) + }() _, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest()) return wait.err @@ -1272,12 +1277,16 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res issue func(reqTry) ) + l := cl.cfg.logger + debug := l.Level() >= LogLevelDebug + // issue is called to progressively split and issue requests. // // This recursively calls itself if a request fails and can be retried. issue = func(try reqTry) { issues, reshardable, err := sharder.shard(ctx, try.req) if err != nil { + l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err) addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed return } @@ -1293,6 +1302,18 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res reshardable = true } + if debug { + var brokerAnys []string + for _, issue := range issues { + if issue.any { + brokerAnys = append(brokerAnys, "any") + } else { + brokerAnys = append(brokerAnys, fmt.Sprintf("%d", issue.broker)) + } + } + l.Log(LogLevelDebug, "sharded request", "destinations", brokerAnys) + } + for i := range issues { tries := try.tries myIssue := issues[i] @@ -1331,8 +1352,10 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // requests where the original request is split to // dedicated brokers; we do not want to re-shard that. if !reshardable { + l.Log(LogLevelDebug, "sharded request failed, reissuing without resharding", "time_since_start", time.Since(start), "tries", try.tries, "err", err) goto start } + l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err) issue(reqTry{tries, myIssue.req}) return }