From cd5e7fe2871310fc0c766ad3c9d56bfa0495e0e3 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 26 Feb 2021 17:16:02 -0700 Subject: [PATCH] client: rewrite *kmsg.ProduceRequest's Acks, TimeoutMillis Following up on the prior commit, for added safety, rather than just documenting that the acks must match the client's configuration, we will directly overwrite the fields we require to match. Again, it is not recommended to use Request to issue a raw *kmsg.ProduceRequest. --- pkg/kgo/broker.go | 37 +++++++++++++++++++++++++------------ pkg/kgo/client.go | 6 ++++-- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 2e7d4e65..0e6866c4 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -299,30 +299,43 @@ func (b *broker) handleReqs() { default: } - corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req) - - if err != nil { - pr.promise(nil, err) - cxn.die() - continue - } - // Produce requests (and only produce requests) can be written // without receiving a reply. If we see required acks is 0, // then we immediately call the promise with no response. // - // We provide a non-nil *kmsg.FetchResponse for - // *kmsg.FetchRequest just to ensure we do not return with no + // We provide a non-nil *kmsg.ProduceResponse for + // *kmsg.ProduceRequest just to ensure we do not return with no // error and no kmsg.Response, per the client contract. + // + // As documented on the client's Request function, if this is a + // *kmsg.ProduceRequest, we rewrite the acks to match the + // client configured acks, and we rewrite the timeout millis if + // acks is 0. We do this to ensure that our discard goroutine + // is used correctly, and so that we do not write a request + // with 0 acks and then send it to handleResps where it will + // not get a response. var isNoResp bool var noResp kmsg.Response switch r := req.(type) { case *produceRequest: isNoResp = r.acks == 0 case *kmsg.ProduceRequest: - isNoResp = r.Acks == 0 + r.Acks = b.cl.cfg.acks.val + if r.Acks == 0 { + isNoResp = true + r.TimeoutMillis = int32(b.cl.cfg.produceTimeout.Milliseconds()) + } noResp = &kmsg.ProduceResponse{Version: req.GetVersion()} } + + corrID, err := cxn.writeRequest(pr.ctx, pr.enqueue, req) + + if err != nil { + pr.promise(nil, err) + cxn.die() + continue + } + if isNoResp { pr.promise(noResp, nil) continue @@ -949,7 +962,7 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) { // // Thus, we just simply discard everything. // -// Since we still want to support hooks, read still read the size of a response +// Since we still want to support hooks, we still read the size of a response // and then read that entire size before calling a hook. There are a few // differences: // diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 6985b05a..f54fccd2 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -492,8 +492,10 @@ func (cl *Client) Close() { // If using this function to issue kmsg.ProduceRequest's, you must configure // the client with the same RequiredAcks option that you use in the request. // If you are issuing produce requests with 0 acks, you must configure the -// client with the same timeout you use in the request. It is strongly -// recommended to not issue raw kmsg.ProduceRequest's. +// client with the same timeout you use in the request. The client will +// internally rewrite the incoming request's acks to match the client's +// configuration, and it will rewrite the timeout millis if the acks is 0. It +// is strongly recommended to not issue raw kmsg.ProduceRequest's. func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { resps, merge := cl.shardedRequest(ctx, req) // If there is no merge function, only one request was issued directly