From 8fab998a548652bf7b68dbb939b89ce05a8e222c Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 26 Feb 2021 16:43:10 -0700 Subject: [PATCH] kgo: use a discard goroutine when produce acks is 0 Kafka proper does *not* reply at all to produce requests when required acks is 0. The client used to wait for responses to this, but this was never actually tested against Kafka itself. The recent fix to **never** read responses for produce requests with acks == 0 actually breaks Microsoft EventHubs when using acks == 0, because MS EH actually still sends produce responses. The "easy" fix for this is, if the client is configured with acks == 0, then we can have a discard goroutine that just drains the connection using a small read buffer. This goroutine is only used for the producer connection, and we expect responses to be small. This also adds a large doc about how it is NOT recommended to use the client for raw kmsg.ProduceRequest's, but also describes how to do so correctly. I do not think that Sarama works against MS EH, because Sarama only reads the connection if acks is non-zero. I think librdkafka works by having a dedicated thread reading the connection always, and when it gets responses, searching for which request to fulfill. --- pkg/kgo/broker.go | 183 +++++++++++++++++++++++++++++++++++++--------- pkg/kgo/client.go | 6 ++ 2 files changed, 155 insertions(+), 34 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 1308d31a..2e7d4e65 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -358,8 +358,10 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) } // and returning an error of if that fails. func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, error) { pcxn := &b.cxnNormal + var isProduceCxn bool // see docs on brokerCxn.discard for why we do this if reqKey == 0 { pcxn = &b.cxnProduce + isProduceCxn = true } else if reqKey == 1 { pcxn = &b.cxnFetch } @@ -381,7 +383,7 @@ func (b *broker) loadConnection(ctx context.Context, reqKey int16) (*brokerCxn, conn: conn, deadCh: make(chan struct{}), } - if err = cxn.init(); err != nil { + if err = cxn.init(isProduceCxn); err != nil { b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "id", b.meta.NodeID, "err", err) cxn.closeConn() return nil, err @@ -443,7 +445,7 @@ type brokerCxn struct { deadCh chan struct{} } -func (cxn *brokerCxn) init() error { +func (cxn *brokerCxn) init(isProduceCxn bool) error { for i := 0; i < len(cxn.versions[:]); i++ { cxn.versions[i] = -1 } @@ -461,7 +463,11 @@ func (cxn *brokerCxn) init() error { } cxn.resps = make(chan promisedResp, 10) - go cxn.handleResps() + if isProduceCxn && cxn.cl.cfg.acks.val == 0 { + go cxn.discard() // see docs on discard for why we do this + } else { + go cxn.handleResps() + } return nil } @@ -787,37 +793,8 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque err = ErrConnDead return } - size := int32(binary.BigEndian.Uint32(sizeBuf)) - if size < 0 { - err = ErrInvalidRespSize - return - } - if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize { - // A TLS alert is 21, and a TLS alert has the version - // following, where all major versions are 03xx. We - // look for an alert and major version byte to suspect - // if this we received a TLS alert. - tlsVersion := uint16(sizeBuf[1]) | uint16(sizeBuf[2]) - if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 { - versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion) - for _, guess := range []struct { - num uint16 - text string - }{ - {tls.VersionSSL30, "SSL v3"}, - {tls.VersionTLS10, "TLS v1.0"}, - {tls.VersionTLS11, "TLS v1.1"}, - {tls.VersionTLS12, "TLS v1.2"}, - {tls.VersionTLS13, "TLS v1.3"}, - } { - if tlsVersion == guess.num { - versionGuess = guess.text - } - } - err = fmt.Errorf("invalid large response size %d > limit %d; the first three bytes recieved appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess) - } else { - err = fmt.Errorf("invalid large response size %d > limit %d", size, maxSize) - } + var size int32 + if size, err = cxn.parseReadSize(sizeBuf); err != nil { return } buf = make([]byte, size) @@ -842,6 +819,42 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque return } +// Parses a length 4 slice and enforces the min / max read size based off the +// client configuration. +func (cxn *brokerCxn) parseReadSize(sizeBuf []byte) (int32, error) { + size := int32(binary.BigEndian.Uint32(sizeBuf)) + if size < 0 { + return 0, ErrInvalidRespSize + } + if maxSize := cxn.b.cl.cfg.maxBrokerReadBytes; size > maxSize { + // A TLS alert is 21, and a TLS alert has the version + // following, where all major versions are 03xx. We + // look for an alert and major version byte to suspect + // if this we received a TLS alert. + tlsVersion := uint16(sizeBuf[1]) | uint16(sizeBuf[2]) + if sizeBuf[0] == 21 && tlsVersion&0x0300 != 0 { + versionGuess := fmt.Sprintf("unknown TLS version (hex %x)", tlsVersion) + for _, guess := range []struct { + num uint16 + text string + }{ + {tls.VersionSSL30, "SSL v3"}, + {tls.VersionTLS10, "TLS v1.0"}, + {tls.VersionTLS11, "TLS v1.1"}, + {tls.VersionTLS12, "TLS v1.2"}, + {tls.VersionTLS13, "TLS v1.3"}, + } { + if tlsVersion == guess.num { + versionGuess = guess.text + } + } + return 0, fmt.Errorf("invalid large response size %d > limit %d; the first three bytes recieved appear to be a tls alert record for %s; is this a plaintext connection speaking to a tls endpoint?", size, maxSize, versionGuess) + } + return 0, fmt.Errorf("invalid large response size %d > limit %d", size, maxSize) + } + return size, nil +} + // readResponse reads a response from conn, ensures the correlation ID is // correct, and returns a newly allocated slice on success. func (cxn *brokerCxn) readResponse(ctx context.Context, timeout time.Duration, enqueuedForReadingAt time.Time, key int16, corrID int32, flexibleHeader bool) ([]byte, error) { @@ -928,6 +941,108 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) { } } +// If acks are zero, then a real Kafka installation never replies to produce +// requests. Unfortunately, Microsoft EventHubs rolled their own implementation +// and _does_ reply to ack-0 produce requests. We need to process these +// responses, because otherwise kernel buffers will fill up, Microsoft will be +// unable to reply, and then they will stop taking our produce requests. +// +// Thus, we just simply discard everything. +// +// Since we still want to support hooks, read still read the size of a response +// and then read that entire size before calling a hook. There are a few +// differences: +// +// (1) we do not know what version we produced, so we cannot validate the read, +// we just have to trust that the size is valid (and the data follows +// correctly). +// +// (2) rather than creating a slice for the response, we discard the entire +// response into a reusable small slice. The small size is because produce +// responses are relatively small to begin with, so we expect only a few reads +// per response. +// +// (3) we have no time for when the read was enqueued, so we miss that in the +// hook. +// +// (4) we start the time-to-read duration *after* the size bytes are read, +// since we have no idea when a read actually should start, since we should not +// receive responses to begin with. +// +// (5) we set a read deadline *after* the size bytes are read, and only if the +// client has not yet closed. +func (cxn *brokerCxn) discard() { + defer cxn.die() + + discardBuf := make([]byte, 256) + for { + var ( + nread int + err error + timeToRead time.Duration + + deadlineMu sync.Mutex + deadlineSet bool + + readDone = make(chan struct{}) + ) + go func() { + defer close(readDone) + if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil { + err = ErrConnDead + return + } + deadlineMu.Lock() + if !deadlineSet { + cxn.conn.SetReadDeadline(time.Now().Add(cxn.cl.cfg.produceTimeout)) + } + deadlineMu.Unlock() + + readStart := time.Now() + defer func() { timeToRead = time.Since(readStart) }() + var size int32 + if size, err = cxn.parseReadSize(discardBuf[:4]); err != nil { + return + } + + var nread2 int + for size > 0 && err == nil { + discard := discardBuf + if int(size) < len(discard) { + discard = discard[:size] + } + nread2, err = cxn.conn.Read(discard) + nread += nread2 + size -= int32(nread2) // nread2 max is 128 + } + if err != nil { + err = ErrConnDead + } + }() + + select { + case <-readDone: + case <-cxn.cl.ctx.Done(): + deadlineMu.Lock() + deadlineSet = true + deadlineMu.Unlock() + cxn.conn.SetReadDeadline(time.Now()) + <-readDone + return + } + cxn.conn.SetReadDeadline(time.Time{}) + + cxn.cl.cfg.hooks.each(func(h Hook) { + if h, ok := h.(BrokerReadHook); ok { + h.OnRead(cxn.b.meta, 0, nread, 0, timeToRead, err) + } + }) + if err != nil { + return + } + } +} + // handleResps serially handles all broker responses for an single connection. func (cxn *brokerCxn) handleResps() { defer cxn.die() // always track our death diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 5eb8ecb1..6985b05a 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -488,6 +488,12 @@ func (cl *Client) Close() { // The passed context can be used to cancel a request and return early. Note // that if the request was written to Kafka but the context canceled before a // response is received, Kafka may still operate on the received request. +// +// If using this function to issue kmsg.ProduceRequest's, you must configure +// the client with the same RequiredAcks option that you use in the request. +// If you are issuing produce requests with 0 acks, you must configure the +// client with the same timeout you use in the request. It is strongly +// recommended to not issue raw kmsg.ProduceRequest's. func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) { resps, merge := cl.shardedRequest(ctx, req) // If there is no merge function, only one request was issued directly