Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Only retry on retryable AMQP errors in the sender
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinfish committed Dec 24, 2020
1 parent 705d239 commit 57eca53
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 25 deletions.
17 changes: 17 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,25 @@ package servicebus
import (
"fmt"
"reflect"
"time"

"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/go-amqp"
)

// Service Bus AMQP error conditions.
//
// These are the conditions that handleAMQPError matches against the
// Condition field of an *amqp.Error to decide whether sender recovery
// should be attempted.
const (
	// errorServerBusy is the "server-busy" condition.
	errorServerBusy = amqp.ErrorCondition("com.microsoft:server-busy")
	// errorTimeout is the "timeout" condition.
	errorTimeout = amqp.ErrorCondition("com.microsoft:timeout")
	// errorOperationCancelled is the "operation-cancelled" condition.
	errorOperationCancelled = amqp.ErrorCondition("com.microsoft:operation-cancelled")
	// errorContainerClose is the "container-close" condition.
	errorContainerClose = amqp.ErrorCondition("com.microsoft:container-close")
)

// Retry policy used when recovering the sender after an AMQP error.
const (
	// amqpRetryDefaultTimes is the count handed to common.Retry for every
	// recovery attempt sequence.
	amqpRetryDefaultTimes int = 3
	// amqpRetryDefaultDelay is the delay handed to common.Retry for all
	// conditions other than server-busy.
	amqpRetryDefaultDelay = time.Second
	// amqpRetryBusyServerDelay is the longer delay used when the service
	// reported the server-busy condition.
	amqpRetryBusyServerDelay = 10 * time.Second
)

type (
Expand Down
75 changes: 50 additions & 25 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,31 +234,10 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {

switch err.(type) {
case *amqp.Error, *amqp.DetachError:
tab.For(ctx).Debug("recovering connection")
_, retryErr := common.Retry(10, 10*time.Second, func() (interface{}, error) {
ctx, sp := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
defer sp.End()

err := s.Recover(ctx)
if err == nil {
tab.For(ctx).Debug("recovered connection")
return nil, nil
}

select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, common.Retryable(err.Error())
}
})

if retryErr != nil {
tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
if err := s.Close(ctx); err != nil {
tab.For(ctx).Error(err)
}
return retryErr
err = s.handleAMQPError(ctx, err)
if err != nil {
tab.For(ctx).Error(err)
return err
}
default:
tab.For(ctx).Error(err)
Expand All @@ -268,6 +247,52 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
}
}

// handleAMQPError inspects an error produced by a failed send and decides
// whether sender recovery should be attempted.
//
// For an *amqp.Error, recovery is attempted only when its Condition is one of
// the known retryable conditions (server-busy uses the longer busy-server
// delay; timeout, operation-cancelled and container-close use the default
// delay). Any other condition is returned to the caller unchanged. Errors of
// any other type always go through recovery with the default delay.
//
// It returns nil when recovery succeeded, otherwise the original error or the
// error reported by retryRetryableAmqpError.
func (s *Sender) handleAMQPError(ctx context.Context, err error) error {
	amqpErr, isAMQPErr := err.(*amqp.Error)
	if !isAMQPErr {
		// Not a condition-carrying AMQP error: fall back to default recovery.
		return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
	}

	switch amqpErr.Condition {
	case errorServerBusy:
		// Back off longer between attempts when the server reports it is busy.
		return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryBusyServerDelay)
	case errorTimeout, errorOperationCancelled, errorContainerClose:
		return s.retryRetryableAmqpError(ctx, amqpRetryDefaultTimes, amqpRetryDefaultDelay)
	default:
		// Unknown condition: treated as not retryable.
		return err
	}
}

// retryRetryableAmqpError attempts to recover the sender by invoking Recover
// through common.Retry, passing along the given times and delay.
//
// Each attempt runs inside its own producer span. A failed attempt is reported
// to common.Retry as retryable (carrying only the failure's message text),
// unless the attempt's context is already done, in which case the context
// error is reported instead.
//
// It returns nil once Recover succeeds, or the error returned by common.Retry.
func (s *Sender) retryRetryableAmqpError(ctx context.Context, times int, delay time.Duration) error {
	tab.For(ctx).Debug("recovering sender connection")

	attempt := func() (interface{}, error) {
		spanCtx, span := s.startProducerSpanFromContext(ctx, "sb.Sender.trySend.tryRecover")
		defer span.End()

		recoverErr := s.Recover(spanCtx)
		if recoverErr == nil {
			tab.For(spanCtx).Debug("recovered connection")
			return nil, nil
		}

		// A done context ends the attempt with the context's own error
		// rather than a retryable one.
		if ctxErr := spanCtx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, common.Retryable(recoverErr.Error())
	}

	if _, err := common.Retry(times, delay, attempt); err != nil {
		tab.For(ctx).Debug("sender recovering retried, but error was unrecoverable")
		return err
	}
	return nil
}

func (s *Sender) connClosedError(ctx context.Context) error {
name := "Sender"
if s.Name != "" {
Expand Down

0 comments on commit 57eca53

Please sign in to comment.