Skip to content

Commit

Permalink
sink: allow concurrent produces
Browse files Browse the repository at this point in the history
When doSequenced from the broker to the sink, I accidentally lost the
concurrent issuing aspect. The sequenced aspect in the sink issued and
processed requests in order, but using waitResp meant we actually never
issued more than one request at a time.

Thix fixes that by ensuring we issue the produce with `do` before we go
to waiting for a response.
  • Loading branch information
twmb committed Apr 14, 2021
1 parent ebc8ee2 commit b2a0578
Showing 1 changed file with 40 additions and 28 deletions.
68 changes: 40 additions & 28 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type sink struct {

drainState workLoop

// seqReqResps, guarded by seqReqRespsMu, contains responses that must
// seqRespsMu, guarded by seqRespsMu, contains responses that must
// be handled sequentially. These responses are handled asyncronously,
// but sequentially.
seqReqRespsMu sync.Mutex
seqReqResps []seqSinkReqResp
seqRespsMu sync.Mutex
seqResps []*seqResp

backoffMu sync.Mutex // guards the following
needBackoff bool
Expand All @@ -47,8 +47,10 @@ type sink struct {
recBufsStart int // incremented every req to avoid large batch starvation
}

type seqSinkReqResp struct {
req kmsg.Request
type seqResp struct {
resp kmsg.Response
err error
done chan struct{}
promise func(kmsg.Response, error)
}

Expand Down Expand Up @@ -301,42 +303,52 @@ func (s *sink) produce(sem <-chan struct{}) bool {
return moreToDrain
}

// With handleseqReqResps below, this function ensures that all request responses
// With handleSeqResps below, this function ensures that all request responses
// are handled in order. We use this guarantee while in handleReqResp below.
//
// Note that some request may finish while a later concurrently issued one
// is successful; this is fine.
func (s *sink) doSequenced(
req kmsg.Request,
promise func(kmsg.Response, error),
) {
s.seqReqRespsMu.Lock()
defer s.seqReqRespsMu.Unlock()

s.seqReqResps = append(s.seqReqResps, seqSinkReqResp{req, promise})
if len(s.seqReqResps) == 1 {
go s.handleSeqReqResps(s.seqReqResps[0])
wait := &seqResp{
done: make(chan struct{}),
promise: promise,
}
}

// Ensures that all request responses are processed in order.
func (s *sink) handleSeqReqResps(reqResp seqSinkReqResp) {
more:
br, err := s.cl.brokerOrErr(s.cl.ctx, s.nodeID, ErrUnknownBroker)
if err != nil {
reqResp.promise(nil, err)
wait.err = err
close(wait.done)
} else {
br.do(s.cl.ctx, req, func(resp kmsg.Response, err error) {
wait.resp = resp
wait.err = err
close(wait.done)
})
}
resp, err := br.waitResp(s.cl.ctx, reqResp.req)
reqResp.promise(resp, err)

s.seqReqRespsMu.Lock()
s.seqReqResps = s.seqReqResps[1:]
if len(s.seqReqResps) > 0 {
reqResp = s.seqReqResps[0]
s.seqReqRespsMu.Unlock()
s.seqRespsMu.Lock()
defer s.seqRespsMu.Unlock()

s.seqResps = append(s.seqResps, wait)
if len(s.seqResps) == 1 {
go s.handleSeqResps(s.seqResps[0])
}
}

// Ensures that all request responses are processed in order.
func (s *sink) handleSeqResps(wait *seqResp) {
more:
<-wait.done
wait.promise(wait.resp, wait.err)

s.seqRespsMu.Lock()
s.seqResps = s.seqResps[1:]
if len(s.seqResps) > 0 {
wait = s.seqResps[0]
s.seqRespsMu.Unlock()
goto more
}
s.seqReqRespsMu.Unlock()
s.seqRespsMu.Unlock()
}

// Issues an AddPartitionsToTxnRequest before a produce request for all
Expand Down

0 comments on commit b2a0578

Please sign in to comment.