diff --git a/build/CITargets.cs b/build/CITargets.cs index d4aae4a07..1818ff568 100644 --- a/build/CITargets.cs +++ b/build/CITargets.cs @@ -313,7 +313,10 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat var efCoreMultiTenancy = RootDirectory / "src" / "Persistence" / "EfCoreTests.MultiTenancy" / "EfCoreTests.MultiTenancy.csproj"; BuildTestProjects(efCoreTests, efCoreMultiTenancy); - StartDockerServices("postgresql", "sqlserver"); + // RabbitMQ is required by Bug_2588_ef_core_durable_outbox_with_conventional_routing, + // which exercises EF Core + RabbitMQ conventional routing + durable outbox policy. + // See GH-2588. + StartDockerServices("postgresql", "sqlserver", "rabbitmq"); RunSingleProjectOneClassAtATime(efCoreTests); RunSingleProjectOneClassAtATime(efCoreMultiTenancy); diff --git a/src/Persistence/EfCoreTests/Bugs/Bug_2588_ef_core_durable_outbox_with_conventional_routing.cs b/src/Persistence/EfCoreTests/Bugs/Bug_2588_ef_core_durable_outbox_with_conventional_routing.cs new file mode 100644 index 000000000..5c87ab0a2 --- /dev/null +++ b/src/Persistence/EfCoreTests/Bugs/Bug_2588_ef_core_durable_outbox_with_conventional_routing.cs @@ -0,0 +1,140 @@ +using IntegrationTests; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using SharedPersistenceModels.Items; +using Shouldly; +using Wolverine; +using Wolverine.Configuration; +using Wolverine.EntityFrameworkCore; +using Wolverine.Postgresql; +using Wolverine.RabbitMQ; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; +using Xunit; + +namespace EfCoreTests.Bugs; + +/// +/// Reproducer for https://github.com/JasperFx/wolverine/issues/2588. +/// +/// The reporter's setup mirrors a typical Wolverine app: EF Core DbContext +/// (manual envelope mapping), Postgres message persistence, RabbitMQ with +/// conventional routing, and Policies.UseDurableOutboxOnAllSendingEndpoints +/// (plus AutoApplyTransactions / UseDurableInboxOnAllListeners). Their HTTP +/// endpoint returns a tuple `(Response, CascadedEvent)`. They observe at +/// runtime that the cascading event bypasses the EF transaction / outbox — +/// `Mode == Inline` (the actual reporter saw `InlineSendingAgent`; this +/// repro shows the equivalent default `BufferedInMemory`, both meaning +/// "policy never applied"). +/// +/// The pre-existing Bug_2304 test exercises a similar policy expectation +/// against Marten + RabbitMQ and passes — but only because it never +/// registers the message handler with `IncludeType`. This reproducer +/// includes the handler, mirroring the reporter's real app. +/// +/// The test does NOT exchange messages with RabbitMQ — it just inspects +/// the resolved sender endpoint Mode after `RoutingFor` is called. +/// +[Collection("postgresql")] +public class Bug_2588_ef_core_durable_outbox_with_conventional_routing : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Faithful to reporter: EF Core with manual envelope mapping. + opts.Services.AddDbContext(o => + o.UseNpgsql(Servers.PostgresConnectionString)); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "wolverine"); + opts.UseEntityFrameworkCoreTransactions(); + + opts.UseRabbitMq() + .UseConventionalRouting() + .AutoProvision() + .AutoPurgeOnStartup(); + + opts.Policies.AutoApplyTransactions(); + opts.Policies.UseDurableInboxOnAllListeners(); + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.Durability.Mode = DurabilityMode.Solo; + + // Critical to reproduce: register the handler so conventional + // routing's DiscoverListeners pre-creates the exchange via + // ApplyListenerRoutingDefaults. That early creation makes + // BrokerTransport.InitializeAsync compile the exchange BEFORE + // any DiscoverSenders has added the subscription, so AllSenders + // policies (UseDurableOutboxOnAllSendingEndpoints) never apply + // — the _hasCompiled flag short-circuits a re-application + // when DiscoverSenders later adds the subscription on first + // publish. + opts.Discovery.DisableConventionalDiscovery().IncludeType(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered() + { + var runtime = _host.Services.GetRequiredService(); + + var routes = runtime.RoutingFor(typeof(Bug2588Message)) + .ShouldBeOfType>() + .Routes; + + routes.Length.ShouldBeGreaterThan(0); + + var route = routes.Single().ShouldBeOfType(); + var endpoint = route.Sender.Endpoint; + + // Reporter's symptom in unit-test form. With + // UseDurableOutboxOnAllSendingEndpoints() the conventionally-routed + // RabbitMQ exchange should have EndpointMode.Durable so cascading + // messages participate in the outbox transaction. On main with the + // handler registered, this comes back as BufferedInMemory because + // the exchange was Compile()'d during BrokerTransport.InitializeAsync + // (before DiscoverSenders ran) and the AllSenders policy gated on + // `e.Subscriptions.Any()` short-circuited. + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } +} + +public record Bug2588Message(Guid Id); + +public class Bug2588Handler +{ + // Triggers conventional listener creation for Bug2588Message at startup, + // which in turn calls RabbitMqMessageRoutingConvention.ApplyListenerRoutingDefaults + // and pre-creates the sender exchange before AllSenders policies apply. + public static void Handle(Bug2588Message _) { } +} + +public class Bug2588DbContext(DbContextOptions options) : DbContext(options) +{ + public DbSet Items => Set(); + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.MapWolverineEnvelopeStorage("wolverine"); + + modelBuilder.Entity(map => + { + map.ToTable("bug_2588_items"); + map.HasKey(x => x.Id); + map.Property(x => x.Id).HasColumnName("id"); + map.Property(x => x.Name).HasColumnName("name"); + }); + + base.OnModelCreating(modelBuilder); + } +} diff --git a/src/Persistence/EfCoreTests/EfCoreTests.csproj b/src/Persistence/EfCoreTests/EfCoreTests.csproj index c30d65840..d24c909af 100644 --- a/src/Persistence/EfCoreTests/EfCoreTests.csproj +++ b/src/Persistence/EfCoreTests/EfCoreTests.csproj @@ -42,6 +42,13 @@ + + diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs new file mode 100644 index 000000000..b982b3b9e --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs @@ -0,0 +1,76 @@ +using IntegrationTests; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.Configuration; +using Wolverine.Postgresql; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; + +namespace Wolverine.AmazonSqs.Tests.Bugs; + +/// +/// Locks down GH-2588 for the Amazon SQS conventional routing. Without the +/// structural fix, registering a handler triggers listener discovery which +/// pre-creates the corresponding queue; that endpoint is then compiled during +/// BrokerTransport.InitializeAsync BEFORE any sender subscription is +/// registered, so AllSenders policies like +/// UseDurableOutboxOnAllSendingEndpoints short-circuit on +/// endpoint.Subscriptions.Any() == false and never upgrade the endpoint +/// mode to Durable. +/// +public class Bug_2588_durable_outbox_with_handler_and_conventional_routing : IDisposable +{ + private readonly IHost _host; + + public Bug_2588_durable_outbox_with_handler_and_conventional_routing() + { + _host = WolverineHost.For(opts => + { + opts.UseAmazonSqsTransportLocally() + .UseConventionalRouting() + .AutoProvision() + .AutoPurgeOnStartup(); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString); + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.DisableConventionalDiscovery().IncludeType(); + }); + } + + [Fact] + public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered() + { + var runtime = _host.Services.GetRequiredService(); + + var routes = runtime.RoutingFor(typeof(Bug2588SqsMessage)) + .ShouldBeOfType>() + .Routes; + + routes.Length.ShouldBeGreaterThan(0); + + var route = routes.Single().ShouldBeOfType(); + var endpoint = route.Sender.Endpoint; + + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } + + public void Dispose() + { + _host?.Dispose(); + } +} + +public class Bug2588SqsMessage; + +public class Bug2588SqsHandler +{ + public static void Handle(Bug2588SqsMessage message) + { + // no-op + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/ConventionalRouting/conventional_listener_discovery.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/ConventionalRouting/conventional_listener_discovery.cs index 59e2bcbcb..a67da363a 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/ConventionalRouting/conventional_listener_discovery.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/ConventionalRouting/conventional_listener_discovery.cs @@ -86,7 +86,12 @@ public void disable_listener_by_lambda() var uri = "sqs://routed".ToUri(); var endpoint = theRuntime.Endpoints.EndpointFor(uri); - endpoint.ShouldBeNull(); + + // An endpoint may exist at this URI as a SENDER (since a handler is + // registered for RoutedMessage and the framework eagerly pre-registers + // sender configuration for handled message types — see GH-2588), but + // the listener side must NOT have been created. + if (endpoint != null) endpoint.IsListener.ShouldBeFalse(); theRuntime.Endpoints.ActiveListeners().Any(x => x.Uri == uri) .ShouldBeFalse(); diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs new file mode 100644 index 000000000..defd1ae8f --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs @@ -0,0 +1,133 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; +using Xunit; + +namespace Wolverine.AzureServiceBus.Tests.Bugs; + +/// +/// Locks down GH-2588 for the Azure Service Bus queue-based conventional routing. +/// Without the structural fix, registering a handler triggers listener discovery +/// which pre-creates the corresponding queue/topic; that endpoint is then compiled +/// during BrokerTransport.InitializeAsync BEFORE any sender subscription is +/// registered, so AllSenders policies like UseDurableOutboxOnAllSendingEndpoints +/// short-circuit on endpoint.Subscriptions.Any() == false and never upgrade +/// the endpoint mode to Durable. +/// +[Trait("Category", "Flaky")] +public class Bug_2588_durable_outbox_with_handler_and_conventional_routing : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .UseConventionalRouting() + .AutoProvision() + .AutoPurgeOnStartup(); + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.DisableConventionalDiscovery().IncludeType(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + if (_host != null) await _host.StopAsync(); + _host?.Dispose(); + await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); + } + + [Fact] + public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered() + { + var runtime = _host.Services.GetRequiredService(); + + var routes = runtime.RoutingFor(typeof(Bug2588AsbMessage)) + .ShouldBeOfType>() + .Routes; + + routes.Length.ShouldBeGreaterThan(0); + + var route = routes.Single().ShouldBeOfType(); + var endpoint = route.Sender.Endpoint; + + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } +} + +/// +/// Companion to +/// but exercising the topic/subscription broadcasting convention rather than the +/// queue-based one. Both inherit from MessageRoutingConvention<,,,> +/// and share the same fix path. +/// +[Trait("Category", "Flaky")] +public class Bug_2588_durable_outbox_with_handler_and_topic_broadcasting_routing : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .UseTopicAndSubscriptionConventionalRouting(x => + { + // Keep names short — Azure Service Bus has a 50-char limit + x.SubscriptionNameForListener(t => t.Name.ToLowerInvariant()); + x.TopicNameForListener(t => t.Name.ToLowerInvariant()); + x.TopicNameForSender(t => t.Name.ToLowerInvariant()); + }) + .AutoProvision() + .AutoPurgeOnStartup(); + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.DisableConventionalDiscovery().IncludeType(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + if (_host != null) await _host.StopAsync(); + _host?.Dispose(); + await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); + } + + [Fact] + public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered() + { + var runtime = _host.Services.GetRequiredService(); + + var routes = runtime.RoutingFor(typeof(Bug2588AsbMessage)) + .ShouldBeOfType>() + .Routes; + + routes.Length.ShouldBeGreaterThan(0); + + var route = routes.Single().ShouldBeOfType(); + var endpoint = route.Sender.Endpoint; + + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } +} + +public class Bug2588AsbMessage; + +public class Bug2588AsbHandler +{ + public static void Handle(Bug2588AsbMessage message) + { + // no-op + } +} diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/conventional_listener_discovery.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/conventional_listener_discovery.cs index 41364c498..9471331fb 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/conventional_listener_discovery.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/conventional_listener_discovery.cs @@ -87,7 +87,12 @@ public void disable_listener_by_lambda() var uri = "sqs://routed".ToUri(); var endpoint = theRuntime.Endpoints.EndpointFor(uri); - endpoint.ShouldBeNull(); + + // An endpoint may exist at this URI as a SENDER (since a handler is + // registered for RoutedMessage and the framework eagerly pre-registers + // sender configuration for handled message types — see GH-2588), but + // the listener side must NOT have been created. + if (endpoint != null) endpoint.IsListener.ShouldBeFalse(); theRuntime.Endpoints.ActiveListeners().Any(x => x.Uri == uri) .ShouldBeFalse(); diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs new file mode 100644 index 000000000..986024137 --- /dev/null +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs @@ -0,0 +1,81 @@ +using IntegrationTests; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.Configuration; +using Wolverine.Postgresql; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; +using Xunit; + +namespace Wolverine.Pubsub.Tests.Bugs; + +/// +/// Locks down GH-2588 for the GCP Pub/Sub conventional routing. Without the +/// structural fix, registering a handler triggers listener discovery which +/// pre-creates the corresponding topic; that endpoint is then compiled during +/// BrokerTransport.InitializeAsync BEFORE any sender subscription is +/// registered, so AllSenders policies like +/// UseDurableOutboxOnAllSendingEndpoints short-circuit on +/// endpoint.Subscriptions.Any() == false and never upgrade the endpoint +/// mode to Durable. +/// +public class Bug_2588_durable_outbox_with_handler_and_conventional_routing : IAsyncLifetime +{ + private IHost _host = default!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UsePubsubTesting() + .AutoProvision() + .AutoPurgeOnStartup() + .EnableDeadLettering() + .EnableSystemEndpoints() + .UseConventionalRouting(); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString); + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.DisableConventionalDiscovery().IncludeType(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + if (_host != null) await _host.StopAsync(); + _host?.Dispose(); + } + + [Fact] + public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered() + { + var runtime = _host.Services.GetRequiredService(); + + var routes = runtime.RoutingFor(typeof(Bug2588PubsubMessage)) + .ShouldBeOfType>() + .Routes; + + routes.Length.ShouldBeGreaterThan(0); + + var route = routes.Single().ShouldBeOfType(); + var endpoint = route.Sender.Endpoint; + + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } +} + +public class Bug2588PubsubMessage; + +public class Bug2588PubsubHandler +{ + public static void Handle(Bug2588PubsubMessage message) + { + // no-op + } +} diff --git a/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/conventional_listener_discovery.cs b/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/conventional_listener_discovery.cs index 70568ae30..df0b76813 100644 --- a/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/conventional_listener_discovery.cs +++ b/src/Transports/GCP/Wolverine.Pubsub.Tests/ConventionalRouting/conventional_listener_discovery.cs @@ -90,7 +90,11 @@ public void disable_listener_by_lambda() var uri = $"{PubsubTransport.ProtocolName}://wolverine/routed".ToUri(); var endpoint = theRuntime.Endpoints.EndpointFor(uri); - endpoint.ShouldBeNull(); + // An endpoint may exist at this URI as a SENDER (since a handler is + // registered for RoutedMessage and the framework eagerly pre-registers + // sender configuration for handled message types — see GH-2588), but + // the listener side must NOT have been created. + if (endpoint != null) endpoint.IsListener.ShouldBeFalse(); theRuntime.Endpoints.ActiveListeners() .Any(x => x.Uri == uri) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs new file mode 100644 index 000000000..e749a7d20 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_2588_durable_outbox_with_handler_and_conventional_routing.cs @@ -0,0 +1,88 @@ +using IntegrationTests; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.ComplianceTests; +using Wolverine.Configuration; +using Wolverine.Marten; +using Wolverine.Runtime; +using Wolverine.Runtime.Routing; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +/// +/// Companion to . +/// That test passes only because it never registers a handler for Bug2304Message — +/// so conventional routing never pre-creates the sender exchange via +/// RabbitMqMessageRoutingConvention.ApplyListenerRoutingDefaults. As soon as +/// a handler IS registered (the realistic case), the exchange gets created during +/// listener discovery and is then compiled by BrokerTransport.InitializeAsync +/// BEFORE any sender subscription is registered — meaning AllSenders policies like +/// UseDurableOutboxOnAllSendingEndpoints short-circuit on +/// endpoint.Subscriptions.Any() == false. Locks down the structural fix +/// in WolverineRuntime.discoverListenersFromConventions / +/// MessageRoutingConvention.PreregisterSenders. See GH-2588. +/// +public class Bug_2588_durable_outbox_with_handler_and_conventional_routing : IDisposable +{ + private readonly IHost _host; + + public Bug_2588_durable_outbox_with_handler_and_conventional_routing() + { + _host = WolverineHost.For(opts => + { + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DisableNpgsqlLogging = true; + }) + .IntegrateWithWolverine(); + + opts.UseRabbitMq() + .UseConventionalRouting() + .AutoProvision() + .AutoPurgeOnStartup(); + + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + // Critical to reproduce: register the handler so conventional routing's + // DiscoverListeners pre-creates the sender exchange via + // ApplyListenerRoutingDefaults BEFORE BrokerTransport.InitializeAsync compiles it. + opts.DisableConventionalDiscovery().IncludeType(); + }); + } + + [Fact] + public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered() + { + var runtime = _host.Services.GetRequiredService(); + + var routes = runtime.RoutingFor(typeof(Bug2588Message)) + .ShouldBeOfType>() + .Routes; + + routes.Length.ShouldBeGreaterThan(0); + + var route = routes.Single().ShouldBeOfType(); + var endpoint = route.Sender.Endpoint; + + endpoint.Mode.ShouldBe(EndpointMode.Durable); + } + + public void Dispose() + { + _host?.Dispose(); + } +} + +public class Bug2588Message; + +public class Bug2588Handler +{ + public static void Handle(Bug2588Message message) + { + // no-op + } +} diff --git a/src/Wolverine/Runtime/Routing/IMessageRoutingConvention.cs b/src/Wolverine/Runtime/Routing/IMessageRoutingConvention.cs index ef13b884f..cdad10040 100644 --- a/src/Wolverine/Runtime/Routing/IMessageRoutingConvention.cs +++ b/src/Wolverine/Runtime/Routing/IMessageRoutingConvention.cs @@ -22,6 +22,25 @@ public interface IMessageRoutingConvention /// /// IEnumerable DiscoverSenders(Type messageType, IWolverineRuntime runtime); + + /// + /// Eagerly register subscription metadata and apply sender configuration + /// for the given handled message types, BEFORE any + /// BrokerTransport.InitializeAsync compiles the endpoints. This + /// gives endpoint policies like UseDurableOutboxOnAllSendingEndpoints + /// a chance to see Subscriptions.Any() == true on conventionally- + /// routed sender endpoints when their first Compile() runs. Unlike + /// , this MUST NOT build the sending agent + /// — the broker is not yet connected at this phase of host startup. + /// + /// Default no-op so custom + /// implementations are unaffected. The built-in + /// MessageRoutingConvention<,,,> base class overrides this. + /// See https://github.com/JasperFx/wolverine/issues/2588. + /// + void PreregisterSenders(IReadOnlyList handledMessageTypes, IWolverineRuntime runtime) + { + } } #endregion \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index a7c0d5ad3..a3c8206e3 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -459,6 +459,29 @@ private void discoverListenersFromConventions() { routingConvention.DiscoverListeners(this, handledMessageTypes); } + + // ALSO pre-register sender subscription metadata for each handled + // message type so that endpoint policies (e.g. + // UseDurableOutboxOnAllSendingEndpoints) apply to conventionally- + // routed sender endpoints. Without this, transports like RabbitMQ + // create the sender endpoint as a side effect of listener + // discovery (ApplyListenerRoutingDefaults), but the Subscription + // metadata used by AllSenders policies is added lazily by + // DiscoverSenders only on the first publish — by which point + // BrokerTransport.InitializeAsync has already Compile()'d the + // endpoint with no subscriptions and Endpoint._hasCompiled + // short-circuits the policy from ever applying. See GH-2588. + // + // PreregisterSenders is intentionally lighter than DiscoverSenders + // — it does NOT build the sending agent (which would need a live + // broker connection that hasn't been opened yet). The full + // DiscoverSenders still runs lazily on first publish via + // RoutingFor; by then the endpoint has already been compiled with + // the subscription in place, so the policy decisions stick. + foreach (var routingConvention in Options.RoutingConventions) + { + routingConvention.PreregisterSenders(handledMessageTypes, this); + } } else { diff --git a/src/Wolverine/Transports/MessageRoutingConvention.cs b/src/Wolverine/Transports/MessageRoutingConvention.cs index 128f74673..80bd2a520 100644 --- a/src/Wolverine/Transports/MessageRoutingConvention.cs +++ b/src/Wolverine/Transports/MessageRoutingConvention.cs @@ -24,6 +24,14 @@ public abstract class MessageRoutingConvention _queueNameForListener = t => t.ToMessageTypeName(); private NamingSource _namingSource = NamingSource.FromMessageType; + /// + /// Tracks message types whose sender configuration has already been applied so that + /// doesn't run twice for a given message type when + /// is later called following an earlier + /// call. See GH-2588. + /// + private readonly HashSet _configuredSenders = new(); + void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IReadOnlyList handledMessageTypes) { if(_onlyApplyToOutboundMessages) @@ -150,19 +158,64 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea IEnumerable IMessageRoutingConvention.DiscoverSenders(Type messageType, IWolverineRuntime runtime) { - if(_onlyApplyToInboundMessages) + var endpoint = tryRegisterSenderConfiguration(messageType, runtime); + if (endpoint == null) { yield break; } + // This will start up the sending agent. Only safe to call once the broker + // transport has been initialized (i.e. the sending connection is open). + var sendingAgent = runtime.Endpoints.GetOrBuildSendingAgent(endpoint.Uri); + yield return sendingAgent.Endpoint; + } + + void IMessageRoutingConvention.PreregisterSenders(IReadOnlyList handledMessageTypes, IWolverineRuntime runtime) + { + // Eagerly apply subscription metadata and sender configuration for the + // conventionally-routed sender endpoints derived from this convention's + // handled message types. This must run BEFORE BrokerTransport.InitializeAsync + // calls Compile() on the endpoints — otherwise endpoint policies like + // UseDurableOutboxOnAllSendingEndpoints() that gate on + // `endpoint.Subscriptions.Any()` won't see the subscription and won't + // upgrade the endpoint mode to Durable. See GH-2588. + // + // CRITICAL: do NOT build the sending agent here — the broker isn't connected + // yet at this phase of host startup. The agent gets built lazily later when + // DiscoverSenders runs on the first publish path. + if (_onlyApplyToInboundMessages) + { + return; + } + + foreach (var messageType in handledMessageTypes) + { + tryRegisterSenderConfiguration(messageType, runtime); + } + } + + /// + /// Locate or create the subscriber endpoint for , register + /// the subscription and apply exactly once per message + /// type. Returns the endpoint, or null if filtering rules say this convention should + /// not produce a sender for the message type. Does NOT build the sending agent — that + /// is the caller's responsibility (and only safe once the broker is connected). + /// + private Endpoint? tryRegisterSenderConfiguration(Type messageType, IWolverineRuntime runtime) + { + if (_onlyApplyToInboundMessages) + { + return null; + } + if (!_typeFilters.Matches(messageType)) { - yield break; + return null; } if (messageType.CanBeCastTo() || messageType == typeof(Envelope)) { - yield break; + return null; } var transport = runtime.Options.Transports.GetOrCreate(); @@ -170,7 +223,7 @@ IEnumerable IMessageRoutingConvention.DiscoverSenders(Type messageType var destinationName = _identifierForSender(messageType); if (destinationName.IsEmpty()) { - yield break; + return null; } var corrected = transport.MaybeCorrectName(destinationName); @@ -180,19 +233,19 @@ IEnumerable IMessageRoutingConvention.DiscoverSenders(Type messageType // Register the subscription so that endpoint policies like // UseDurableOutboxOnAllSendingEndpoints() recognize this as a sender - // endpoint when Compile() applies policies. See GH-2304. + // endpoint when Compile() applies policies. See GH-2304 / GH-2588. if (!endpoint.Subscriptions.Any(s => s.Matches(messageType))) { endpoint.Subscriptions.Add(Subscription.ForType(messageType)); } - _configureSending(configuration, new MessageRoutingContext(messageType, runtime)); - - configuration.As().Apply(); + if (_configuredSenders.Add(messageType)) + { + _configureSending(configuration, new MessageRoutingContext(messageType, runtime)); + configuration.As().Apply(); + } - // This will start up the sending agent - var sendingAgent = runtime.Endpoints.GetOrBuildSendingAgent(endpoint.Uri); - yield return sendingAgent.Endpoint; + return endpoint; } private bool _onlyApplyToOutboundMessages; diff --git a/src/Wolverine/WolverineSystemPart.cs b/src/Wolverine/WolverineSystemPart.cs index 1084999d5..d53e4ddc1 100644 --- a/src/Wolverine/WolverineSystemPart.cs +++ b/src/Wolverine/WolverineSystemPart.cs @@ -90,17 +90,24 @@ public void WriteSendingEndpoints() foreach (var messageType in messageTypes) _runtime.RoutingFor(messageType); - var table = new Table(){Title = new TableTitle("Subscriptions"){Style = new Style(decoration:Decoration.Bold)}}; + var table = new Table(){Title = new TableTitle("Senders"){Style = new Style(decoration:Decoration.Bold)}}; table.AddColumn("Uri", c => c.NoWrap = true); table.AddColumn("Name"); table.AddColumn("Mode"); table.AddColumn("Serializer(s)", c => c.NoWrap = true); + // Restrict to endpoints that have actually been wired up as senders. Without + // this filter, the table includes every endpoint registered in any transport + // — including listener-only queues — which makes it look like e.g. a Durable + // listener queue is actually a BufferedInMemory sender. An endpoint may also + // appear in both Senders and Listeners tables when it acts as both. See + // GH-2588. var senders = _runtime - .Options - .Transports - .SelectMany(x => x.Endpoints()) + .Endpoints + .ActiveSendingAgents() + .Select(x => x.Endpoint) + .Distinct() .OrderBy(x => x.Uri.ToString()); foreach (var endpoint in senders)