Skip to content

Commit

Permalink
kgo: fix NoAck produce requests
Browse files Browse the repository at this point in the history
Kafka sends **no response** when we produce with no acks, so the current
code was wrong: we would send a request and then it would effectively
hang until the read timeout. Then we would re-send the request.

This now adds the proper shortcut logic such that as soon as we write
the request successfull, we finish all promises.

We still retry on error because if we receive an error, we did a partial
write and kafka did not understand our request.
  • Loading branch information
twmb committed Feb 18, 2021
1 parent d1ecc7b commit 2bea568
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
21 changes: 21 additions & 0 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,27 @@ func (b *broker) handleReqs() {
continue
}

// Produce requests (and only produce requests) can be written
// without receiving a reply. If we see required acks is 0,
// then we immediately call the promise with no response.
//
// We provide a non-nil *kmsg.FetchResponse for
// *kmsg.FetchRequest just to ensure we do not return with no
// error and no kmsg.Response, per the client contract.
var isNoResp bool
var noResp kmsg.Response
switch r := req.(type) {
case *produceRequest:
isNoResp = r.acks == 0
case *kmsg.ProduceRequest:
isNoResp = r.Acks == 0
noResp = &kmsg.ProduceResponse{Version: req.GetVersion()}
}
if isNoResp {
pr.promise(noResp, nil)
continue
}

rt, _ := cxn.cl.connTimeoutFn(req)

cxn.waitResp(promisedResp{
Expand Down
6 changes: 4 additions & 2 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ type Record struct {

// Offset is the offset that a record is written as.
//
// For producing, this is left unset. This will be set by the client
// as appropriate.
// For producing, this is left unset. This will be set by the client as
// appropriate. If you are producing with no acks, this will just be
// the offset used in the produce request and does not mirror the
// offset actually stored within Kafka.
Offset int64
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,22 @@ func (s *sink) handleReqResp(req *produceRequest, resp kmsg.Response, err error)
s.firstRespCheck(req.version)
atomic.StoreUint32(&s.consecutiveFailures, 0)

// If we have no acks, we will have no response. The following block is
// basically an extremely condensed version of everything that follows.
// We *do* retry on error even with no acks, because an error would
// mean the write itself failed.
if req.acks == 0 {
for _, partitions := range req.batches {
for partition, batch := range partitions {
if !batch.isFirstBatchInRecordBuf() {
continue
}
s.cl.finishBatch(batch.recBatch, req.producerID, req.producerEpoch, partition, 0, nil)
}
}
return
}

var reqRetry seqRecBatches // handled at the end

pr := resp.(*kmsg.ProduceResponse)
Expand Down

0 comments on commit 2bea568

Please sign in to comment.