Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.RabbitMQ.Internal;
using Wolverine.Runtime;
using Xunit;

namespace Wolverine.RabbitMQ.Tests.Bugs;

public class Bug_topic_exchange_ignores_durable_outbox_policy : IDisposable
{
private readonly IHost _host;

public Bug_topic_exchange_ignores_durable_outbox_policy()
{
_host = Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq()
.AutoProvision();

opts.StubAllExternalTransports();

opts.UseRabbitMq().DeclareExchange("members", e =>
{
e.ExchangeType = ExchangeType.Topic;
e.IsDurable = true;
});

opts.PublishMessagesToRabbitMqExchange<TopicExchangeBugMessage>("members",
_ => "member.created");

opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
})
.Start();
}

[Fact]
public void topic_exchange_endpoint_should_be_durable()
{
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

var endpoint = runtime.Options.Transports
.AllEndpoints()
.OfType<RabbitMqExchange>()
.FirstOrDefault(e => e.ExchangeName == "members");

endpoint.ShouldNotBeNull();
endpoint.Mode.ShouldBe(EndpointMode.Durable);
}

public void Dispose()
{
_host?.Dispose();
}
}

public class TopicExchangeBugMessage;

public static class TopicExchangeBugHandler
{
public static void Handle(TopicExchangeBugMessage message)
{
// no-op
}
}
7 changes: 6 additions & 1 deletion src/Wolverine/Runtime/Routing/TopicRouting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Wolverine.Runtime.Routing;

public class TopicRouting<T> : IMessageRouteSource, IMessageRoute, IMessageInvoker
public class TopicRouting<T> : IMessageRouteSource, IMessageRoute, IMessageInvoker, IEndpointSource
{
private readonly Func<T, string> _topicSource;
private readonly Endpoint _topicEndpoint;
Expand All @@ -31,6 +31,11 @@ public IEnumerable<IMessageRoute> FindRoutes(Type messageType, IWolverineRuntime

public bool IsAdditive => true;

public IEnumerable<Endpoint> ActiveEndpoints()
{
yield return _topicEndpoint;
}

public Envelope CreateForSending(object message, DeliveryOptions? options, ISendingAgent localDurableQueue,
WolverineRuntime runtime, string? topicName)
{
Expand Down
11 changes: 11 additions & 0 deletions src/Wolverine/Runtime/WolverineRuntime.Routing.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using ImTools;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Wolverine.Configuration;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;
Expand Down Expand Up @@ -30,6 +31,16 @@ public interface IMessageRouteSource

#endregion

/// <summary>
/// Optional interface for IMessageRouteSource implementations to expose their
/// target endpoints, enabling endpoint policies (like UseDurableOutboxOnAllSendingEndpoints)
/// to discover and configure them.
/// </summary>
public interface IEndpointSource
{
IEnumerable<Endpoint> ActiveEndpoints();
}

internal class AgentMessages : IMessageRouteSource
{
public IEnumerable<IMessageRoute> FindRoutes(Type messageType, IWolverineRuntime runtime)
Expand Down
5 changes: 4 additions & 1 deletion src/Wolverine/WolverineOptions.Policies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ void IPolicies.AllSenders(Action<ISubscriberConfiguration> configure)

if (!e.Subscriptions.Any())
{
return;
var isRoutingTarget = CustomRouteSources
.OfType<IEndpointSource>()
.Any(rs => rs.ActiveEndpoints().Contains(e));
if (!isRoutingTarget) return;
}

var configuration = new SubscriberConfiguration(e);
Expand Down
Loading