Skip to content

Commit

Permalink
client: make client.Request(FindCoordinatorRequest) future compatible
Browse files Browse the repository at this point in the history
This allows users of kmsg.FindCoordinatorRequest to use the old v0 thru
v7 version on newer brokers. We will translate the request to a v8
request, and translate the response back to a v7 response.
  • Loading branch information
twmb committed Jul 9, 2021
1 parent 3a3cc06 commit 4fdc7e0
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ func (cl *Client) Close() {
// The following requests are split:
//
// ListOffsets
// OffsetFetch (if using v8+ for Kafka 3.0+)
// DescribeGroups
// ListGroups
// DeleteRecords
Expand All @@ -470,6 +471,12 @@ func (cl *Client) Close() {
// DescribeTransactions
// ListTransactions
//
// Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests.
// This function is forward-compatible for the old, singular OffsetFetch and
// FindCoordinator requests, but is not backward-compatible for batched
// requests. It is recommended to only use the old format unless you know you
// are speaking to Kafka 3.0+.
//
// In short, this method tries to do the correct thing depending on what type
// of request is being issued.
//
Expand Down Expand Up @@ -643,7 +650,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
// First, handle any sharded request. This comes before the conditional
// below because this handles two group requests, which we do not want
// to fall into the handleCoordinatorReq logic.
switch req.(type) {
switch t := req.(type) {
case *kmsg.ListOffsetsRequest, // key 2
*kmsg.OffsetFetchRequest, // key 9
*kmsg.DescribeGroupsRequest, // key 15
Expand All @@ -660,6 +667,12 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
*kmsg.DescribeTransactionsRequest, // key 65
*kmsg.ListTransactionsRequest: // key 66
return cl.handleShardedReq(ctx, req)

// We support being forward-compatible with FindCoordinator, so we need
// to use our special hijack function that batches a singular key.
case *kmsg.FindCoordinatorRequest:
last, resp, err := cl.findCoordinator(ctx, t)
return shards(shard(last, req, resp, err)), nil
}

if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq {
Expand Down Expand Up @@ -787,6 +800,34 @@ type coordinatorLoad struct {
err error
}

// findCoordinator is allows FindCoordinator request to be forward compatible,
// by duplicating a top level request into a single-element batch request, and
// downconverting the response.
func (cl *Client) findCoordinator(ctx context.Context, req *kmsg.FindCoordinatorRequest) (*broker, *kmsg.FindCoordinatorResponse, error) {
var compat bool
if len(req.CoordinatorKeys) == 0 {
req.CoordinatorKeys = []string{req.CoordinatorKey}
compat = true
}
r := cl.retriable()
resp, err := req.RequestWith(ctx, r)
if resp != nil {
if compat && resp.Version >= 4 {
if l := len(resp.Coordinators); l != 1 {
return r.last, resp, fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l)
}

first := resp.Coordinators[0]
resp.ErrorCode = first.ErrorCode
resp.ErrorMessage = first.ErrorMessage
resp.NodeID = first.NodeID
resp.Host = first.Host
resp.Port = first.Port
}
}
return r.last, resp, err
}

// loadController returns the group/txn coordinator for the given key, retrying
// as necessary. Any non-retriable error does not cache the coordinator.
func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) {
Expand Down Expand Up @@ -822,33 +863,18 @@ func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*bro
}

var resp *kmsg.FindCoordinatorResponse
resp, c.err = (&kmsg.FindCoordinatorRequest{
_, resp, c.err = cl.findCoordinator(ctx, &kmsg.FindCoordinatorRequest{
CoordinatorKey: key.name,
CoordinatorType: key.typ,
CoordinatorKeys: []string{key.name},
}).RequestWith(ctx, cl.retriable())

})
if c.err != nil {
return nil, c.err
}

if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil {
return nil, c.err
}
c.node = resp.NodeID

if resp.Version >= 4 {
if l := len(resp.Coordinators); l != 1 {
c.err = fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l)
return nil, c.err
}
first := resp.Coordinators[0]
if c.err = kerr.ErrorForCode(first.ErrorCode); c.err != nil {
return nil, c.err
}
c.node = first.NodeID
}

var b *broker
b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key})
return b, c.err
Expand Down Expand Up @@ -1084,7 +1110,9 @@ func (cl *Client) handleReqWithCoordinator(
code = t.ErrorCode

}
// List, OffsetFetch, Describe, and Delete handled in sharding.

// ListGroups, OffsetFetch, DeleteGroups, DescribeGroups, and
// DescribeTransactions handled in sharding.

if err := kerr.ErrorForCode(code); cl.maybeDeleteStaleCoordinator(name, typ, err) {
return err
Expand Down

0 comments on commit 4fdc7e0

Please sign in to comment.