diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusSubscriptionListenerConfiguration.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusSubscriptionListenerConfiguration.cs index c741e88d9..94e1eaedc 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusSubscriptionListenerConfiguration.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusSubscriptionListenerConfiguration.cs @@ -2,6 +2,7 @@ using Wolverine.AzureServiceBus.Internal; using Wolverine.Configuration; using Wolverine.ErrorHandling; +using Wolverine.Runtime.Interop.MassTransit; namespace Wolverine.AzureServiceBus; @@ -137,4 +138,25 @@ public AzureServiceBusSubscriptionListenerConfiguration InteropWith(IAzureServic add(e => e.EnvelopeMapper = mapper); return this; } + + /// + /// Utilize an envelope mapper that is interoperable with MassTransit + /// + /// + /// + public AzureServiceBusSubscriptionListenerConfiguration UseMassTransitInterop(Action? configure = null) + { + add(e => e.UseMassTransitInterop(configure)); + return this; + } + + /// + /// Use an envelope mapper that is interoperable with NServiceBus + /// + /// + public AzureServiceBusSubscriptionListenerConfiguration UseNServiceBusInterop() + { + add(e => e.UseNServiceBusInterop()); + return this; + } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTopicSubscriberConfiguration.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTopicSubscriberConfiguration.cs index 9743eaccb..9db0c079a 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTopicSubscriberConfiguration.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTopicSubscriberConfiguration.cs @@ -1,6 +1,7 @@ using Azure.Messaging.ServiceBus.Administration; using Wolverine.AzureServiceBus.Internal; using Wolverine.Configuration; +using Wolverine.Runtime.Interop.MassTransit; namespace Wolverine.AzureServiceBus; @@ -34,4 +35,24 @@ public AzureServiceBusTopicSubscriberConfiguration InteropWith(IAzureServiceBusE add(e => e.EnvelopeMapper = mapper); return this; } + + /// + /// Use envelope mapping that is interoperable with MassTransit + /// + /// + public AzureServiceBusTopicSubscriberConfiguration UseMassTransitInterop() + { + add(e => e.UseMassTransitInterop()); + return this; + } + + /// + /// Use envelope mapping that is interoperable with NServiceBus + /// + /// + public AzureServiceBusTopicSubscriberConfiguration UseNServiceBusInterop() + { + add(e => e.UseNServiceBusInterop()); + return this; + } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs index fa7ad3c75..90b650dd4 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs @@ -1,16 +1,22 @@ using System.Diagnostics; +using System.Text; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; using JasperFx.Core; +using JasperFx.Core.Reflection; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using Wolverine.Configuration; using Wolverine.Runtime; +using Wolverine.Runtime.Interop.MassTransit; +using Wolverine.Runtime.Serialization; using Wolverine.Transports; using Wolverine.Transports.Sending; +using Wolverine.Util; namespace Wolverine.AzureServiceBus.Internal; -public class AzureServiceBusSubscription : AzureServiceBusEndpoint, IBrokerQueue +public class AzureServiceBusSubscription : AzureServiceBusEndpoint, IBrokerQueue, IMassTransitInteropEndpoint { private bool _hasInitialized; @@ -222,4 +228,76 @@ internal async ValueTask InitializeAsync(ServiceBusAdministrationClient client, await PurgeAsync(logger); } } + + internal void UseNServiceBusInterop() + { + DefaultSerializer = new NewtonsoftSerializer(new JsonSerializerSettings()); + customizeMapping((m, _) => + { + m.MapPropertyToHeader(x => x.ConversationId, "NServiceBus.ConversationId"); + m.MapPropertyToHeader(x => x.SentAt, "NServiceBus.TimeSent"); + m.MapPropertyToHeader(x => x.CorrelationId!, "NServiceBus.CorrelationId"); + + var replyAddress = new Lazy(() => + { + var replyEndpoint = Parent.ReplyEndpoint() as AzureServiceBusQueue; + return replyEndpoint?.QueueName ?? string.Empty; + }); + + void WriteReplyToAddress(Envelope e, ServiceBusMessage props) + { + props.ApplicationProperties["NServiceBus.ReplyToAddress"] = replyAddress.Value; + } + + void ReadReplyUri(Envelope e, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + if (serviceBusReceivedMessage.ApplicationProperties.TryGetValue("NServiceBus.ReplyToAddress", + out var raw)) + { + var queueName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!; + e.ReplyUri = new Uri($"{Parent.Protocol}://queue/{queueName}"); + } + } + + m.MapProperty(x => x.ReplyUri!, ReadReplyUri, WriteReplyToAddress); + + m.MapProperty(x => x.MessageType!, (e, msg) => + { + if (msg.ApplicationProperties.TryGetValue("NServiceBus.EnclosedMessageTypes", out var raw)) + { + var typeName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!; + if (typeName.IsNotEmpty()) + { + var messageType = Type.GetType(typeName); + e.MessageType = messageType!.ToMessageTypeName(); + } + } + }, + (e, msg) => + { + msg.ApplicationProperties["NServiceBus.EnclosedMessageTypes"] = e.Message!.GetType().ToMessageTypeName(); + }); + }); + } + + Uri? IMassTransitInteropEndpoint.MassTransitUri() + { + return new Uri($"sb://{Parent.HostName}/topic/{Topic.TopicName}/{SubscriptionName}"); + } + + Uri? IMassTransitInteropEndpoint.MassTransitReplyUri() + { + return Parent.ReplyEndpoint()!.As().MassTransitUri(); + } + + Uri? IMassTransitInteropEndpoint.TranslateMassTransitToWolverineUri(Uri uri) + { + var lastSegment = uri.Segments.Last(); + return Parent.Queues[lastSegment].Uri; + } + + internal void UseMassTransitInterop(Action? configure = null) + { + customizeMapping((m, _) => m.InteropWithMassTransit(configure)); + } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs index 7c565f668..8d458f9a3 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs @@ -1,15 +1,21 @@ +using System.Text; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; -using JasperFx.CodeGeneration; +using JasperFx.Core; +using JasperFx.Core.Reflection; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using Wolverine.Configuration; using Wolverine.Runtime; +using Wolverine.Runtime.Interop.MassTransit; +using Wolverine.Runtime.Serialization; using Wolverine.Transports; using Wolverine.Transports.Sending; +using Wolverine.Util; namespace Wolverine.AzureServiceBus.Internal; -public class AzureServiceBusTopic : AzureServiceBusEndpoint +public class AzureServiceBusTopic : AzureServiceBusEndpoint, IMassTransitInteropEndpoint { private bool _hasInitialized; @@ -118,4 +124,76 @@ public AzureServiceBusSubscription FindOrCreateSubscription(string subscriptionN } public override bool IsPartitioned { get => Options.EnablePartitioning; } + + internal void UseNServiceBusInterop() + { + DefaultSerializer = new NewtonsoftSerializer(new JsonSerializerSettings()); + customizeMapping((m, _) => + { + m.MapPropertyToHeader(x => x.ConversationId, "NServiceBus.ConversationId"); + m.MapPropertyToHeader(x => x.SentAt, "NServiceBus.TimeSent"); + m.MapPropertyToHeader(x => x.CorrelationId!, "NServiceBus.CorrelationId"); + + var replyAddress = new Lazy(() => + { + var replyEndpoint = Parent.ReplyEndpoint() as AzureServiceBusQueue; + return replyEndpoint?.QueueName ?? string.Empty; + }); + + void WriteReplyToAddress(Envelope e, ServiceBusMessage props) + { + props.ApplicationProperties["NServiceBus.ReplyToAddress"] = replyAddress.Value; + } + + void ReadReplyUri(Envelope e, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + if (serviceBusReceivedMessage.ApplicationProperties.TryGetValue("NServiceBus.ReplyToAddress", + out var raw)) + { + var queueName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!; + e.ReplyUri = new Uri($"{Parent.Protocol}://queue/{queueName}"); + } + } + + m.MapProperty(x => x.ReplyUri!, ReadReplyUri, WriteReplyToAddress); + + m.MapProperty(x => x.MessageType!, (e, msg) => + { + if (msg.ApplicationProperties.TryGetValue("NServiceBus.EnclosedMessageTypes", out var raw)) + { + var typeName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!; + if (typeName.IsNotEmpty()) + { + var messageType = Type.GetType(typeName); + e.MessageType = messageType!.ToMessageTypeName(); + } + } + }, + (e, msg) => + { + msg.ApplicationProperties["NServiceBus.EnclosedMessageTypes"] = e.Message!.GetType().ToMessageTypeName(); + }); + }); + } + + Uri? IMassTransitInteropEndpoint.MassTransitUri() + { + return new Uri($"sb://{Parent.HostName}/topic/{TopicName}"); + } + + Uri? IMassTransitInteropEndpoint.MassTransitReplyUri() + { + return Parent.ReplyEndpoint()!.As().MassTransitUri(); + } + + Uri? IMassTransitInteropEndpoint.TranslateMassTransitToWolverineUri(Uri uri) + { + var lastSegment = uri.Segments.Last(); + return Parent.Queues[lastSegment].Uri; + } + + internal void UseMassTransitInterop(Action? configure = null) + { + customizeMapping((m, _) => m.InteropWithMassTransit(configure)); + } } \ No newline at end of file