Skip to content

Commit

Permalink
client: fix fetchBrokerMetadata; add debug logs to sharding
Browse files Browse the repository at this point in the history
fetchBrokerMetadata never cleared fetchingBrokers, meaning we would take
the first status and always use it.
  • Loading branch information
twmb committed Apr 16, 2021
1 parent 6b64728 commit fea3195
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,12 @@ func (cl *Client) fetchBrokerMetadata(ctx context.Context) error {
cl.fetchingBrokers = wait
cl.fetchingBrokersMu.Unlock()

defer close(wait.done)
defer func() {
cl.fetchingBrokersMu.Lock()
defer cl.fetchingBrokersMu.Unlock()
cl.fetchingBrokers = nil
close(wait.done)
}()

_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest())
return wait.err
Expand Down Expand Up @@ -1272,12 +1277,16 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
issue func(reqTry)
)

l := cl.cfg.logger
debug := l.Level() >= LogLevelDebug

// issue is called to progressively split and issue requests.
//
// This recursively calls itself if a request fails and can be retried.
issue = func(try reqTry) {
issues, reshardable, err := sharder.shard(ctx, try.req)
if err != nil {
l.Log(LogLevelDebug, "unable to shard request", "previous_tries", try.tries, "err", err)
addShard(shard(nil, try.req, nil, err)) // failure to shard means data loading failed; this request is failed
return
}
Expand All @@ -1293,6 +1302,18 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
reshardable = true
}

if debug {
var brokerAnys []string
for _, issue := range issues {
if issue.any {
brokerAnys = append(brokerAnys, "any")
} else {
brokerAnys = append(brokerAnys, fmt.Sprintf("%d", issue.broker))
}
}
l.Log(LogLevelDebug, "sharded request", "destinations", brokerAnys)
}

for i := range issues {
tries := try.tries
myIssue := issues[i]
Expand Down Expand Up @@ -1331,8 +1352,10 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
// requests where the original request is split to
// dedicated brokers; we do not want to re-shard that.
if !reshardable {
l.Log(LogLevelDebug, "sharded request failed, reissuing without resharding", "time_since_start", time.Since(start), "tries", try.tries, "err", err)
goto start
}
l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "time_since_start", time.Since(start), "tries", try.tries, "err", err)
issue(reqTry{tries, myIssue.req})
return
}
Expand Down

0 comments on commit fea3195

Please sign in to comment.