From f50b320bce1028f7c92a12f5e660566c23465537 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 11 Jun 2021 23:16:45 -0600 Subject: [PATCH] client: make public ErrClientClosed I noticed that there is no obvious way to detect the client closing when polling fetches / records. The next commit will introduce injecting a fake error on poll when the client is closing, but in order to allow a poll loop to detect client closing, we need to make the error public. --- pkg/kgo/broker.go | 6 +++--- pkg/kgo/client.go | 2 +- pkg/kgo/errors.go | 13 +++++++++++-- pkg/kgo/producer.go | 4 ++-- pkg/kgo/sink.go | 6 +++--- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 6c120d2e..6d5405bb 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -842,7 +842,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim after.Stop() writeErr = ctx.Err() case <-cxn.cl.ctx.Done(): - writeErr = errClientClosing + writeErr = ErrClientClosed case <-cxn.deadCh: writeErr = errChosenBrokerDead } @@ -913,7 +913,7 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du cxn.conn.SetWriteDeadline(time.Now()) <-writeDone if writeErr != nil { - writeErr = errClientClosing + writeErr = ErrClientClosed } case <-ctx.Done(): cxn.conn.SetWriteDeadline(time.Now()) @@ -970,7 +970,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque cxn.conn.SetReadDeadline(time.Now()) <-readDone if err != nil { - err = errClientClosing + err = ErrClientClosed } case <-ctx.Done(): cxn.conn.SetReadDeadline(time.Now()) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 678ccd01..8119df87 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -429,7 +429,7 @@ func (cl *Client) Close() { sns.source.maybeConsume() // same } - cl.failBufferedRecords(errClientClosing) + cl.failBufferedRecords(ErrClientClosed) } // Request issues a request to Kafka, waiting for and returning the response. diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index c42c48e3..c8117d98 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -91,8 +91,6 @@ var ( errRecordRetries = errors.New("record failed after being retried too many times") - errClientClosing = errors.New("client closing") - ////////////// // EXTERNAL // ////////////// @@ -104,6 +102,17 @@ var ( // ErrAborting is returned for all buffered records while // AbortBufferedRecords is being called. ErrAborting = errors.New("client is aborting buffered records") + + // ErrClientClosed is returned in various places when the client's + // Close function has been called. + // + // For producing, records are failed with this error. + // + // For consuming, a fake partition is injected into a poll response + // that has this error. + // + // For any request, the request is failed with this error. + ErrClientClosed = errors.New("client closed") ) // ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index ce8a39b0..019df66d 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -516,7 +516,7 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID, select { case <-cl.ctx.Done(): cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure due to dying client", "err", err) - return &producerID{lastID, lastEpoch, errClientClosing}, true + return &producerID{lastID, lastEpoch, ErrClientClosed}, true default: } } @@ -629,7 +629,7 @@ func (cl *Client) waitUnknownTopic( for err == nil { select { case <-cl.ctx.Done(): - err = errClientClosing + err = ErrClientClosed case <-after: err = errRecordTimeout case retriableErr, ok := <-unknown.wait: diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 6ae326fa..29e2b722 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -277,7 +277,7 @@ func (s *sink) produce(sem <-chan struct{}) bool { default: s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", s.nodeID, "err", err) fallthrough - case errClientClosing: + case ErrClientClosed: s.cl.failBufferedRecords(err) } return false @@ -495,8 +495,8 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) { // retry to force a metadata reload. s.handleRetryBatches(req.batches, req.backoffSeq, false, false) - case err == errClientClosing: - s.cl.failBufferedRecords(errClientClosing) + case err == ErrClientClosed: + s.cl.failBufferedRecords(ErrClientClosed) default: s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", s.nodeID, "err", err)