Skip to content

Commit

Permalink
Add IsTimeout method to Error (#903)
Browse files Browse the repository at this point in the history
This convenience method just checks if Error code is
ErrTimedOut/ErrTimedOutQueue.

Also modifies the example in README.md.

Modifies the CHANGELOG.md.
  • Loading branch information
milindl authored Nov 28, 2022
1 parent 41f1cff commit 921537f
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ This is a maintenance release:
will be used for subsequent (new) connections to a broker.
* Channel based producer (Producer `ProduceChannel()`) and channel based
consumer (Consumer `Events()`) are deprecated.
* Added `IsTimeout()` on Error type. This is a convenience method that checks
if the error is due to a timeout.


## v1.9.2
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func main() {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else if err.(kafka.Error).Code() != kafka.ErrTimedOut {
} else if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// kafka.ErrTimedOut is not considered an error because it is
// raised by ReadMessage on timeout.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (c *Consumer) Logs() chan LogEvent {
// a new message or error. `timeout` may be set to -1 for
// indefinite wait.
//
// Timeout is returned as (nil, err) where err is `err.(kafka.Error).Code() == kafka.ErrTimedOut`.
// Timeout is returned as (nil, err) where `err.(kafka.Error).IsTimeout() == true`.
//
// Messages are returned as (msg, nil),
// while general errors are returned as (nil, err),
Expand Down
18 changes: 18 additions & 0 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ func TestConsumerAPIs(t *testing.T) {
t.Errorf("Committed() failed but returned non-nil Offsets: %s\n", offsets)
}

// Test timeouts using ReadMessage.
msg, err := c.ReadMessage(time.Millisecond)
t.Logf("ReadMessage() returned message %s and error %s\n", msg, err)

// Check both ErrTimedOut and IsTimeout() to ensure they're consistent.
if err == nil || !err.(Error).IsTimeout() {
t.Errorf("ReadMessage() should time out, instead got %s\n", err)
}
if err == nil || err.(Error).Code() != ErrTimedOut {
t.Errorf("ReadMessage() should time out, instead got %s\n", err)
}
if msg != nil {
t.Errorf("ReadMessage() should not return a message in case of error\n")
}

err = c.Close()
if err != nil {
t.Errorf("Close failed: %s", err)
Expand Down Expand Up @@ -295,6 +310,9 @@ func TestConsumerAssignment(t *testing.T) {
if err.(Error).Code() != ErrTimedOut {
t.Errorf("Expected ReadMessage to fail with ErrTimedOut, not %v", err)
}
if !err.(Error).IsTimeout() {
t.Errorf("Expected ReadMessage to fail with a timeout error, not %v", err)
}

if tmout == 0 {
if duration.Seconds() > 0.1 {
Expand Down
6 changes: 6 additions & 0 deletions kafka/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ func (e Error) IsRetriable() bool {
return e.retriable
}

// IsTimeout returns true if the error is a timeout error.
// A timeout error indicates that the operation timed out locally.
func (e Error) IsTimeout() bool {
return e.code == ErrTimedOut || e.code == ErrTimedOutQueue
}

// TxnRequiresAbort returns true if the error is an abortable transaction error
// that requires the application to abort the current transaction with
// AbortTransaction() and start a new transaction with BeginTransaction()
Expand Down

0 comments on commit 921537f

Please sign in to comment.