Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Wolverine.AzureServiceBus.Internal;
using Wolverine.Configuration;
using Wolverine.ErrorHandling;
using Wolverine.Runtime.Interop.MassTransit;

namespace Wolverine.AzureServiceBus;

Expand Down Expand Up @@ -137,4 +138,25 @@ public AzureServiceBusSubscriptionListenerConfiguration InteropWith(IAzureServic
add(e => e.EnvelopeMapper = mapper);
return this;
}

/// <summary>
/// Utilize an envelope mapper that is interoperable with MassTransit
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
public AzureServiceBusSubscriptionListenerConfiguration UseMassTransitInterop(Action<IMassTransitInterop>? configure = null)
{
add(e => e.UseMassTransitInterop(configure));
return this;
}

/// <summary>
/// Use an envelope mapper that is interoperable with NServiceBus
/// </summary>
/// <returns></returns>
public AzureServiceBusSubscriptionListenerConfiguration UseNServiceBusInterop()
{
add(e => e.UseNServiceBusInterop());
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Azure.Messaging.ServiceBus.Administration;
using Wolverine.AzureServiceBus.Internal;
using Wolverine.Configuration;
using Wolverine.Runtime.Interop.MassTransit;

namespace Wolverine.AzureServiceBus;

Expand Down Expand Up @@ -34,4 +35,24 @@ public AzureServiceBusTopicSubscriberConfiguration InteropWith(IAzureServiceBusE
add(e => e.EnvelopeMapper = mapper);
return this;
}

/// <summary>
/// Use envelope mapping that is interoperable with MassTransit
/// </summary>
/// <returns></returns>
public AzureServiceBusTopicSubscriberConfiguration UseMassTransitInterop()
{
add(e => e.UseMassTransitInterop());
return this;
}

/// <summary>
/// Use envelope mapping that is interoperable with NServiceBus
/// </summary>
/// <returns></returns>
public AzureServiceBusTopicSubscriberConfiguration UseNServiceBusInterop()
{
add(e => e.UseNServiceBusInterop());
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
using System.Diagnostics;
using System.Text;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Runtime.Interop.MassTransit;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Wolverine.Util;

namespace Wolverine.AzureServiceBus.Internal;

public class AzureServiceBusSubscription : AzureServiceBusEndpoint, IBrokerQueue
public class AzureServiceBusSubscription : AzureServiceBusEndpoint, IBrokerQueue, IMassTransitInteropEndpoint
{
private bool _hasInitialized;

Expand Down Expand Up @@ -222,4 +228,76 @@ internal async ValueTask InitializeAsync(ServiceBusAdministrationClient client,
await PurgeAsync(logger);
}
}

internal void UseNServiceBusInterop()
{
DefaultSerializer = new NewtonsoftSerializer(new JsonSerializerSettings());
customizeMapping((m, _) =>
{
m.MapPropertyToHeader(x => x.ConversationId, "NServiceBus.ConversationId");
m.MapPropertyToHeader(x => x.SentAt, "NServiceBus.TimeSent");
m.MapPropertyToHeader(x => x.CorrelationId!, "NServiceBus.CorrelationId");

var replyAddress = new Lazy<string>(() =>
{
var replyEndpoint = Parent.ReplyEndpoint() as AzureServiceBusQueue;
return replyEndpoint?.QueueName ?? string.Empty;
});

void WriteReplyToAddress(Envelope e, ServiceBusMessage props)
{
props.ApplicationProperties["NServiceBus.ReplyToAddress"] = replyAddress.Value;
}

void ReadReplyUri(Envelope e, ServiceBusReceivedMessage serviceBusReceivedMessage)
{
if (serviceBusReceivedMessage.ApplicationProperties.TryGetValue("NServiceBus.ReplyToAddress",
out var raw))
{
var queueName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!;
e.ReplyUri = new Uri($"{Parent.Protocol}://queue/{queueName}");
}
}

m.MapProperty(x => x.ReplyUri!, ReadReplyUri, WriteReplyToAddress);

m.MapProperty(x => x.MessageType!, (e, msg) =>
{
if (msg.ApplicationProperties.TryGetValue("NServiceBus.EnclosedMessageTypes", out var raw))
{
var typeName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!;
if (typeName.IsNotEmpty())
{
var messageType = Type.GetType(typeName);
e.MessageType = messageType!.ToMessageTypeName();
}
}
},
(e, msg) =>
{
msg.ApplicationProperties["NServiceBus.EnclosedMessageTypes"] = e.Message!.GetType().ToMessageTypeName();
});
});
}

Uri? IMassTransitInteropEndpoint.MassTransitUri()
{
return new Uri($"sb://{Parent.HostName}/topic/{Topic.TopicName}/{SubscriptionName}");
}

Uri? IMassTransitInteropEndpoint.MassTransitReplyUri()
{
return Parent.ReplyEndpoint()!.As<IMassTransitInteropEndpoint>().MassTransitUri();
}

Uri? IMassTransitInteropEndpoint.TranslateMassTransitToWolverineUri(Uri uri)
{
var lastSegment = uri.Segments.Last();
return Parent.Queues[lastSegment].Uri;
}

internal void UseMassTransitInterop(Action<IMassTransitInterop>? configure = null)
{
customizeMapping((m, _) => m.InteropWithMassTransit(configure));
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
using System.Text;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using JasperFx.CodeGeneration;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Runtime.Interop.MassTransit;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Wolverine.Util;

namespace Wolverine.AzureServiceBus.Internal;

public class AzureServiceBusTopic : AzureServiceBusEndpoint
public class AzureServiceBusTopic : AzureServiceBusEndpoint, IMassTransitInteropEndpoint
{
private bool _hasInitialized;

Expand Down Expand Up @@ -118,4 +124,76 @@ public AzureServiceBusSubscription FindOrCreateSubscription(string subscriptionN
}

public override bool IsPartitioned { get => Options.EnablePartitioning; }

internal void UseNServiceBusInterop()
{
DefaultSerializer = new NewtonsoftSerializer(new JsonSerializerSettings());
customizeMapping((m, _) =>
{
m.MapPropertyToHeader(x => x.ConversationId, "NServiceBus.ConversationId");
m.MapPropertyToHeader(x => x.SentAt, "NServiceBus.TimeSent");
m.MapPropertyToHeader(x => x.CorrelationId!, "NServiceBus.CorrelationId");

var replyAddress = new Lazy<string>(() =>
{
var replyEndpoint = Parent.ReplyEndpoint() as AzureServiceBusQueue;
return replyEndpoint?.QueueName ?? string.Empty;
});

void WriteReplyToAddress(Envelope e, ServiceBusMessage props)
{
props.ApplicationProperties["NServiceBus.ReplyToAddress"] = replyAddress.Value;
}

void ReadReplyUri(Envelope e, ServiceBusReceivedMessage serviceBusReceivedMessage)
{
if (serviceBusReceivedMessage.ApplicationProperties.TryGetValue("NServiceBus.ReplyToAddress",
out var raw))
{
var queueName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!;
e.ReplyUri = new Uri($"{Parent.Protocol}://queue/{queueName}");
}
}

m.MapProperty(x => x.ReplyUri!, ReadReplyUri, WriteReplyToAddress);

m.MapProperty(x => x.MessageType!, (e, msg) =>
{
if (msg.ApplicationProperties.TryGetValue("NServiceBus.EnclosedMessageTypes", out var raw))
{
var typeName = (raw is byte[] b ? Encoding.Default.GetString(b) : raw.ToString())!;
if (typeName.IsNotEmpty())
{
var messageType = Type.GetType(typeName);
e.MessageType = messageType!.ToMessageTypeName();
}
}
},
(e, msg) =>
{
msg.ApplicationProperties["NServiceBus.EnclosedMessageTypes"] = e.Message!.GetType().ToMessageTypeName();
});
});
}

Uri? IMassTransitInteropEndpoint.MassTransitUri()
{
return new Uri($"sb://{Parent.HostName}/topic/{TopicName}");
}

Uri? IMassTransitInteropEndpoint.MassTransitReplyUri()
{
return Parent.ReplyEndpoint()!.As<IMassTransitInteropEndpoint>().MassTransitUri();
}

Uri? IMassTransitInteropEndpoint.TranslateMassTransitToWolverineUri(Uri uri)
{
var lastSegment = uri.Segments.Last();
return Parent.Queues[lastSegment].Uri;
}

internal void UseMassTransitInterop(Action<IMassTransitInterop>? configure = null)
{
customizeMapping((m, _) => m.InteropWithMassTransit(configure));
}
}
Loading