diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs index f494d4984..939a9bd84 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/using_native_scheduling.cs @@ -96,6 +96,32 @@ public async Task with_inline_endpoint_explicit_scheduling() await host.StopAsync(); } + [Fact] + public async Task schedule_to_topic_with_subscription_listener() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .AutoProvision().AutoPurgeOnStartup(); + + opts.PublishMessage().ToAzureServiceBusTopic("scheduled-topic"); + opts.ListenToAzureServiceBusSubscription("scheduled-sub") + .FromTopic("scheduled-topic") + .ProcessInline(); + }).StartAsync(); + + var session = await host.TrackActivity() + .IncludeExternalTransports() + .Timeout(20.Seconds()) + .ExecuteAndWaitAsync(c => c.ScheduleAsync(new AsbMessage1("topic scheduled"), 3.Seconds())); + + session.Received.SingleMessage() + .Name.ShouldBe("topic scheduled"); + + await host.StopAsync(); + } + [Fact] public async Task with_buffered_endpoint() // durable would have similar mechanics { diff --git a/src/Wolverine/Configuration/EndpointCollection.cs b/src/Wolverine/Configuration/EndpointCollection.cs index f7724a742..3f727bc92 100644 --- a/src/Wolverine/Configuration/EndpointCollection.cs +++ b/src/Wolverine/Configuration/EndpointCollection.cs @@ -42,7 +42,7 @@ public interface IEndpointCollection : IAsyncDisposable Task StartListenerAsync(Endpoint endpoint, CancellationToken cancellationToken); Task StopListenerAsync(Endpoint endpoint, CancellationToken cancellationToken); - IListenerCircuit FindListenerCircuit(Uri address); + IListenerCircuit? FindListenerCircuit(Uri address); } public class EndpointCollection : IEndpointCollection @@ -258,15 +258,15 @@ public async Task StopListenerAsync(Endpoint endpoint, CancellationToken cancell } } - public IListenerCircuit FindListenerCircuit(Uri address) + public IListenerCircuit? FindListenerCircuit(Uri address) { if (address.Scheme == TransportConstants.Local) { return (IListenerCircuit)GetOrBuildSendingAgent(address); } - return (FindListeningAgent(address) ?? - FindListeningAgent(TransportConstants.Durable))!; + return FindListeningAgent(address) ?? + FindListeningAgent(TransportConstants.Durable); } public async Task StartListenerAsync(Endpoint endpoint, CancellationToken cancellationToken) diff --git a/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs b/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs index 3c859d601..43d0fc199 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.EnqueueDirectly.cs @@ -9,7 +9,21 @@ public async ValueTask EnqueueDirectlyAsync(IReadOnlyList envelopes) var groups = envelopes.GroupBy(x => x.Destination ?? TransportConstants.LocalUri).ToArray(); foreach (var group in groups) { - await Endpoints.FindListenerCircuit(group.Key).EnqueueDirectlyAsync(group); + var listener = Endpoints.FindListenerCircuit(group.Key); + if (listener != null) + { + await listener.EnqueueDirectlyAsync(group); + } + else + { + // For send-only endpoints (e.g. Azure Service Bus topics), + // there is no listener circuit. Send through the sending agent instead. + var sender = Endpoints.GetOrBuildSendingAgent(group.Key); + foreach (var envelope in group) + { + await sender.EnqueueOutgoingAsync(envelope); + } + } } } } \ No newline at end of file