diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 1c31950cd110..7c21e6c8b792 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -5,6 +5,8 @@ ### Bugs Fixed - Handle a missing CountDetails node in the returned responses for GetRuntimeProperties which could cause a panic. (#18213) +- Adding the `associated-link-name` property to management operations (RenewLock, settlement and others), which + can help extend link lifetime (#18291) ## 1.0.0 (2022-05-16) diff --git a/sdk/messaging/azservicebus/internal/amqpLinks_test.go b/sdk/messaging/azservicebus/internal/amqpLinks_test.go index 4e8625d804c6..d2194cc07f9b 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks_test.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks_test.go @@ -41,7 +41,7 @@ func assertFailedLinks(t *testing.T, lwid *LinksWithID, expectedErr error, expec }) require.ErrorIs(t, err, expectedErr) - _, err = PeekMessages(context.TODO(), lwid.RPC, 0, 1) + _, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1) require.ErrorIs(t, err, expectedRPCError) msg, err := lwid.Receiver.Receive(context.TODO()) @@ -58,7 +58,7 @@ func assertLinks(t *testing.T, lwid *LinksWithID) { }) require.NoError(t, err) - _, err = PeekMessages(context.TODO(), lwid.RPC, 0, 1) + _, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1) require.NoError(t, err) require.NoError(t, lwid.Receiver.IssueCredit(1)) diff --git a/sdk/messaging/azservicebus/internal/amqp_test_utils.go b/sdk/messaging/azservicebus/internal/amqp_test_utils.go index 05527eb4341a..4585eb389eb3 100644 --- a/sdk/messaging/azservicebus/internal/amqp_test_utils.go +++ b/sdk/messaging/azservicebus/internal/amqp_test_utils.go @@ -95,6 +95,10 @@ func (r *FakeRPCLink) RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, return r.Resp, r.Error } +func (r *FakeAMQPReceiver) LinkName() string { + return "fakelink" +} + func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error { r.RequestedCredits += credit diff --git a/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go b/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go index 38c7e2fb391b..d338856d4fdc 100644 --- a/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go +++ b/sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go @@ -38,6 +38,7 @@ type AMQPReceiverCloser interface { type AMQPSender interface { Send(ctx context.Context, msg *amqp.Message) error MaxMessageSize() uint64 + LinkName() string } // AMQPSenderCloser is implemented by *amqp.Sender diff --git a/sdk/messaging/azservicebus/internal/mgmt.go b/sdk/messaging/azservicebus/internal/mgmt.go index dc0e0374948d..a34ea288204b 100644 --- a/sdk/messaging/azservicebus/internal/mgmt.go +++ b/sdk/messaging/azservicebus/internal/mgmt.go @@ -30,7 +30,7 @@ const ( DeferredDisposition DispositionStatus = "defered" ) -func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) { +func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, linkName string, mode exported.ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) { const messagesField, messageField = "messages", "message" backwardsMode := uint32(0) @@ -50,6 +50,8 @@ func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.Receive Value: values, } + addAssociatedLinkName(linkName, msg) + rsp, err := rpcLink.RPC(ctx, msg) if err != nil { return nil, err @@ -108,7 +110,7 @@ func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode exported.Receive return transformedMessages, nil } -func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) { +func PeekMessages(ctx context.Context, rpcLink RPCLink, linkName string, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) { const messagesField, messageField = "messages", "message" msg := &amqp.Message{ @@ -121,6 +123,8 @@ func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64 }, } + addAssociatedLinkName(linkName, msg) + if deadline, ok := ctx.Deadline(); ok { msg.ApplicationProperties["server-timeout"] = uint(time.Until(deadline) / time.Millisecond) } @@ -218,9 +222,7 @@ func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockToken }, } - if linkName != "" { - renewRequestMsg.ApplicationProperties["associated-link-name"] = linkName - } + addAssociatedLinkName(linkName, renewRequestMsg) response, err := rpcLink.RPC(ctx, renewRequestMsg) @@ -257,7 +259,7 @@ func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockToken } // RenewSessionLocks renews a session lock. -func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (time.Time, error) { +func RenewSessionLock(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string) (time.Time, error) { body := map[string]interface{}{ "session-id": sessionID, } @@ -269,6 +271,8 @@ func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (t }, } + addAssociatedLinkName(linkName, msg) + resp, err := rpcLink.RPC(ctx, msg) if err != nil { @@ -291,7 +295,7 @@ func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (t } // GetSessionState retrieves state associated with the session. -func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]byte, error) { +func GetSessionState(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string) ([]byte, error) { amqpMsg := &amqp.Message{ Value: map[string]interface{}{ "session-id": sessionID, @@ -301,6 +305,8 @@ func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([] }, } + addAssociatedLinkName(linkName, amqpMsg) + resp, err := rpcLink.RPC(ctx, amqpMsg) if err != nil { @@ -334,7 +340,7 @@ func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([] } // SetSessionState sets the state associated with the session. -func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, state []byte) error { +func SetSessionState(ctx context.Context, rpcLink RPCLink, linkName string, sessionID string, state []byte) error { uuid, err := uuid.New() if err != nil { @@ -352,6 +358,8 @@ func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, sta }, } + addAssociatedLinkName(linkName, amqpMsg) + resp, err := rpcLink.RPC(ctx, amqpMsg) if err != nil { @@ -368,7 +376,7 @@ func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, sta // SendDisposition allows you settle a message using the management link, rather than via your // *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated // with a link (ex: deferred messages). -func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error { +func SendDisposition(ctx context.Context, rpcLink RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error { if lockToken == nil { err := errors.New("lock token on the message is not set, thus cannot send disposition") return err @@ -398,6 +406,8 @@ func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, Value: value, } + addAssociatedLinkName(linkName, msg) + // no error, then it was successful _, err := rpcLink.RPC(ctx, msg) if err != nil { @@ -409,7 +419,7 @@ func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, // ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers // that can be used to cancel each message. -func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error) { +func ScheduleMessages(ctx context.Context, rpcLink RPCLink, linkName string, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error) { if len(messages) <= 0 { return nil, errors.New("expected one or more messages") } @@ -470,6 +480,8 @@ func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Tim }, } + addAssociatedLinkName(linkName, msg) + if deadline, ok := ctx.Deadline(); ok { msg.ApplicationProperties["com.microsoft:server-timeout"] = uint(time.Until(deadline) / time.Millisecond) } @@ -502,7 +514,7 @@ func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Tim // CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery, // but have not yet ben enqueued. -func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) error { +func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, linkName string, seq []int64) error { msg := &amqp.Message{ ApplicationProperties: map[string]interface{}{ "operation": "com.microsoft:cancel-scheduled-message", @@ -512,6 +524,8 @@ func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) }, } + addAssociatedLinkName(linkName, msg) + if deadline, ok := ctx.Deadline(); ok { msg.ApplicationProperties["com.microsoft:server-timeout"] = uint(time.Until(deadline) / time.Millisecond) } @@ -527,3 +541,13 @@ func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) return nil } + +// addAssociatedLinkName adds the 'associated-link-name' application +// property to the AMQP message. Setting this property associates +// management link activity with a sender or receiver link, which can +// prevent it from idling out. +func addAssociatedLinkName(linkName string, msg *amqp.Message) { + if linkName != "" { + msg.ApplicationProperties["associated-link-name"] = linkName + } +} diff --git a/sdk/messaging/azservicebus/internal/rpc_test.go b/sdk/messaging/azservicebus/internal/rpc_test.go index 157226b658af..91c59d637ef7 100644 --- a/sdk/messaging/azservicebus/internal/rpc_test.go +++ b/sdk/messaging/azservicebus/internal/rpc_test.go @@ -207,6 +207,10 @@ func (tester *rpcTester) Close(ctx context.Context) error { return nil } +func (tester *rpcTester) LinkName() string { + return "hello" +} + // receiver functions func (tester *rpcTester) AcceptMessage(ctx context.Context, msg *amqp.Message) error { diff --git a/sdk/messaging/azservicebus/messageSettler.go b/sdk/messaging/azservicebus/messageSettler.go index bb2f193089b3..1af8dfc471f0 100644 --- a/sdk/messaging/azservicebus/messageSettler.go +++ b/sdk/messaging/azservicebus/messageSettler.go @@ -64,7 +64,7 @@ type CompleteMessageOptions struct { func (s *messageSettler) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error { return s.settleWithRetries(ctx, message, func(receiver internal.AMQPReceiver, rpcLink internal.RPCLink) error { if s.useManagementLink(message, receiver) { - return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil) + return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil) } else { return receiver.AcceptMessage(ctx, message.rawAMQPMessage) } @@ -93,7 +93,7 @@ func (s *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedMe propertiesToModify = options.PropertiesToModify } - return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify) + return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) } var annotations amqp.Annotations @@ -127,7 +127,7 @@ func (s *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMess propertiesToModify = options.PropertiesToModify } - return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify) + return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) } var annotations amqp.Annotations @@ -184,7 +184,7 @@ func (s *messageSettler) DeadLetterMessage(ctx context.Context, message *Receive propertiesToModify = options.PropertiesToModify } - return internal.SendDisposition(ctx, rpcLink, bytesToAMQPUUID(message.LockToken), d, propertiesToModify) + return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) } info := map[string]interface{}{ diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index d6edd9bb4b06..1ac9d1416340 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -209,7 +209,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers var receivedMessages []*ReceivedMessage err := r.amqpLinks.Retry(ctx, EventReceiver, "receiveDeferredMessages", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { - amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, r.receiveMode, sequenceNumbers) + amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, lwid.Receiver.LinkName(), r.receiveMode, sequenceNumbers) if err != nil { return err @@ -257,7 +257,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option updateInternalSequenceNumber = false } - messages, err := internal.PeekMessages(ctx, links.RPC, sequenceNumber, int32(maxMessageCount)) + messages, err := internal.PeekMessages(ctx, links.RPC, links.Receiver.LinkName(), sequenceNumber, int32(maxMessageCount)) if err != nil { return err diff --git a/sdk/messaging/azservicebus/sender.go b/sdk/messaging/azservicebus/sender.go index f885be1d66a6..3dee4d69b9fc 100644 --- a/sdk/messaging/azservicebus/sender.go +++ b/sdk/messaging/azservicebus/sender.go @@ -117,7 +117,7 @@ type CancelScheduledMessagesOptions struct { // If the operation fails it can return an *azservicebus.Error type if the failure is actionable. func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumbers []int64, options *CancelScheduledMessagesOptions) error { err := s.links.Retry(ctx, EventSender, "CancelScheduledMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { - return internal.CancelScheduledMessages(ctx, lwv.RPC, sequenceNumbers) + return internal.CancelScheduledMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), sequenceNumbers) }, s.retryOptions) return internal.TransformError(err) @@ -133,7 +133,7 @@ func (s *Sender) scheduleAMQPMessages(ctx context.Context, messages []*amqp.Mess var sequenceNumbers []int64 err := s.links.Retry(ctx, EventSender, "ScheduleMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { - sn, err := internal.ScheduleMessages(ctx, lwv.RPC, scheduledEnqueueTime, messages) + sn, err := internal.ScheduleMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), scheduledEnqueueTime, messages) if err != nil { return err diff --git a/sdk/messaging/azservicebus/session_receiver.go b/sdk/messaging/azservicebus/session_receiver.go index ca59ef88c686..72b98f6dcecd 100644 --- a/sdk/messaging/azservicebus/session_receiver.go +++ b/sdk/messaging/azservicebus/session_receiver.go @@ -199,7 +199,7 @@ func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSess var sessionState []byte err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "GetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { - s, err := internal.GetSessionState(ctx, lwv.RPC, sr.SessionID()) + s, err := internal.GetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID()) if err != nil { return err @@ -221,7 +221,7 @@ type SetSessionStateOptions struct { // If the operation fails it can return an *azservicebus.Error type if the failure is actionable. func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error { err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { - return internal.SetSessionState(ctx, lwv.RPC, sr.SessionID(), state) + return internal.SetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID(), state) }, sr.inner.retryOptions) return internal.TransformError(err) @@ -237,7 +237,7 @@ type RenewSessionLockOptions struct { // If the operation fails it can return an *azservicebus.Error type if the failure is actionable. func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error { err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { - newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, *sr.sessionID) + newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, lwv.Receiver.LinkName(), *sr.sessionID) if err != nil { return err