Skip to content

Commit

Permalink
sink: simplify decInflight for batches in req
Browse files Browse the repository at this point in the history
This simplifies the logic: grab all batches before issuing a request,
finish everything once handling the response is done. Importantly, this
also ensures that we do not double finish if we had a txnal req that
removed a partition from the request (because it would not be removed
from batchesFlat).
  • Loading branch information
twmb committed Aug 20, 2021
1 parent 39af32a commit 4c20135
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,11 @@ func (s *sink) produce(sem <-chan struct{}) bool {
req.backoffSeq = s.backoffSeq // safe to read outside mu since we are in drain loop

produced = true

batches := req.batches.sliced()
s.doSequenced(req, func(br *broker, resp kmsg.Response, err error) {
s.handleReqResp(br, req, resp, err)
batches.eachOwnerLocked((*recBatch).decInflight)
<-sem
})
return moreToDrain
Expand Down Expand Up @@ -544,11 +547,6 @@ func (s *sink) handleReqRespNoack(b *bytes.Buffer, debug bool, req *produceReque
}

func (s *sink) handleReqResp(br *broker, req *produceRequest, resp kmsg.Response, err error) {
// The last thing we do before returning is to decrement the inflight
// count on all used batches. This func uses batchesFlat, meaning map
// deletions below do not affect us.
defer req.decBatchesInflight()

if err != nil {
s.handleReqClientErr(req, err)
return
Expand Down Expand Up @@ -1399,11 +1397,10 @@ type produceRequest struct {

backoffSeq uint32

txnID *string
acks int16
timeout int32
batches seqRecBatches
batchesFlat []*recBatch
txnID *string
acks int16
timeout int32
batches seqRecBatches

producerID int64
producerEpoch int16
Expand Down Expand Up @@ -1502,19 +1499,9 @@ func (r *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
recBuf.seq,
batch,
)
r.batchesFlat = append(r.batchesFlat, batch)
return true
}

// decBatchesInflight is called at the end of handling a req response.
func (r *produceRequest) decBatchesInflight() {
for _, batch := range r.batchesFlat {
batch.owner.mu.Lock()
batch.decInflight()
batch.owner.mu.Unlock()
}
}

// seqRecBatch: a recBatch with a sequence number.
type seqRecBatch struct {
seq int32
Expand Down Expand Up @@ -1563,6 +1550,26 @@ func (rbs seqRecBatches) eachOwnerLocked(fn func(seqRecBatch)) {
})
}

func (rbs seqRecBatches) sliced() recBatches {
var batches []*recBatch
for _, partitions := range rbs {
for _, batch := range partitions {
batches = append(batches, batch.recBatch)
}
}
return batches
}

type recBatches []*recBatch

func (bs recBatches) eachOwnerLocked(fn func(*recBatch)) {
for _, b := range bs {
b.owner.mu.Lock()
fn(b)
b.owner.mu.Unlock()
}
}

//////////////
// COUNTING // - this section is all about counting how bytes lay out on the wire
//////////////
Expand Down

0 comments on commit 4c20135

Please sign in to comment.