Skip to content
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions)

// AcceptSessionForQueue accepts a session from a queue with a specific session ID.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
Expand All @@ -243,6 +244,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin

// AcceptSessionForSubscription accepts a session from a subscription with a specific session ID.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
Expand Down Expand Up @@ -270,6 +272,7 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam

// AcceptNextSessionForQueue accepts the next available session from a queue.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
Expand All @@ -296,6 +299,7 @@ func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName s

// AcceptNextSessionForSubscription accepts the next available session from a subscription.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
Expand Down
30 changes: 30 additions & 0 deletions sdk/messaging/azservicebus/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azservicebus

import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
)

// Code is an error code, usable by consuming code to work with
// programatically.
type Code = exported.Code
Comment thread
richardpark-msft marked this conversation as resolved.

const (
// CodeConnectionLost means our connection was lost and all retry attempts failed.
// This typically reflects an extended outage or connection disruption and may
// require manual intervention.
CodeConnectionLost = exported.CodeConnectionLost

// CodeLockLost means that the lock token you have for a message has expired.
// This message will be available again after the lock period expires, or, potentially
// go to the dead letter queue if delivery attempts have been exceeded.
CodeLockLost = exported.CodeLockLost
)

// Error represents a Service Bus specific error.
// NOTE: the Code is considered part of the published API but the message that
// comes back from Error(), as well as the underlying wrapped error, are NOT and
// are subject to change.
type Error = exported.Error
25 changes: 25 additions & 0 deletions sdk/messaging/azservicebus/example_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azservicebus_test

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -20,6 +21,9 @@ func ExampleClient_NewReceiverForSubscription() {
},
)
exitOnError("Failed to create Receiver", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
}

func ExampleClient_NewReceiverForQueue() {
Expand All @@ -30,6 +34,9 @@ func ExampleClient_NewReceiverForQueue() {
},
)
exitOnError("Failed to create Receiver", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
}

func ExampleClient_NewReceiverForQueue_deadLetterQueue() {
Expand All @@ -41,6 +48,9 @@ func ExampleClient_NewReceiverForQueue_deadLetterQueue() {
},
)
exitOnError("Failed to create Receiver for DeadLetterQueue", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
}

func ExampleClient_NewReceiverForSubscription_deadLetterQueue() {
Expand All @@ -53,6 +63,9 @@ func ExampleClient_NewReceiverForSubscription_deadLetterQueue() {
},
)
exitOnError("Failed to create Receiver for DeadLetterQueue", err)

// close the receiver when it's no longer needed
defer receiver.Close(context.TODO())
}

func ExampleReceiver_ReceiveMessages() {
Expand Down Expand Up @@ -80,6 +93,18 @@ func ExampleReceiver_ReceiveMessages() {
err = receiver.CompleteMessage(context.TODO(), message, nil)

if err != nil {
var sbErr *azservicebus.Error

if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
// The message lock has expired. This isn't fatal for the client, but it does mean
// that this message can be received by another Receiver (or potentially this one!).
fmt.Printf("Message lock expired\n")

// You can extend the message lock by calling receiver.RenewMessageLock(msg) before the
// message lock has expired.
continue
}

panic(err)
}

Expand Down
3 changes: 3 additions & 0 deletions sdk/messaging/azservicebus/example_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func ExampleClient_NewSender() {
if err != nil {
panic(err)
}

// close the sender when it's no longer needed
defer sender.Close(context.TODO())
}

func ExampleSender_SendMessage_message() {
Expand Down
13 changes: 13 additions & 0 deletions sdk/messaging/azservicebus/internal/amqp_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ type FakeAMQPReceiver struct {
}
}

type FakeRPCLink struct {
Resp *RPCResponse
Error error
}

func (r *FakeRPCLink) Close(ctx context.Context) error {
return nil
}

func (r *FakeRPCLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error) {
return r.Resp, r.Error
}

func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error {
r.RequestedCredits += credit

Expand Down
78 changes: 48 additions & 30 deletions sdk/messaging/azservicebus/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"reflect"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/go-amqp"
)

Expand All @@ -37,38 +38,45 @@ const (
RecoveryKindConn RecoveryKind = "connection"
)

type SBErrInfo struct {
inner error
RecoveryKind RecoveryKind
}

func (sbe *SBErrInfo) String() string {
return sbe.inner.Error()
}

func (sbe *SBErrInfo) AsError() error {
return sbe.inner
}

func IsFatalSBError(err error) bool {
return GetSBErrInfo(err).RecoveryKind == RecoveryKindFatal
return GetRecoveryKind(err) == RecoveryKindFatal
}

// GetSBErrInfo wraps the passed in 'err' with a proper error with one of either:
// - `fatalServiceBusError` if no recovery is possible.
// - `serviceBusError` if the error is recoverable. The `recoveryKind` field contains the
// type of recovery needed.
func GetSBErrInfo(err error) *SBErrInfo {
// TransformError will create a proper error type that users
// can potentially inspect.
// If the error is actionable then it'll be of type exported.Error which
// has a 'Code' field that can be used programatically.
// If it's not actionable or if it's nil it'll just be returned.
func TransformError(err error) error {
if err == nil {
return nil
}

sbe := &SBErrInfo{
inner: err,
RecoveryKind: GetRecoveryKind(err),
_, ok := err.(*exported.Error)

if ok {
// it's already been wrapped.
return err
}

if isLockLostError(err) {
return exported.NewError(exported.CodeLockLost, err)
}

return sbe
rk := GetRecoveryKind(err)

switch rk {
case RecoveryKindLink:
// note that we could give back a more differentiated error code
// here but it's probably best to just give the customer the simplest
// recovery mechanism possible.
return exported.NewError(exported.CodeConnectionLost, err)
case RecoveryKindConn:
return exported.NewError(exported.CodeConnectionLost, err)
default:
// isn't one of our specifically called out cases so we'll just return it.
return err
}
}

func IsDetachError(err error) bool {
Expand Down Expand Up @@ -97,6 +105,8 @@ func IsDrainingError(err error) bool {
return strings.Contains(err.Error(), "link is currently draining")
}

const errorConditionLockLost = amqp.ErrorCondition("com.microsoft:message-lock-lost")

var amqpConditionsToRecoveryKind = map[amqp.ErrorCondition]RecoveryKind{
// no recovery needed, these are temporary errors.
amqp.ErrorCondition("com.microsoft:server-busy"): RecoveryKindNone,
Expand All @@ -118,7 +128,7 @@ var amqpConditionsToRecoveryKind = map[amqp.ErrorCondition]RecoveryKind{
amqp.ErrorNotAllowed: RecoveryKindFatal, // "amqp:not-allowed"
amqp.ErrorCondition("com.microsoft:entity-disabled"): RecoveryKindFatal, // entity is disabled in the portal
amqp.ErrorCondition("com.microsoft:session-cannot-be-locked"): RecoveryKindFatal,
amqp.ErrorCondition("com.microsoft:message-lock-lost"): RecoveryKindFatal,
errorConditionLockLost: RecoveryKindFatal,
}

// GetRecoveryKindForSession determines the recovery type for session-based links.
Expand Down Expand Up @@ -187,7 +197,7 @@ func GetRecoveryKind(err error) RecoveryKind {
}
}

var rpcErr rpcError
var rpcErr RPCError

if errors.As(err, &rpcErr) {
// Described more here:
Expand Down Expand Up @@ -304,12 +314,20 @@ func (e ErrConnectionClosed) Error() string {
}

func isLockLostError(err error) bool {
var rpcErr rpcError
var rpcErr RPCError

if errors.As(err, &rpcErr) {
if rpcErr.Resp.Code == RPCResponseCodeLockLost {
return true
}
// this is the error you get if you settle on the management$ link
// with an expired locktoken.
if errors.As(err, &rpcErr) && rpcErr.Resp.Code == RPCResponseCodeLockLost {
return true
}

var amqpErr *amqp.Error

// this is the error you get if you settle on the actual receiver link you
// got the message on with an expired locktoken.
if errors.As(err, &amqpErr) && amqpErr.Condition == errorConditionLockLost {
return true
}

return false
Expand Down
Loading