Skip to content

Commit

Permalink
kmsg: break API for AppendRequest in partial support of KIP-590
Browse files Browse the repository at this point in the history
KIP-590 adds two tags to the base request header. The client ID was
already added historically.

This breaks the API in favor of something that can be extended in a
non-breaking way in the future. All request headers have a base of the
bytes being appended to, the request being appended, and the correlation
ID of the request. Everything after is "new" from version 0.

KIP-590 adds two tags that should not be used by non-brokers, but kmsg
aims to support people implementing their own brokers.
  • Loading branch information
twmb committed Aug 24, 2020
1 parent 1e71931 commit ff1dc46
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 16 deletions.
13 changes: 6 additions & 7 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn,
addr: b.addr,
conn: conn,
timeouts: b.cl.connTimeoutFn,
clientID: b.cl.cfg.id,
reqFormatter: b.cl.reqFormatter,
softwareName: b.cl.cfg.softwareName,
softwareVersion: b.cl.cfg.softwareVersion,
saslCtx: b.cl.ctx,
Expand Down Expand Up @@ -387,10 +387,10 @@ type brokerCxn struct {
mechanism sasl.Mechanism
expiry time.Time

// bufPool, corrID, and clientID are used in writing requests.
bufPool bufPool
corrID int32
clientID *string
// bufPool, corrID, and reqFormatter are used in writing requests.
bufPool bufPool
corrID int32
reqFormatter *kmsg.RequestFormatter

softwareName string // for KIP-511
softwareVersion string // for KIP-511
Expand Down Expand Up @@ -635,11 +635,10 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
func (cxn *brokerCxn) writeRequest(req kmsg.Request) (int32, error) {
buf := cxn.bufPool.get()
defer cxn.bufPool.put(buf)
buf = kmsg.AppendRequest(
buf = cxn.reqFormatter.AppendRequest(
buf[:0],
req,
cxn.corrID,
cxn.clientID,
)
_, wt := cxn.timeouts(req)
if wt > 0 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Client struct {
anyBrokerIdx int
stopBrokers bool // set to true on close to stop updateBrokers

reqFormatter *kmsg.RequestFormatter
connTimeoutFn func(kmsg.Request) (time.Duration, time.Duration)

bufPool bufPool // for to brokers to share underlying reusable request buffers
Expand Down Expand Up @@ -127,6 +128,7 @@ func NewClient(opts ...Opt) (*Client, error) {
controllerID: unknownControllerID,
brokers: make(map[int32]*broker),

reqFormatter: new(kmsg.RequestFormatter),
connTimeoutFn: connTimeoutBuilder(cfg.connTimeoutOverhead),

bufPool: newBufPool(),
Expand All @@ -146,6 +148,10 @@ func NewClient(opts ...Opt) (*Client, error) {
cl.topics.Store(make(map[string]*topicPartitions))
cl.metawait.init()

if cfg.id != nil {
cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id))
}

compressor, err := newCompressor(cl.cfg.compression...)
if err != nil {
return nil, err
Expand Down
67 changes: 58 additions & 9 deletions pkg/kmsg/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,56 @@ type Response interface {
RequestKind() Request
}

// RequestFormatter formats requests.
//
// The default empty struct works correctly, but can be extended with the
// NewRequestFormatter function.
type RequestFormatter struct {
clientID *string

initPrincipalName *string
initClientID *string
}

// RequestFormatterOpt applys options to a RequestFormatter.
type RequestFormatterOpt interface {
apply(*RequestFormatter)
}

type formatterOpt struct{ fn func(*RequestFormatter) }

func (opt formatterOpt) apply(f *RequestFormatter) { opt.fn(f) }

// FormatterClientID attaches the given client ID to any issued request,
// minus controlled shutdown v0, which uses its own special format.
func FormatterClientID(id string) RequestFormatterOpt {
return formatterOpt{func(f *RequestFormatter) { f.clientID = &id }}
}

// FormatterInitialID sets the initial ID of the request.
//
// This function should be used by brokers only and is set when the broker
// redirects a request. See KIP-590 for more detail.
func FormatterInitialID(principalName, clientID string) RequestFormatterOpt {
return formatterOpt{func(f *RequestFormatter) { f.initPrincipalName, f.initClientID = &principalName, &clientID }}
}

// NewRequestFormatter returns a RequestFormatter with the opts applied.
func NewRequestFormatter(opts ...RequestFormatterOpt) *RequestFormatter {
a := new(RequestFormatter)
for _, opt := range opts {
opt.apply(a)
}
return a
}

// AppendRequest appends a full message request to dst, returning the updated
// slice. This message is the full body that needs to be written to issue a
// Kafka request.
//
// clientID is optional; nil means to not send, whereas empty means the client
// id is the empty string. If the request is controlled shutdown v0, this does
// not include the client ID, as controlled shutdown used its own special
// no-client-id encoding at that version.
func AppendRequest(
func (f *RequestFormatter) AppendRequest(
dst []byte,
r Request,
correlationID int32,
clientID *string,
) []byte {
dst = append(dst, 0, 0, 0, 0) // reserve length
k := r.Key()
Expand All @@ -137,12 +174,24 @@ func AppendRequest(
// Clients issue ApiVersions immediately before knowing the broker
// version, and old brokers will not be able to understand a compact
// client id.
dst = kbin.AppendNullableString(dst, clientID)
dst = kbin.AppendNullableString(dst, f.clientID)

// The flexible tags end the request header, and then begins the
// request body.
if r.IsFlexible() {
dst = append(dst, 0) // tagged section; TODO for when tags are added here
var numTags uint8
if f.initPrincipalName != nil {
numTags += 2
}
dst = append(dst, numTags)
if numTags != 0 {
if f.initPrincipalName != nil {
dst = kbin.AppendUvarint(dst, 0)
dst = kbin.AppendCompactString(dst, *f.initPrincipalName)
dst = kbin.AppendUvarint(dst, 1)
dst = kbin.AppendCompactString(dst, *f.initClientID)
}
}
}

// Now the request body.
Expand Down

0 comments on commit ff1dc46

Please sign in to comment.