Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public class AmqpAnnotatedMessage
/// <param name="message">The message to copy.</param>
public AmqpAnnotatedMessage(AmqpAnnotatedMessage message)
{
Body = message.Body;
var data = message.Body as AmqpDataBody;
Body = new AmqpDataBody(data!.Data);
ApplicationProperties = new Dictionary<string, object>(message.ApplicationProperties);
Properties = new AmqpMessageProperties(message.Properties);
MessageAnnotations = new Dictionary<string, object>(message.MessageAnnotations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,18 @@ internal class AmqpMessageConstants
internal const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
internal const string UriName = AmqpConstants.Vendor + ":uri";
internal const string DateTimeOffsetName = AmqpConstants.Vendor + ":datetime-offset";
/// <summary>
/// Property key representing deadletter reason, when a message is received from a deadletter subqueue of an entity.
/// This key and the associated values are stored in the <see cref="ServiceBusReceivedMessage.ApplicationProperties"/> dictionary
/// for dead lettered messages.
/// </summary>
internal const string DeadLetterReasonHeader = "DeadLetterReason";

/// <summary>
/// Property key representing detailed error description, when a message is received from a deadletter subqueue of an entity.
/// This key and the associated values are stored in the <see cref="ServiceBusReceivedMessage.ApplicationProperties"/> dictionary
/// for dead lettered messages.
/// </summary>
internal const string DeadLetterErrorDescriptionHeader = "DeadLetterErrorDescription";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -654,12 +654,12 @@ private Rejected GetRejectedOutcome(
rejected = new Rejected { Error = new Error { Condition = AmqpClientConstants.DeadLetterName, Info = new Fields() } };
if (deadLetterReason != null)
{
rejected.Error.Info.Add(ServiceBusReceivedMessage.DeadLetterReasonHeader, deadLetterReason);
rejected.Error.Info.Add(AmqpMessageConstants.DeadLetterReasonHeader, deadLetterReason);
}

if (deadLetterErrorDescription != null)
{
rejected.Error.Info.Add(ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader, deadLetterErrorDescription);
rejected.Error.Info.Add(AmqpMessageConstants.DeadLetterErrorDescriptionHeader, deadLetterErrorDescription);
}

if (propertiesToModify != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage)
{
Argument.AssertNotNull(receivedMessage, nameof(receivedMessage));
AmqpMessage = new AmqpAnnotatedMessage(receivedMessage.AmqpMessage);
AmqpMessage.Header.DeliveryCount = null;
AmqpMessage.MessageAnnotations.Remove(AmqpMessageConstants.LockedUntilName);
AmqpMessage.MessageAnnotations.Remove(AmqpMessageConstants.SequenceNumberName);
AmqpMessage.MessageAnnotations.Remove(AmqpMessageConstants.DeadLetterSourceName);
AmqpMessage.MessageAnnotations.Remove(AmqpMessageConstants.EnqueueSequenceNumberName);
AmqpMessage.MessageAnnotations.Remove(AmqpMessageConstants.EnqueuedTimeUtcName);
ApplicationProperties.Remove(AmqpMessageConstants.DeadLetterReasonHeader);
ApplicationProperties.Remove(AmqpMessageConstants.DeadLetterErrorDescriptionHeader);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,6 @@ public BinaryData Body
/// </remarks>
public IReadOnlyDictionary<string, object> ApplicationProperties => new ReadOnlyDictionary<string, object>(AmqpMessage.ApplicationProperties);

/// <summary>
/// User property key representing deadletter reason, when a message is received from a deadletter subqueue of an entity.
/// </summary>
internal const string DeadLetterReasonHeader = "DeadLetterReason";

/// <summary>
/// User property key representing detailed error description, when a message is received from a deadletter subqueue of an entity.
/// </summary>
internal const string DeadLetterErrorDescriptionHeader = "DeadLetterErrorDescription";

/// <summary>
/// Gets the lock token for the current message.
/// </summary>
Expand Down Expand Up @@ -396,7 +386,7 @@ public string DeadLetterReason
{
get
{
if (ApplicationProperties.TryGetValue(DeadLetterReasonHeader, out object reason))
if (ApplicationProperties.TryGetValue(AmqpMessageConstants.DeadLetterReasonHeader, out object reason))
{
return reason as string;
}
Expand All @@ -411,7 +401,7 @@ public string DeadLetterErrorDescription
{
get
{
if (ApplicationProperties.TryGetValue(DeadLetterErrorDescriptionHeader, out object description))
if (ApplicationProperties.TryGetValue(AmqpMessageConstants.DeadLetterErrorDescriptionHeader, out object description))
{
return description as string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using Azure.Core.Amqp;
using Azure.Core.Serialization;
using Azure.Messaging.ServiceBus.Amqp;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Message
Expand Down Expand Up @@ -121,6 +122,18 @@ public async Task CreateFromReceivedMessageCopiesProperties()
var received = await receiver.ReceiveMessageAsync();
AssertMessagesEqual(msg, received);
var toSend = new ServiceBusMessage(received);

// verify that all system set properties have been cleared out
Assert.IsNull(toSend.AmqpMessage.Header.DeliveryCount);
Assert.IsFalse(toSend.AmqpMessage.MessageAnnotations.ContainsKey(AmqpMessageConstants.LockedUntilName));
Assert.IsFalse(toSend.AmqpMessage.MessageAnnotations.ContainsKey(AmqpMessageConstants.SequenceNumberName));
Assert.IsFalse(toSend.AmqpMessage.MessageAnnotations.ContainsKey(AmqpMessageConstants.DeadLetterSourceName));
Assert.IsFalse(toSend.AmqpMessage.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueueSequenceNumberName));
Assert.IsFalse(toSend.AmqpMessage.MessageAnnotations.ContainsKey(AmqpMessageConstants.EnqueuedTimeUtcName));
Assert.IsFalse(toSend.AmqpMessage.MessageAnnotations.ContainsKey(AmqpMessageConstants.DeadLetterSourceName));
Assert.IsFalse(toSend.ApplicationProperties.ContainsKey(AmqpMessageConstants.DeadLetterReasonHeader));
Assert.IsFalse(toSend.ApplicationProperties.ContainsKey(AmqpMessageConstants.DeadLetterErrorDescriptionHeader));

AssertMessagesEqual(toSend, received);

void AssertMessagesEqual(ServiceBusMessage sentMessage, ServiceBusReceivedMessage received)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus.Amqp;
using Azure.Messaging.ServiceBus.Plugins;
using Moq;
using NUnit.Framework;
Expand Down Expand Up @@ -44,8 +45,8 @@ public override ValueTask AfterMessageReceiveAsync(ServiceBusReceivedMessage mes
SetContentType(message, "contentType");
SetCorrelationId(message, "correlationId");
SetUserProperty(message, "propertyKey", "propertyValue");
SetUserProperty(message, ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader, "deadLetterDescription");
SetUserProperty(message, ServiceBusReceivedMessage.DeadLetterReasonHeader, "deadLetterReason");
SetUserProperty(message, AmqpMessageConstants.DeadLetterErrorDescriptionHeader, "deadLetterDescription");
SetUserProperty(message, AmqpMessageConstants.DeadLetterReasonHeader, "deadLetterReason");
SetLabel(message, "label");
SetMessageId(message, "messageId");
SetPartitionKey(message, "partitionKey");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Amqp;
using NUnit.Framework;
using NUnit.Framework.Internal;

Expand Down Expand Up @@ -486,8 +487,8 @@ public async Task DeadLetterMessagesSubscription(bool useSpecificSession)
Assert.AreEqual(messageEnum.Current.SessionId, item.SessionId);
var props = new Dictionary<string, object>();
// these should be ignored by DeadLetter property getters as they are not strings
props[ServiceBusReceivedMessage.DeadLetterReasonHeader] = DateTime.UtcNow;
props[ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader] = DateTime.UtcNow;
props[AmqpMessageConstants.DeadLetterReasonHeader] = DateTime.UtcNow;
props[AmqpMessageConstants.DeadLetterErrorDescriptionHeader] = DateTime.UtcNow;

await receiver.DeadLetterMessageAsync(item.LockToken, props);
}
Expand All @@ -514,8 +515,8 @@ public async Task DeadLetterMessagesSubscription(bool useSpecificSession)
Assert.AreEqual(messageEnum.Current.SessionId, msg.SessionId);
Assert.IsNull(msg.DeadLetterErrorDescription);
Assert.IsNull(msg.DeadLetterReason);
Assert.IsNotNull(msg.ApplicationProperties[ServiceBusReceivedMessage.DeadLetterReasonHeader]);
Assert.IsNotNull(msg.ApplicationProperties[ServiceBusReceivedMessage.DeadLetterErrorDescriptionHeader]);
Assert.IsNotNull(msg.ApplicationProperties[AmqpMessageConstants.DeadLetterReasonHeader]);
Assert.IsNotNull(msg.ApplicationProperties[AmqpMessageConstants.DeadLetterErrorDescriptionHeader]);
await deadLetterReceiver.CompleteMessageAsync(msg.LockToken);
}
}
Expand Down