diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_topic_exchange_ignores_durable_outbox_policy.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_topic_exchange_ignores_durable_outbox_policy.cs new file mode 100644 index 000000000..a310cc118 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_topic_exchange_ignores_durable_outbox_policy.cs @@ -0,0 +1,68 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using RabbitMQ.Client; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.RabbitMQ.Internal; +using Wolverine.Runtime; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +public class Bug_topic_exchange_ignores_durable_outbox_policy : IDisposable +{ + private readonly IHost _host; + + public Bug_topic_exchange_ignores_durable_outbox_policy() + { + _host = Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq() + .AutoProvision(); + + opts.StubAllExternalTransports(); + + opts.UseRabbitMq().DeclareExchange("members", e => + { + e.ExchangeType = ExchangeType.Topic; + e.IsDurable = true; + }); + + opts.PublishMessagesToRabbitMqExchange("members", + _ => "member.created"); + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + }) + .Start(); + } + + [Fact] + public void topic_exchange_endpoint_should_be_durable() + { + var runtime = _host.Services.GetRequiredService(); + + var endpoint = runtime.Options.Transports + .AllEndpoints() + .OfType() + .FirstOrDefault(e => e.ExchangeName == "members"); + + endpoint.ShouldNotBeNull(); + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } + + public void Dispose() + { + _host?.Dispose(); + } +} + +public class TopicExchangeBugMessage; + +public static class TopicExchangeBugHandler +{ + public static void Handle(TopicExchangeBugMessage message) + { + // no-op + } +} diff --git a/src/Wolverine/Runtime/Routing/TopicRouting.cs b/src/Wolverine/Runtime/Routing/TopicRouting.cs index b9a7e45b3..6d712e335 100644 --- a/src/Wolverine/Runtime/Routing/TopicRouting.cs +++ b/src/Wolverine/Runtime/Routing/TopicRouting.cs @@ -9,7 +9,7 @@ namespace Wolverine.Runtime.Routing; -public class TopicRouting : IMessageRouteSource, IMessageRoute, IMessageInvoker +public class TopicRouting : IMessageRouteSource, IMessageRoute, IMessageInvoker, IEndpointSource { private readonly Func _topicSource; private readonly Endpoint _topicEndpoint; @@ -31,6 +31,11 @@ public IEnumerable FindRoutes(Type messageType, IWolverineRuntime public bool IsAdditive => true; + public IEnumerable ActiveEndpoints() + { + yield return _topicEndpoint; + } + public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue, WolverineRuntime runtime, string? topicName) { diff --git a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs index 5cb57097e..1a07520ba 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs @@ -1,6 +1,7 @@ using ImTools; using JasperFx.Core; using JasperFx.Core.Reflection; +using Wolverine.Configuration; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Routing; using Wolverine.Transports; @@ -30,6 +31,16 @@ public interface IMessageRouteSource #endregion +/// +/// Optional interface for IMessageRouteSource implementations to expose their +/// target endpoints, enabling endpoint policies (like UseDurableOutboxOnAllSendingEndpoints) +/// to discover and configure them. +/// +public interface IEndpointSource +{ + IEnumerable ActiveEndpoints(); +} + internal class AgentMessages : IMessageRouteSource { public IEnumerable FindRoutes(Type messageType, IWolverineRuntime runtime) diff --git a/src/Wolverine/WolverineOptions.Policies.cs b/src/Wolverine/WolverineOptions.Policies.cs index d7bebc738..6d437f5ce 100644 --- a/src/Wolverine/WolverineOptions.Policies.cs +++ b/src/Wolverine/WolverineOptions.Policies.cs @@ -153,7 +153,10 @@ void IPolicies.AllSenders(Action configure) if (!e.Subscriptions.Any()) { - return; + var isRoutingTarget = CustomRouteSources + .OfType() + .Any(rs => rs.ActiveEndpoints().Contains(e)); + if (!isRoutingTarget) return; } var configuration = new SubscriberConfiguration(e);