From 4fdc7e06b3ae5bb931d6e82608fca0732ac4bb6f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 9 Jul 2021 16:21:11 -0600 Subject: [PATCH] client: make client.Request(FindCoordinatorRequest) future compatible This allows users of kmsg.FindCoordinatorRequest to use the old v0 thru v7 version on newer brokers. We will translate the request to a v8 request, and translate the response back to a v7 response. --- pkg/kgo/client.go | 66 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index ac0b068d..ed371ffb 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -456,6 +456,7 @@ func (cl *Client) Close() { // The following requests are split: // // ListOffsets +// OffsetFetch (if using v8+ for Kafka 3.0+) // DescribeGroups // ListGroups // DeleteRecords @@ -470,6 +471,12 @@ func (cl *Client) Close() { // DescribeTransactions // ListTransactions // +// Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests. +// This function is forward-compatible for the old, singular OffsetFetch and +// FindCoordinator requests, but is not backward-compatible for batched +// requests. It is recommended to only use the old format unless you know you +// are speaking to Kafka 3.0+. +// // In short, this method tries to do the correct thing depending on what type // of request is being issued. // @@ -643,7 +650,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo // First, handle any sharded request. This comes before the conditional // below because this handles two group requests, which we do not want // to fall into the handleCoordinatorReq logic. - switch req.(type) { + switch t := req.(type) { case *kmsg.ListOffsetsRequest, // key 2 *kmsg.OffsetFetchRequest, // key 9 *kmsg.DescribeGroupsRequest, // key 15 @@ -660,6 +667,12 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo *kmsg.DescribeTransactionsRequest, // key 65 *kmsg.ListTransactionsRequest: // key 66 return cl.handleShardedReq(ctx, req) + + // We support being forward-compatible with FindCoordinator, so we need + // to use our special hijack function that batches a singular key. + case *kmsg.FindCoordinatorRequest: + last, resp, err := cl.findCoordinator(ctx, t) + return shards(shard(last, req, resp, err)), nil } if metaReq, isMetaReq := req.(*kmsg.MetadataRequest); isMetaReq { @@ -787,6 +800,34 @@ type coordinatorLoad struct { err error } +// findCoordinator is allows FindCoordinator request to be forward compatible, +// by duplicating a top level request into a single-element batch request, and +// downconverting the response. +func (cl *Client) findCoordinator(ctx context.Context, req *kmsg.FindCoordinatorRequest) (*broker, *kmsg.FindCoordinatorResponse, error) { + var compat bool + if len(req.CoordinatorKeys) == 0 { + req.CoordinatorKeys = []string{req.CoordinatorKey} + compat = true + } + r := cl.retriable() + resp, err := req.RequestWith(ctx, r) + if resp != nil { + if compat && resp.Version >= 4 { + if l := len(resp.Coordinators); l != 1 { + return r.last, resp, fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l) + } + + first := resp.Coordinators[0] + resp.ErrorCode = first.ErrorCode + resp.ErrorMessage = first.ErrorMessage + resp.NodeID = first.NodeID + resp.Host = first.Host + resp.Port = first.Port + } + } + return r.last, resp, err +} + // loadController returns the group/txn coordinator for the given key, retrying // as necessary. Any non-retriable error does not cache the coordinator. func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*broker, error) { @@ -822,33 +863,18 @@ func (cl *Client) loadCoordinator(ctx context.Context, key coordinatorKey) (*bro } var resp *kmsg.FindCoordinatorResponse - resp, c.err = (&kmsg.FindCoordinatorRequest{ + _, resp, c.err = cl.findCoordinator(ctx, &kmsg.FindCoordinatorRequest{ CoordinatorKey: key.name, CoordinatorType: key.typ, - CoordinatorKeys: []string{key.name}, - }).RequestWith(ctx, cl.retriable()) - + }) if c.err != nil { return nil, c.err } - if c.err = kerr.ErrorForCode(resp.ErrorCode); c.err != nil { return nil, c.err } c.node = resp.NodeID - if resp.Version >= 4 { - if l := len(resp.Coordinators); l != 1 { - c.err = fmt.Errorf("unexpectedly received %d coordinators when requesting 1", l) - return nil, c.err - } - first := resp.Coordinators[0] - if c.err = kerr.ErrorForCode(first.ErrorCode); c.err != nil { - return nil, c.err - } - c.node = first.NodeID - } - var b *broker b, c.err = cl.brokerOrErr(ctx, c.node, &errUnknownCoordinator{c.node, key}) return b, c.err @@ -1084,7 +1110,9 @@ func (cl *Client) handleReqWithCoordinator( code = t.ErrorCode } - // List, OffsetFetch, Describe, and Delete handled in sharding. + + // ListGroups, OffsetFetch, DeleteGroups, DescribeGroups, and + // DescribeTransactions handled in sharding. if err := kerr.ErrorForCode(code); cl.maybeDeleteStaleCoordinator(name, typ, err) { return err