From 29bc60c505f1a608bb4c1c6a5d0853931e9aa8f9 Mon Sep 17 00:00:00 2001 From: jolov Date: Wed, 25 Oct 2023 14:49:44 -0700 Subject: [PATCH 1/7] Add samples demonstrating cross receiver settlement --- .../Azure.Messaging.ServiceBus.sln | 3 +- .../samples/README.md | 2 + ...Sample16_CrossReceiverMessageSettlement.md | 68 ++++++++++++++ ...Sample16_CrossReceiverMessageSettlement.cs | 91 +++++++++++++++++++ 4 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln index 6d0fcc454555..f36785d5a130 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln @@ -35,7 +35,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C samples\Sample11_CloudEvents.md = samples\Sample11_CloudEvents.md samples\Sample12_ManagingRules.md = samples\Sample12_ManagingRules.md samples\Sample13_AdvancedConfiguration.md = samples\Sample13_AdvancedConfiguration.md - samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md + samples\Sample15_MockingClientTypes.md = samples\Sample15_MockingClientTypes.md + samples\Sample16_CrossReceiverMessageSettlement.md = samples\Sample16_CrossReceiverMessageSettlement.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Amqp", "..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj", "{2ADA26CA-77E5-4793-927A-A6185FD8AA29}" diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md index 03f341ba293d..e3469c2c5c5f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md @@ -26,3 +26,5 @@ description: Samples for the Azure.Messaging.ServiceBus client library - [Managing rules](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample12_ManagingRules.md) - [Advanced configuration](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample13_AdvancedConfiguration.md) - [Interact with the AMQP message](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample14_AMQPMessage.md) +- [Mocking Client Types](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample15_MockingClientTypes.md) +- [Cross-receiver message settlement] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md new file mode 100644 index 000000000000..af3ff18bf15b --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md @@ -0,0 +1,68 @@ +# Cross Receiver Message Settlement + +The message settlement APIs on the `ServiceBusReceiver` require passing in the +`ServiceBusReceivedMessage`. This can be limiting for scenarios in which the message needs to be persisted and then +settled across process boundaries. There are two strategies that can be used for settling a message with a different +receiver. The correct strategy to use will depend on the specific scenario. + +## Storing the Entire `ServiceBusReceivedMessage` + +If it is necessary to store the entire message and then rehydrate it in another process, the following can be done +to accomplish this. First we can get the raw AMQP message bytes and lock token as shown. +*Note: The lock token is not +actually part of the AMQP message so it needs to stored separately from the AMQP bytes.* + +```C# Snippet:ServiceBusWriteReceivedMessage +var client1 = new ServiceBusClient(connectionString); +ServiceBusSender sender = client1.CreateSender(queueName); + +var message = new ServiceBusMessage("some message"); +await sender.SendMessageAsync(message); + +ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); +ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); +ReadOnlyMemory amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory(); +ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); +``` + +In order to rehydrate the message in another process, we would do the following: + +```C# Snippet:ServiceBusReadReceivedMessage +AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes)); +ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes)); + +var client2 = new ServiceBusClient(connectionString); +ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); +await receiver2.CompleteMessageAsync(rehydratedMessage); +``` + +## Storing Only the Lock Token + +If the entire message is not needed when settling the message in a different process, you can simply store off the +lock token. In the example below, we store off the lock token with the GUID bytes. You can also simply store a +string if that is easier for your scenario. + +```C# Snippet:ServiceBusWriteReceivedMessageLockToken +var client1 = new ServiceBusClient(connectionString); +ServiceBusSender sender = client1.CreateSender(queueName); + +var message = new ServiceBusMessage("some message"); +await sender.SendMessageAsync(message); + +ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); +ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); +ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); +``` + + +In order to rehydrate the message in another process using the lock token, we would do the following: +*Note: Because we only stored the lock token, when we rehydrate the message all of the properties of the +`ServiceBusReceivedMessage` other than `LockToken` will have default values.* + +```C# Snippet:ServiceBusReadReceivedMessageLockToken +ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.Span)); + +var client2 = new ServiceBusClient(connectionString); +ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); +await receiver2.CompleteMessageAsync(rehydratedMessage); +``` diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs new file mode 100644 index 000000000000..34cdbac0dbb6 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample16_CrossReceiverMessageSettlement.cs @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Azure.Core.Amqp; +using NUnit.Framework; + +namespace Azure.Messaging.ServiceBus.Tests.Samples +{ + public class Sample16_CrossReceiverMessageSettlement : ServiceBusLiveTestBase + { + [Test] + public async Task RehydrateReceivedMessageUsingRawBytes() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { +#if SNIPPET + string connectionString = ""; + string queueName = ""; +#else + string connectionString = TestEnvironment.ServiceBusConnectionString; + string queueName = scope.QueueName; +#endif + + #region Snippet:ServiceBusWriteReceivedMessage + + var client1 = new ServiceBusClient(connectionString); + ServiceBusSender sender = client1.CreateSender(queueName); + + var message = new ServiceBusMessage("some message"); + await sender.SendMessageAsync(message); + + ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); + ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); + ReadOnlyMemory amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory(); + ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); + #endregion + + #region Snippet:ServiceBusReadReceivedMessage + AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes)); + ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes)); + + var client2 = new ServiceBusClient(connectionString); + ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); + await receiver2.CompleteMessageAsync(rehydratedMessage); + #endregion + + Assert.AreEqual("some message", rehydratedMessage.Body.ToString()); + } + } + + [Test] + public async Task RehydrateReceivedMessageUsingLockToken() + { + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { +#if SNIPPET + string connectionString = ""; + string queueName = ""; +#else + string connectionString = TestEnvironment.ServiceBusConnectionString; + string queueName = scope.QueueName; +#endif + + #region Snippet:ServiceBusWriteReceivedMessageLockToken + + var client1 = new ServiceBusClient(connectionString); + ServiceBusSender sender = client1.CreateSender(queueName); + + var message = new ServiceBusMessage("some message"); + await sender.SendMessageAsync(message); + + ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName); + ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync(); + ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); + #endregion + + #region Snippet:ServiceBusReadReceivedMessageLockToken + + ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray())); + + var client2 = new ServiceBusClient(connectionString); + ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); + await receiver2.CompleteMessageAsync(rehydratedMessage); + #endregion + } + } + } +} From b5362d5c882567a94a592fa68ad0f2008447f8d9 Mon Sep 17 00:00:00 2001 From: jolov Date: Wed, 25 Oct 2023 14:51:30 -0700 Subject: [PATCH 2/7] Fix --- .../samples/Sample16_CrossReceiverMessageSettlement.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md index af3ff18bf15b..44a10d055aaf 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md @@ -7,8 +7,8 @@ receiver. The correct strategy to use will depend on the specific scenario. ## Storing the Entire `ServiceBusReceivedMessage` -If it is necessary to store the entire message and then rehydrate it in another process, the following can be done -to accomplish this. First we can get the raw AMQP message bytes and lock token as shown. +If it is necessary to store the entire message and then rehydrate it in another process, use the following strategy. +First we can get the raw AMQP message bytes and lock token as shown below: *Note: The lock token is not actually part of the AMQP message so it needs to stored separately from the AMQP bytes.* From 2112721d6be660a7093dd27454d9c5ecde063ed7 Mon Sep 17 00:00:00 2001 From: jolov Date: Wed, 25 Oct 2023 14:54:29 -0700 Subject: [PATCH 3/7] Fix --- .../Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln index f36785d5a130..0293a1e69387 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln @@ -37,6 +37,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C samples\Sample13_AdvancedConfiguration.md = samples\Sample13_AdvancedConfiguration.md samples\Sample15_MockingClientTypes.md = samples\Sample15_MockingClientTypes.md samples\Sample16_CrossReceiverMessageSettlement.md = samples\Sample16_CrossReceiverMessageSettlement.md + samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Amqp", "..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj", "{2ADA26CA-77E5-4793-927A-A6185FD8AA29}" From 0e7db2752cc51196e30df5db990073bbdbd371b1 Mon Sep 17 00:00:00 2001 From: jolov Date: Wed, 25 Oct 2023 15:05:09 -0700 Subject: [PATCH 4/7] fix line break --- .../samples/Sample16_CrossReceiverMessageSettlement.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md index 44a10d055aaf..f3dd98088a96 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md @@ -8,7 +8,7 @@ receiver. The correct strategy to use will depend on the specific scenario. ## Storing the Entire `ServiceBusReceivedMessage` If it is necessary to store the entire message and then rehydrate it in another process, use the following strategy. -First we can get the raw AMQP message bytes and lock token as shown below: +First we can get the raw AMQP message bytes and lock token as shown below: *Note: The lock token is not actually part of the AMQP message so it needs to stored separately from the AMQP bytes.* @@ -55,7 +55,7 @@ ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToBy ``` -In order to rehydrate the message in another process using the lock token, we would do the following: +In order to rehydrate the message in another process using the lock token, we would do the following: *Note: Because we only stored the lock token, when we rehydrate the message all of the properties of the `ServiceBusReceivedMessage` other than `LockToken` will have default values.* From 35c9dd2949649a5b6b776fcdcbda2779eac7e4d4 Mon Sep 17 00:00:00 2001 From: jolov Date: Wed, 25 Oct 2023 15:06:17 -0700 Subject: [PATCH 5/7] Fix sln --- .../Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln index 0293a1e69387..1fc9adf490c3 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/Azure.Messaging.ServiceBus.sln @@ -35,9 +35,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C samples\Sample11_CloudEvents.md = samples\Sample11_CloudEvents.md samples\Sample12_ManagingRules.md = samples\Sample12_ManagingRules.md samples\Sample13_AdvancedConfiguration.md = samples\Sample13_AdvancedConfiguration.md + samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md samples\Sample15_MockingClientTypes.md = samples\Sample15_MockingClientTypes.md samples\Sample16_CrossReceiverMessageSettlement.md = samples\Sample16_CrossReceiverMessageSettlement.md - samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Amqp", "..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj", "{2ADA26CA-77E5-4793-927A-A6185FD8AA29}" From 72bb7ad936802170614ea87f0dbefcd12beb3bb5 Mon Sep 17 00:00:00 2001 From: jolov Date: Thu, 26 Oct 2023 09:10:16 -0700 Subject: [PATCH 6/7] regenerate --- .../samples/Sample16_CrossReceiverMessageSettlement.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md index f3dd98088a96..617a5f6b090e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md @@ -60,7 +60,7 @@ In order to rehydrate the message in another process using the lock token, we wo `ServiceBusReceivedMessage` other than `LockToken` will have default values.* ```C# Snippet:ServiceBusReadReceivedMessageLockToken -ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.Span)); +ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray())); var client2 = new ServiceBusClient(connectionString); ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); From dea608b93d43c3405d9651f6e82a8f4d483a410f Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Thu, 26 Oct 2023 09:11:00 -0700 Subject: [PATCH 7/7] Apply suggestions from code review Co-authored-by: Jesse Squire --- .../Sample16_CrossReceiverMessageSettlement.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md index 617a5f6b090e..c4fe143f8451 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample16_CrossReceiverMessageSettlement.md @@ -3,9 +3,9 @@ The message settlement APIs on the `ServiceBusReceiver` require passing in the `ServiceBusReceivedMessage`. This can be limiting for scenarios in which the message needs to be persisted and then settled across process boundaries. There are two strategies that can be used for settling a message with a different -receiver. The correct strategy to use will depend on the specific scenario. +receiver. The recommended strategy depends on the specific application scenario. -## Storing the Entire `ServiceBusReceivedMessage` +## Storing the entire `ServiceBusReceivedMessage` If it is necessary to store the entire message and then rehydrate it in another process, use the following strategy. First we can get the raw AMQP message bytes and lock token as shown below: @@ -36,10 +36,10 @@ ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName); await receiver2.CompleteMessageAsync(rehydratedMessage); ``` -## Storing Only the Lock Token +## Storing only the lock token -If the entire message is not needed when settling the message in a different process, you can simply store off the -lock token. In the example below, we store off the lock token with the GUID bytes. You can also simply store a +If the entire message is not needed when settling the message in a different process, you can simply preserve the +lock token. In the example below, we store off the lock token using its GUID bytes. You can also simply store a string if that is easier for your scenario. ```C# Snippet:ServiceBusWriteReceivedMessageLockToken @@ -54,7 +54,6 @@ ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync( ReadOnlyMemory lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray(); ``` - In order to rehydrate the message in another process using the lock token, we would do the following: *Note: Because we only stored the lock token, when we rehydrate the message all of the properties of the `ServiceBusReceivedMessage` other than `LockToken` will have default values.*