diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index a5a073ae..6c78e8bc 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -127,16 +127,18 @@ var ( // Returned when trying to produce a record outside of a transaction. errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction") - // Returned when records are unable to be produced and they hit the - // configured record timeout limit. - errRecordTimeout = errors.New("records have timed out before they were able to be produced") - - errRecordRetries = errors.New("record failed after being retried too many times") - ////////////// // EXTERNAL // ////////////// + // ErrRecordTimeout is passed to produce promises when records are + // unable to be produced within the RecordDeliveryTimeout. + ErrRecordTimeout = errors.New("records have timed out before they were able to be produced") + + // ErrRecordRetries is passed to produce promises when records are + // unable to be produced after RecordRetries attempts. + ErrRecordRetries = errors.New("record failed after being retried too many times") + // ErrMaxBuffered is returned when producing with manual flushing // enabled and the maximum amount of records are buffered. ErrMaxBuffered = errors.New("manual flushing is enabled and the maximum amount of records are buffered, cannot buffer more") diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 872887d6..9fbd0b64 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -705,7 +705,7 @@ func (cl *Client) waitUnknownTopic( case <-cl.ctx.Done(): err = ErrClientClosed case <-after: - err = errRecordTimeout + err = ErrRecordTimeout case retriableErr, ok := <-unknown.wait: if !ok { cl.cfg.logger.Log(LogLevelInfo, "done waiting for metadata for new topic", "topic", topic) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index a159b1c0..c82ac85e 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1272,9 +1272,9 @@ func (b *recBatch) maybeFailErr(cfg *cfg) error { } } if b.isTimedOut(cfg.recordTimeout) { - return errRecordTimeout + return ErrRecordTimeout } else if b.tries >= cfg.recordRetries { - return errRecordRetries + return ErrRecordRetries } else if b.owner.cl.producer.isAborting() { return ErrAborting }