diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index fa53ee0d..774b1942 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -26,11 +26,11 @@ type sink struct { drainState workLoop - // seqReqResps, guarded by seqReqRespsMu, contains responses that must + // seqRespsMu, guarded by seqRespsMu, contains responses that must // be handled sequentially. These responses are handled asyncronously, // but sequentially. - seqReqRespsMu sync.Mutex - seqReqResps []seqSinkReqResp + seqRespsMu sync.Mutex + seqResps []*seqResp backoffMu sync.Mutex // guards the following needBackoff bool @@ -47,8 +47,10 @@ type sink struct { recBufsStart int // incremented every req to avoid large batch starvation } -type seqSinkReqResp struct { - req kmsg.Request +type seqResp struct { + resp kmsg.Response + err error + done chan struct{} promise func(kmsg.Response, error) } @@ -301,42 +303,52 @@ func (s *sink) produce(sem <-chan struct{}) bool { return moreToDrain } -// With handleseqReqResps below, this function ensures that all request responses +// With handleSeqResps below, this function ensures that all request responses // are handled in order. We use this guarantee while in handleReqResp below. -// -// Note that some request may finish while a later concurrently issued one -// is successful; this is fine. func (s *sink) doSequenced( req kmsg.Request, promise func(kmsg.Response, error), ) { - s.seqReqRespsMu.Lock() - defer s.seqReqRespsMu.Unlock() - - s.seqReqResps = append(s.seqReqResps, seqSinkReqResp{req, promise}) - if len(s.seqReqResps) == 1 { - go s.handleSeqReqResps(s.seqReqResps[0]) + wait := &seqResp{ + done: make(chan struct{}), + promise: promise, } -} -// Ensures that all request responses are processed in order. -func (s *sink) handleSeqReqResps(reqResp seqSinkReqResp) { -more: br, err := s.cl.brokerOrErr(s.cl.ctx, s.nodeID, ErrUnknownBroker) if err != nil { - reqResp.promise(nil, err) + wait.err = err + close(wait.done) + } else { + br.do(s.cl.ctx, req, func(resp kmsg.Response, err error) { + wait.resp = resp + wait.err = err + close(wait.done) + }) } - resp, err := br.waitResp(s.cl.ctx, reqResp.req) - reqResp.promise(resp, err) - s.seqReqRespsMu.Lock() - s.seqReqResps = s.seqReqResps[1:] - if len(s.seqReqResps) > 0 { - reqResp = s.seqReqResps[0] - s.seqReqRespsMu.Unlock() + s.seqRespsMu.Lock() + defer s.seqRespsMu.Unlock() + + s.seqResps = append(s.seqResps, wait) + if len(s.seqResps) == 1 { + go s.handleSeqResps(s.seqResps[0]) + } +} + +// Ensures that all request responses are processed in order. +func (s *sink) handleSeqResps(wait *seqResp) { +more: + <-wait.done + wait.promise(wait.resp, wait.err) + + s.seqRespsMu.Lock() + s.seqResps = s.seqResps[1:] + if len(s.seqResps) > 0 { + wait = s.seqResps[0] + s.seqRespsMu.Unlock() goto more } - s.seqReqRespsMu.Unlock() + s.seqRespsMu.Unlock() } // Issues an AddPartitionsToTxnRequest before a produce request for all