Skip to content

Commit d620765

Browse files
committed
sink: tighten canFailFromLoadErrs
The old version set canFailFromLoadErrs=false immediately once a batch was added to a request, well before the request is written. It is possible to have failures and delays that cause the request to never be written, but our record could never be failed. We now only set canFailFromLoadErrs=false at the moment we are about to write the request. Closes #239.
1 parent 6c0abd1 commit d620765

File tree

1 file changed

+1
-1
lines changed

1 file changed

+1
-1
lines changed

pkg/kgo/sink.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1609,7 +1609,6 @@ func (p *produceRequest) tryAddBatch(produceVersion int32, recBuf *recBuf, batch
16091609
}
16101610

16111611
batch.tries++
1612-
batch.canFailFromLoadErrs = false
16131612
p.wireLength += batchWireLength
16141613
p.batches.addBatch(
16151614
recBuf.topic,
@@ -1932,6 +1931,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
19321931
batch.mu.Unlock()
19331932
continue
19341933
}
1934+
batch.canFailFromLoadErrs = false // we are going to write this batch: the response status is now unknown
19351935
var pmetrics ProduceBatchMetrics
19361936
if p.version < 3 {
19371937
dst, pmetrics = batch.appendToAsMessageSet(dst, uint8(p.version), p.compressor)

0 commit comments

Comments
 (0)