Skip to content

Commit

Permalink
broker: avoid reaping produce cxn on no reads when acks == 0
Browse files Browse the repository at this point in the history
See embedded comment.

Additionally, in the discard goroutine, move the SetReadDeadline to
before our read. There is no reason to clear it after the read if we
errored.
  • Loading branch information
twmb committed Apr 28, 2021
1 parent 43a0009 commit bf5b74c
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,23 +451,37 @@ func (b *broker) reapConnections(idleTimeout time.Duration) (total int) {
b.reapMu.Lock()
defer b.reapMu.Unlock()

for _, cxn := range []*brokerCxn{
b.cxnNormal,
b.cxnProduce,
b.cxnFetch,
for _, reap := range []struct {
cxn *brokerCxn
isProduce bool
}{
{b.cxnNormal, false},
{b.cxnProduce, true},
{b.cxnFetch, false},
} {
if cxn == nil || atomic.LoadInt32(&cxn.dead) == 1 {
if reap.cxn == nil || atomic.LoadInt32(&reap.cxn.dead) == 1 {
continue
}
lastWrite := time.Unix(0, atomic.LoadInt64(&cxn.lastWrite))
if time.Since(lastWrite) > idleTimeout && atomic.LoadUint32(&cxn.writing) == 0 {
cxn.die()
lastWrite := time.Unix(0, atomic.LoadInt64(&reap.cxn.lastWrite))
if time.Since(lastWrite) > idleTimeout && atomic.LoadUint32(&reap.cxn.writing) == 0 {
reap.cxn.die()
total++
continue
}
lastRead := time.Unix(0, atomic.LoadInt64(&cxn.lastRead))
if time.Since(lastRead) > idleTimeout && atomic.LoadUint32(&cxn.reading) == 0 {
cxn.die()

// If our connection is a produce connection and our client is
// configured with acks == 0, we only reap it if we have read
// on it (via the discard goroutine). We are not supposed to
// have any reads when configured with no acks; if we do, the
// broker implemented the Kafka protocol incorrectly, but since
// are receiving reads, then we can reap.
lastReadNano := atomic.LoadInt64(&reap.cxn.lastRead)
if b.cl.cfg.acks.val == 0 && reap.isProduce && lastReadNano == 0 {
continue
}
lastRead := time.Unix(0, lastReadNano)
if time.Since(lastRead) > idleTimeout && atomic.LoadUint32(&reap.cxn.reading) == 0 {
reap.cxn.die()
total++
}
}
Expand Down Expand Up @@ -1103,6 +1117,7 @@ func (cxn *brokerCxn) discard() {

readDone = make(chan struct{})
)
cxn.conn.SetReadDeadline(time.Time{})
go func() {
defer close(readDone)
if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil {
Expand Down Expand Up @@ -1153,7 +1168,6 @@ func (cxn *brokerCxn) discard() {
<-readDone
return
}
cxn.conn.SetReadDeadline(time.Time{})

cxn.cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(BrokerReadHook); ok {
Expand Down

0 comments on commit bf5b74c

Please sign in to comment.