Skip to content

Commit

Permalink
client: make public ErrClientClosed
Browse files Browse the repository at this point in the history
I noticed that there is no obvious way to detect the client closing when
polling fetches / records. The next commit will introduce injecting a
fake error on poll when the client is closing, but in order to allow a
poll loop to detect client closing, we need to make the error public.
  • Loading branch information
twmb committed Jun 12, 2021
1 parent 8b7b43e commit f50b320
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
6 changes: 3 additions & 3 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, enqueuedForWritingAt tim
after.Stop()
writeErr = ctx.Err()
case <-cxn.cl.ctx.Done():
writeErr = errClientClosing
writeErr = ErrClientClosed
case <-cxn.deadCh:
writeErr = errChosenBrokerDead
}
Expand Down Expand Up @@ -913,7 +913,7 @@ func (cxn *brokerCxn) writeConn(ctx context.Context, buf []byte, timeout time.Du
cxn.conn.SetWriteDeadline(time.Now())
<-writeDone
if writeErr != nil {
writeErr = errClientClosing
writeErr = ErrClientClosed
}
case <-ctx.Done():
cxn.conn.SetWriteDeadline(time.Now())
Expand Down Expand Up @@ -970,7 +970,7 @@ func (cxn *brokerCxn) readConn(ctx context.Context, timeout time.Duration, enque
cxn.conn.SetReadDeadline(time.Now())
<-readDone
if err != nil {
err = errClientClosing
err = ErrClientClosed
}
case <-ctx.Done():
cxn.conn.SetReadDeadline(time.Now())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (cl *Client) Close() {
sns.source.maybeConsume() // same
}

cl.failBufferedRecords(errClientClosing)
cl.failBufferedRecords(ErrClientClosed)
}

// Request issues a request to Kafka, waiting for and returning the response.
Expand Down
13 changes: 11 additions & 2 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ var (

errRecordRetries = errors.New("record failed after being retried too many times")

errClientClosing = errors.New("client closing")

//////////////
// EXTERNAL //
//////////////
Expand All @@ -104,6 +102,17 @@ var (
// ErrAborting is returned for all buffered records while
// AbortBufferedRecords is being called.
ErrAborting = errors.New("client is aborting buffered records")

// ErrClientClosed is returned in various places when the client's
// Close function has been called.
//
// For producing, records are failed with this error.
//
// For consuming, a fake partition is injected into a poll response
// that has this error.
//
// For any request, the request is failed with this error.
ErrClientClosed = errors.New("client closed")
)

// ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID,
select {
case <-cl.ctx.Done():
cl.cfg.logger.Log(LogLevelInfo, "producer id initialization failure due to dying client", "err", err)
return &producerID{lastID, lastEpoch, errClientClosing}, true
return &producerID{lastID, lastEpoch, ErrClientClosed}, true
default:
}
}
Expand Down Expand Up @@ -629,7 +629,7 @@ func (cl *Client) waitUnknownTopic(
for err == nil {
select {
case <-cl.ctx.Done():
err = errClientClosing
err = ErrClientClosed
case <-after:
err = errRecordTimeout
case retriableErr, ok := <-unknown.wait:
Expand Down
6 changes: 3 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
default:
s.cl.cfg.logger.Log(LogLevelError, "fatal InitProducerID error, failing all buffered records", "broker", s.nodeID, "err", err)
fallthrough
case errClientClosing:
case ErrClientClosed:
s.cl.failBufferedRecords(err)
}
return false
Expand Down Expand Up @@ -495,8 +495,8 @@ func (s *sink) handleReqClientErr(req *produceRequest, err error) {
// retry to force a metadata reload.
s.handleRetryBatches(req.batches, req.backoffSeq, false, false)

case err == errClientClosing:
s.cl.failBufferedRecords(errClientClosing)
case err == ErrClientClosed:
s.cl.failBufferedRecords(ErrClientClosed)

default:
s.cl.cfg.logger.Log(LogLevelWarn, "random error while producing, requeueing unattempted request", "broker", s.nodeID, "err", err)
Expand Down

0 comments on commit f50b320

Please sign in to comment.