diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_2307_batching_with_conventional_routing.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_2307_batching_with_conventional_routing.cs new file mode 100644 index 000000000..6aa6d756d --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_2307_batching_with_conventional_routing.cs @@ -0,0 +1,71 @@ +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.AzureServiceBus.Internal; +using Wolverine.Runtime; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.AzureServiceBus.Tests.Bugs; + +public class Bug_2307_batching_with_conventional_routing : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .UseConventionalRouting(x => x.IncludeTypes(t => t == typeof(BatchedItem))) + .AutoProvision(); + + opts.BatchMessagesOf(batching => + { + batching.BatchSize = 5; + batching.TriggerTime = 3.Seconds(); + }); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + if (_host != null) await _host.StopAsync(); + _host?.Dispose(); + await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync(); + } + + [Fact] + public void conventional_routing_should_create_listener_for_batch_element_type() + { + var runtime = _host.Services.GetRequiredService(); + + // The batch element type should have a listener endpoint created by conventional routing. + // Without the fix, only the array type (BatchedItem[]) gets a listener, not the element type. + var expectedQueueName = typeof(BatchedItem).ToMessageTypeName().ToLowerInvariant(); + + var endpoints = runtime.Options.Transports.AllEndpoints() + .Where(x => x is AzureServiceBusQueue) + .Where(x => x.IsListener) + .ToArray(); + + endpoints.ShouldContain( + e => e.EndpointName == expectedQueueName, + $"Expected a listener endpoint for queue '{expectedQueueName}' but found only: {string.Join(", ", endpoints.Select(e => e.EndpointName))}"); + } +} + +public record BatchedItem(string Name); + +public static class BatchedItemHandler +{ + public static void Handle(BatchedItem[] items) + { + // batch handler + } +} diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index a24e1c0bb..1bbb7da8b 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -373,6 +373,16 @@ private void discoverListenersFromConventions() { // Let any registered routing conventions discover listener endpoints var handledMessageTypes = Handlers.Chains.Select(x => x.MessageType).ToList(); + + // Include batch element types so that conventional routing creates listeners for + // the element type (e.g., BatchedItem) rather than only the array type (BatchedItem[]) + foreach (var batch in Options.BatchDefinitions) + { + if (!handledMessageTypes.Contains(batch.ElementType)) + { + handledMessageTypes.Add(batch.ElementType); + } + } if (!Options.ExternalTransportsAreStubbed) { foreach (var routingConvention in Options.RoutingConventions) diff --git a/src/Wolverine/Transports/Local/LocalTransport.cs b/src/Wolverine/Transports/Local/LocalTransport.cs index 3e32f7d5e..3410e38a5 100644 --- a/src/Wolverine/Transports/Local/LocalTransport.cs +++ b/src/Wolverine/Transports/Local/LocalTransport.cs @@ -170,7 +170,7 @@ internal void DiscoverListeners(IWolverineRuntime runtime, IReadOnlyList h foreach (var messageType in handledMessageTypes) { var chain = runtime.Options.HandlerGraph.ChainFor(messageType); - if (chain.Handlers.Any()) + if (chain != null && chain.Handlers.Any()) { FindOrCreateQueueForMessageTypeByConvention(messageType); } diff --git a/src/Wolverine/Transports/MessageRoutingConvention.cs b/src/Wolverine/Transports/MessageRoutingConvention.cs index 6623b2f31..d1ddb67a1 100644 --- a/src/Wolverine/Transports/MessageRoutingConvention.cs +++ b/src/Wolverine/Transports/MessageRoutingConvention.cs @@ -35,8 +35,22 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea foreach (var messageType in handledMessageTypes.Where(t => _typeFilters.Matches(t))) { var chain = runtime.Options.HandlerGraph.ChainFor(messageType); - if (chain == null) continue; - + + // Batch element types won't have their own handler chain (only the array type does), + // but they still need external listeners created so messages can be received and + // routed to the local batching queue. See GH-2307. + var isBatchElementType = runtime.Options.BatchDefinitions.Any(b => b.ElementType == messageType); + + if (chain == null) + { + if (isBatchElementType) + { + maybeCreateListenerForMessageOrHandlerType(transport, messageType, runtime); + } + + continue; + } + if (runtime.Options.MultipleHandlerBehavior == MultipleHandlerBehavior.ClassicCombineIntoOneLogicalHandler && chain.Handlers.Any()) { maybeCreateListenerForMessageOrHandlerType(transport, messageType, runtime); @@ -47,7 +61,7 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea { maybeCreateListenerForMessageOrHandlerType(transport, messageType, runtime); } - + foreach (var handlerChain in chain.ByEndpoint) { var handlerType = handlerChain.Handlers.First().HandlerType; @@ -59,8 +73,8 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea } } } - - + + } }