diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 27b857b3..03d165d8 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -470,16 +470,14 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) { } // If our connection is a produce connection and our client is - // configured with acks == 0, we only reap it if we have read - // on it (via the discard goroutine). We are not supposed to - // have any reads when configured with no acks; if we do, the - // broker implemented the Kafka protocol incorrectly, but since - // are receiving reads, then we can reap. - lastReadNano := atomic.LoadInt64(&reap.cxn.lastRead) - if b.cl.cfg.acks.val == 0 && reap.isProduce && lastReadNano == 0 { + // configured with acks == 0, we never try reaping it due to no + // reads. We are not supposed to have any reads past the + // initial api version negotiation. If we do, the broker + // implemented the Kafka protocol incorrectly, but that's fine. + if b.cl.cfg.acks.val == 0 && reap.isProduce { continue } - lastRead := time.Unix(0, lastReadNano) + lastRead := time.Unix(0, atomic.LoadInt64(&reap.cxn.lastRead)) if time.Since(lastRead) > idleTimeout && atomic.LoadUint32(&reap.cxn.reading) == 0 { reap.cxn.die() total++