From ff1dc467e32b6be41656c1f3bc57cb4d45e32a0c Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 23 Aug 2020 20:04:23 -0600 Subject: [PATCH] kmsg: break API for AppendRequest in partial support of KIP-590 KIP-590 adds two tags to the base request header. The client ID was already added historically. This breaks the API in favor of something that can be extended in a non-breaking way in the future. All request headers have a base of the bytes being appended to, the request being appended, and the correlation ID of the request. Everything after is "new" from version 0. KIP-590 adds two tags that should not be used by non-brokers, but kmsg aims to support people implementing their own brokers. --- pkg/kgo/broker.go | 13 ++++----- pkg/kgo/client.go | 6 ++++ pkg/kmsg/interface.go | 67 +++++++++++++++++++++++++++++++++++++------ 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 4becf965..da530c27 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -345,7 +345,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, addr: b.addr, conn: conn, timeouts: b.cl.connTimeoutFn, - clientID: b.cl.cfg.id, + reqFormatter: b.cl.reqFormatter, softwareName: b.cl.cfg.softwareName, softwareVersion: b.cl.cfg.softwareVersion, saslCtx: b.cl.ctx, @@ -387,10 +387,10 @@ type brokerCxn struct { mechanism sasl.Mechanism expiry time.Time - // bufPool, corrID, and clientID are used in writing requests. - bufPool bufPool - corrID int32 - clientID *string + // bufPool, corrID, and reqFormatter are used in writing requests. + bufPool bufPool + corrID int32 + reqFormatter *kmsg.RequestFormatter softwareName string // for KIP-511 softwareVersion string // for KIP-511 @@ -635,11 +635,10 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { func (cxn *brokerCxn) writeRequest(req kmsg.Request) (int32, error) { buf := cxn.bufPool.get() defer cxn.bufPool.put(buf) - buf = kmsg.AppendRequest( + buf = cxn.reqFormatter.AppendRequest( buf[:0], req, cxn.corrID, - cxn.clientID, ) _, wt := cxn.timeouts(req) if wt > 0 { diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 19f2d00d..4033357c 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -47,6 +47,7 @@ type Client struct { anyBrokerIdx int stopBrokers bool // set to true on close to stop updateBrokers + reqFormatter *kmsg.RequestFormatter connTimeoutFn func(kmsg.Request) (time.Duration, time.Duration) bufPool bufPool // for to brokers to share underlying reusable request buffers @@ -127,6 +128,7 @@ func NewClient(opts ...Opt) (*Client, error) { controllerID: unknownControllerID, brokers: make(map[int32]*broker), + reqFormatter: new(kmsg.RequestFormatter), connTimeoutFn: connTimeoutBuilder(cfg.connTimeoutOverhead), bufPool: newBufPool(), @@ -146,6 +148,10 @@ func NewClient(opts ...Opt) (*Client, error) { cl.topics.Store(make(map[string]*topicPartitions)) cl.metawait.init() + if cfg.id != nil { + cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id)) + } + compressor, err := newCompressor(cl.cfg.compression...) if err != nil { return nil, err diff --git a/pkg/kmsg/interface.go b/pkg/kmsg/interface.go index 08536d70..4412bfd5 100644 --- a/pkg/kmsg/interface.go +++ b/pkg/kmsg/interface.go @@ -109,19 +109,56 @@ type Response interface { RequestKind() Request } +// RequestFormatter formats requests. +// +// The default empty struct works correctly, but can be extended with the +// NewRequestFormatter function. +type RequestFormatter struct { + clientID *string + + initPrincipalName *string + initClientID *string +} + +// RequestFormatterOpt applys options to a RequestFormatter. +type RequestFormatterOpt interface { + apply(*RequestFormatter) +} + +type formatterOpt struct{ fn func(*RequestFormatter) } + +func (opt formatterOpt) apply(f *RequestFormatter) { opt.fn(f) } + +// FormatterClientID attaches the given client ID to any issued request, +// minus controlled shutdown v0, which uses its own special format. +func FormatterClientID(id string) RequestFormatterOpt { + return formatterOpt{func(f *RequestFormatter) { f.clientID = &id }} +} + +// FormatterInitialID sets the initial ID of the request. +// +// This function should be used by brokers only and is set when the broker +// redirects a request. See KIP-590 for more detail. +func FormatterInitialID(principalName, clientID string) RequestFormatterOpt { + return formatterOpt{func(f *RequestFormatter) { f.initPrincipalName, f.initClientID = &principalName, &clientID }} +} + +// NewRequestFormatter returns a RequestFormatter with the opts applied. +func NewRequestFormatter(opts ...RequestFormatterOpt) *RequestFormatter { + a := new(RequestFormatter) + for _, opt := range opts { + opt.apply(a) + } + return a +} + // AppendRequest appends a full message request to dst, returning the updated // slice. This message is the full body that needs to be written to issue a // Kafka request. -// -// clientID is optional; nil means to not send, whereas empty means the client -// id is the empty string. If the request is controlled shutdown v0, this does -// not include the client ID, as controlled shutdown used its own special -// no-client-id encoding at that version. -func AppendRequest( +func (f *RequestFormatter) AppendRequest( dst []byte, r Request, correlationID int32, - clientID *string, ) []byte { dst = append(dst, 0, 0, 0, 0) // reserve length k := r.Key() @@ -137,12 +174,24 @@ func AppendRequest( // Clients issue ApiVersions immediately before knowing the broker // version, and old brokers will not be able to understand a compact // client id. - dst = kbin.AppendNullableString(dst, clientID) + dst = kbin.AppendNullableString(dst, f.clientID) // The flexible tags end the request header, and then begins the // request body. if r.IsFlexible() { - dst = append(dst, 0) // tagged section; TODO for when tags are added here + var numTags uint8 + if f.initPrincipalName != nil { + numTags += 2 + } + dst = append(dst, numTags) + if numTags != 0 { + if f.initPrincipalName != nil { + dst = kbin.AppendUvarint(dst, 0) + dst = kbin.AppendCompactString(dst, *f.initPrincipalName) + dst = kbin.AppendUvarint(dst, 1) + dst = kbin.AppendCompactString(dst, *f.initClientID) + } + } } // Now the request body.