Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Bugs Fixed

- Handle a missing CountDetails node in the returned responses for Get<Entity>RuntimeProperties which could cause a panic. (#18213)
- Adding the `associated-link-name` property to management operations (RenewLock, settlement and others), which
can help extend link lifetime (#18291)

## 1.0.0 (2022-05-16)

Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/internal/amqpLinks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func assertFailedLinks(t *testing.T, lwid *LinksWithID, expectedErr error, expec
})
require.ErrorIs(t, err, expectedErr)

_, err = PeekMessages(context.TODO(), lwid.RPC, 0, 1)
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
require.ErrorIs(t, err, expectedRPCError)

msg, err := lwid.Receiver.Receive(context.TODO())
Expand All @@ -58,7 +58,7 @@ func assertLinks(t *testing.T, lwid *LinksWithID) {
})
require.NoError(t, err)

_, err = PeekMessages(context.TODO(), lwid.RPC, 0, 1)
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
require.NoError(t, err)

require.NoError(t, lwid.Receiver.IssueCredit(1))
Expand Down
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/internal/amqp_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (r *FakeRPCLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse,
return r.Resp, r.Error
}

func (r *FakeAMQPReceiver) LinkName() string {
return "fakelink"
}

func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error {
r.RequestedCredits += credit

Expand Down
1 change: 1 addition & 0 deletions sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type AMQPReceiverCloser interface {
type AMQPSender interface {
Send(ctx context.Context, msg *amqp.Message) error
MaxMessageSize() uint64
LinkName() string
}

// AMQPSenderCloser is implemented by *amqp.Sender
Expand Down
46 changes: 35 additions & 11 deletions sdk/messaging/azservicebus/internal/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
DeferredDisposition DispositionStatus = "defered"
)

func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) {
func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, linkName string, mode exported.ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) {
const messagesField, messageField = "messages", "message"

backwardsMode := uint32(0)
Expand All @@ -50,6 +50,8 @@ func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.Receive
Value: values,
}

addAssociatedLinkName(linkName, msg)

rsp, err := rpcLink.RPC(ctx, msg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -108,7 +110,7 @@ func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.Receive
return transformedMessages, nil
}

func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) {
func PeekMessages(ctx context.Context, rpcLink RPCLink, linkName string, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) {
const messagesField, messageField = "messages", "message"

msg := &amqp.Message{
Expand All @@ -121,6 +123,8 @@ func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64
},
}

addAssociatedLinkName(linkName, msg)

if deadline, ok := ctx.Deadline(); ok {
msg.ApplicationProperties["server-timeout"] = uint(time.Until(deadline) / time.Millisecond)
}
Expand Down Expand Up @@ -218,9 +222,7 @@ func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockToken
},
}

if linkName != "" {
renewRequestMsg.ApplicationProperties["associated-link-name"] = linkName
}
addAssociatedLinkName(linkName, renewRequestMsg)

response, err := rpcLink.RPC(ctx, renewRequestMsg)

Expand Down Expand Up @@ -257,7 +259,7 @@ func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockToken
}

// RenewSessionLocks renews a session lock.
func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (time.Time, error) {
func RenewSessionLock(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string) (time.Time, error) {
body := map[string]interface{}{
"session-id": sessionID,
}
Expand All @@ -269,6 +271,8 @@ func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (t
},
}

addAssociatedLinkName(linkName, msg)

resp, err := rpcLink.RPC(ctx, msg)

if err != nil {
Expand All @@ -291,7 +295,7 @@ func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (t
}

// GetSessionState retrieves state associated with the session.
func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]byte, error) {
func GetSessionState(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string) ([]byte, error) {
amqpMsg := &amqp.Message{
Value: map[string]interface{}{
"session-id": sessionID,
Expand All @@ -301,6 +305,8 @@ func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]
},
}

addAssociatedLinkName(linkName, amqpMsg)

resp, err := rpcLink.RPC(ctx, amqpMsg)

if err != nil {
Expand Down Expand Up @@ -334,7 +340,7 @@ func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]
}

// SetSessionState sets the state associated with the session.
func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, state []byte) error {
func SetSessionState(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string, state []byte) error {
uuid, err := uuid.New()

if err != nil {
Expand All @@ -352,6 +358,8 @@ func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, sta
},
}

addAssociatedLinkName(linkName, amqpMsg)

resp, err := rpcLink.RPC(ctx, amqpMsg)

if err != nil {
Expand All @@ -368,7 +376,7 @@ func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, sta
// SendDisposition allows you settle a message using the management link, rather than via your
// *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated
// with a link (ex: deferred messages).
func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error {
func SendDisposition(ctx context.Context, rpcLink RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error {
if lockToken == nil {
err := errors.New("lock token on the message is not set, thus cannot send disposition")
return err
Expand Down Expand Up @@ -398,6 +406,8 @@ func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID,
Value: value,
}

addAssociatedLinkName(linkName, msg)

// no error, then it was successful
_, err := rpcLink.RPC(ctx, msg)
if err != nil {
Expand All @@ -409,7 +419,7 @@ func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID,

// ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers
// that can be used to cancel each message.
func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error) {
func ScheduleMessages(ctx context.Context, rpcLink RPCLink, linkName string, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error) {
if len(messages) <= 0 {
return nil, errors.New("expected one or more messages")
}
Expand Down Expand Up @@ -470,6 +480,8 @@ func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Tim
},
}

addAssociatedLinkName(linkName, msg)

if deadline, ok := ctx.Deadline(); ok {
msg.ApplicationProperties["com.microsoft:server-timeout"] = uint(time.Until(deadline) / time.Millisecond)
}
Expand Down Expand Up @@ -502,7 +514,7 @@ func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Tim

// CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery,
// but have not yet ben enqueued.
func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) error {
func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, linkName string, seq []int64) error {
msg := &amqp.Message{
ApplicationProperties: map[string]interface{}{
"operation": "com.microsoft:cancel-scheduled-message",
Expand All @@ -512,6 +524,8 @@ func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64)
},
}

addAssociatedLinkName(linkName, msg)

if deadline, ok := ctx.Deadline(); ok {
msg.ApplicationProperties["com.microsoft:server-timeout"] = uint(time.Until(deadline) / time.Millisecond)
}
Expand All @@ -527,3 +541,13 @@ func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64)

return nil
}

// addAssociatedLinkName adds the 'associated-link-name' application
// property to the AMQP message. Setting this property associates
// management link activity with a sender or receiver link, which can
// prevent it from idling out.
func addAssociatedLinkName(linkName string, msg *amqp.Message) {
if linkName != "" {
Comment thread
richardpark-msft marked this conversation as resolved.
msg.ApplicationProperties["associated-link-name"] = linkName
}
}
4 changes: 4 additions & 0 deletions sdk/messaging/azservicebus/internal/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func (tester *rpcTester) Close(ctx context.Context) error {
return nil
}

func (tester *rpcTester) LinkName() string {
return "hello"
}

// receiver functions

func (tester *rpcTester) AcceptMessage(ctx context.Context, msg *amqp.Message) error {
Expand Down
8 changes: 4 additions & 4 deletions sdk/messaging/azservicebus/messageSettler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type CompleteMessageOptions struct {
func (s *messageSettler) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error {
return s.settleWithRetries(ctx, message, func(receiver internal.AMQPReceiver, rpcLink internal.RPCLink) error {
if s.useManagementLink(message, receiver) {
return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil)
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil)
} else {
return receiver.AcceptMessage(ctx, message.rawAMQPMessage)
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedMe
propertiesToModify = options.PropertiesToModify
}

return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
}

var annotations amqp.Annotations
Expand Down Expand Up @@ -127,7 +127,7 @@ func (s *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMess
propertiesToModify = options.PropertiesToModify
}

return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
}

var annotations amqp.Annotations
Expand Down Expand Up @@ -184,7 +184,7 @@ func (s *messageSettler) DeadLetterMessage(ctx context.Context, message *Receive
propertiesToModify = options.PropertiesToModify
}

return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify)
}

info := map[string]interface{}{
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers
var receivedMessages []*ReceivedMessage

err := r.amqpLinks.Retry(ctx, EventReceiver, "receiveDeferredMessages", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, r.receiveMode, sequenceNumbers)
amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, lwid.Receiver.LinkName(), r.receiveMode, sequenceNumbers)

if err != nil {
return err
Expand Down Expand Up @@ -257,7 +257,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option
updateInternalSequenceNumber = false
}

messages, err := internal.PeekMessages(ctx, links.RPC, sequenceNumber, int32(maxMessageCount))
messages, err := internal.PeekMessages(ctx, links.RPC, links.Receiver.LinkName(), sequenceNumber, int32(maxMessageCount))

if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type CancelScheduledMessagesOptions struct {
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumbers []int64, options *CancelScheduledMessagesOptions) error {
err := s.links.Retry(ctx, EventSender, "CancelScheduledMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
return internal.CancelScheduledMessages(ctx, lwv.RPC, sequenceNumbers)
return internal.CancelScheduledMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), sequenceNumbers)
}, s.retryOptions)

return internal.TransformError(err)
Expand All @@ -133,7 +133,7 @@ func (s *Sender) scheduleAMQPMessages(ctx context.Context, messages []*amqp.Mess
var sequenceNumbers []int64

err := s.links.Retry(ctx, EventSender, "ScheduleMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
sn, err := internal.ScheduleMessages(ctx, lwv.RPC, scheduledEnqueueTime, messages)
sn, err := internal.ScheduleMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), scheduledEnqueueTime, messages)

if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/session_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSess
var sessionState []byte

err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "GetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
s, err := internal.GetSessionState(ctx, lwv.RPC, sr.SessionID())
s, err := internal.GetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID())

if err != nil {
return err
Expand All @@ -221,7 +221,7 @@ type SetSessionStateOptions struct {
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error {
err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
return internal.SetSessionState(ctx, lwv.RPC, sr.SessionID(), state)
return internal.SetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID(), state)
}, sr.inner.retryOptions)

return internal.TransformError(err)
Expand All @@ -237,7 +237,7 @@ type RenewSessionLockOptions struct {
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error {
err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error {
newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, *sr.sessionID)
newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, lwv.Receiver.LinkName(), *sr.sessionID)

if err != nil {
return err
Expand Down