From b1aa62ce3b5ac3e481f65d9081e8fe3e01cd3166 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 24 Feb 2026 12:21:54 -0600 Subject: [PATCH] Fix NRE when scheduling messages to send-only endpoints like ASB topics. Closes #2223 FindListenerCircuit returns null for send-only endpoints (e.g. Azure Service Bus topics) since they have no listener. EnqueueDirectlyAsync now falls back to the sending agent when no listener circuit is found, allowing scheduled messages to be sent through the normal outbound path. Co-Authored-By: Claude Opus 4.6 --- .../using_native_scheduling.cs | 26 +++++++++++++++++++ .../Configuration/EndpointCollection.cs | 8 +++--- .../WolverineRuntime.EnqueueDirectly.cs | 16 +++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs index f494d4984..939a9bd84 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs @@ -96,6 +96,32 @@ public async Task with_inline_endpoint_explicit_scheduling() await host.StopAsync(); } + [Fact] + public async Task schedule_to_topic_with_subscription_listener() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .AutoProvision().AutoPurgeOnStartup(); + + opts.PublishMessage().ToAzureServiceBusTopic("scheduled-topic"); + opts.ListenToAzureServiceBusSubscription("scheduled-sub") + .FromTopic("scheduled-topic") + .ProcessInline(); + }).StartAsync(); + + var session = await host.TrackActivity() + .IncludeExternalTransports() + .Timeout(20.Seconds()) + .ExecuteAndWaitAsync(c => c.ScheduleAsync(new AsbMessage1("topic scheduled"), 3.Seconds())); + + session.Received.SingleMessage() + .Name.ShouldBe("topic scheduled"); + + await host.StopAsync(); + } + [Fact] public async Task with_buffered_endpoint() // durable would have similar mechanics { diff --git a/src/Wolverine/Configuration/EndpointCollection.cs b/src/Wolverine/Configuration/EndpointCollection.cs index f7724a742..3f727bc92 100644 --- a/src/Wolverine/Configuration/EndpointCollection.cs +++ b/src/Wolverine/Configuration/EndpointCollection.cs @@ -42,7 +42,7 @@ public interface IEndpointCollection : IAsyncDisposable Task StartListenerAsync(Endpoint endpoint, CancellationToken cancellationToken); Task StopListenerAsync(Endpoint endpoint, CancellationToken cancellationToken); - IListenerCircuit FindListenerCircuit(Uri address); + IListenerCircuit? FindListenerCircuit(Uri address); } public class EndpointCollection : IEndpointCollection @@ -258,15 +258,15 @@ public async Task StopListenerAsync(Endpoint endpoint, CancellationToken cancell } } - public IListenerCircuit FindListenerCircuit(Uri address) + public IListenerCircuit? FindListenerCircuit(Uri address) { if (address.Scheme == TransportConstants.Local) { return (IListenerCircuit)GetOrBuildSendingAgent(address); } - return (FindListeningAgent(address) ?? - FindListeningAgent(TransportConstants.Durable))!; + return FindListeningAgent(address) ?? + FindListeningAgent(TransportConstants.Durable); } public async Task StartListenerAsync(Endpoint endpoint, CancellationToken cancellationToken) diff --git a/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs b/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs index 3c859d601..43d0fc199 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs @@ -9,7 +9,21 @@ public async ValueTask EnqueueDirectlyAsync(IReadOnlyList envelopes) var groups = envelopes.GroupBy(x => x.Destination ?? TransportConstants.LocalUri).ToArray(); foreach (var group in groups) { - await Endpoints.FindListenerCircuit(group.Key).EnqueueDirectlyAsync(group); + var listener = Endpoints.FindListenerCircuit(group.Key); + if (listener != null) + { + await listener.EnqueueDirectlyAsync(group); + } + else + { + // For send-only endpoints (e.g. Azure Service Bus topics), + // there is no listener circuit. Send through the sending agent instead. + var sender = Endpoints.GetOrBuildSendingAgent(group.Key); + foreach (var envelope in group) + { + await sender.EnqueueOutgoingAsync(envelope); + } + } } } } \ No newline at end of file