Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.AzureServiceBus.Internal;
using Wolverine.Runtime;
using Wolverine.Util;
using Xunit;

namespace Wolverine.AzureServiceBus.Tests.Bugs;

public class Bug_2307_batching_with_conventional_routing : IAsyncLifetime
{
private IHost _host;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.UseConventionalRouting(x => x.IncludeTypes(t => t == typeof(BatchedItem)))
.AutoProvision();

opts.BatchMessagesOf<BatchedItem>(batching =>
{
batching.BatchSize = 5;
batching.TriggerTime = 3.Seconds();
});

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

public async Task DisposeAsync()
{
if (_host != null) await _host.StopAsync();
_host?.Dispose();
await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync();
}

[Fact]
public void conventional_routing_should_create_listener_for_batch_element_type()
{
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

// The batch element type should have a listener endpoint created by conventional routing.
// Without the fix, only the array type (BatchedItem[]) gets a listener, not the element type.
var expectedQueueName = typeof(BatchedItem).ToMessageTypeName().ToLowerInvariant();

var endpoints = runtime.Options.Transports.AllEndpoints()
.Where(x => x is AzureServiceBusQueue)
.Where(x => x.IsListener)
.ToArray();

endpoints.ShouldContain(
e => e.EndpointName == expectedQueueName,
$"Expected a listener endpoint for queue '{expectedQueueName}' but found only: {string.Join(", ", endpoints.Select(e => e.EndpointName))}");
}
}

public record BatchedItem(string Name);

public static class BatchedItemHandler
{
public static void Handle(BatchedItem[] items)
{
// batch handler
}
}
10 changes: 10 additions & 0 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,16 @@ private void discoverListenersFromConventions()
{
// Let any registered routing conventions discover listener endpoints
var handledMessageTypes = Handlers.Chains.Select(x => x.MessageType).ToList();

// Include batch element types so that conventional routing creates listeners for
// the element type (e.g., BatchedItem) rather than only the array type (BatchedItem[])
foreach (var batch in Options.BatchDefinitions)
{
if (!handledMessageTypes.Contains(batch.ElementType))
{
handledMessageTypes.Add(batch.ElementType);
}
}
if (!Options.ExternalTransportsAreStubbed)
{
foreach (var routingConvention in Options.RoutingConventions)
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Transports/Local/LocalTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ internal void DiscoverListeners(IWolverineRuntime runtime, IReadOnlyList<Type> h
foreach (var messageType in handledMessageTypes)
{
var chain = runtime.Options.HandlerGraph.ChainFor(messageType);
if (chain.Handlers.Any())
if (chain != null && chain.Handlers.Any())
{
FindOrCreateQueueForMessageTypeByConvention(messageType);
}
Expand Down
24 changes: 19 additions & 5 deletions src/Wolverine/Transports/MessageRoutingConvention.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,22 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea
foreach (var messageType in handledMessageTypes.Where(t => _typeFilters.Matches(t)))
{
var chain = runtime.Options.HandlerGraph.ChainFor(messageType);
if (chain == null) continue;


// Batch element types won't have their own handler chain (only the array type does),
// but they still need external listeners created so messages can be received and
// routed to the local batching queue. See GH-2307.
var isBatchElementType = runtime.Options.BatchDefinitions.Any(b => b.ElementType == messageType);

if (chain == null)
{
if (isBatchElementType)
{
maybeCreateListenerForMessageOrHandlerType(transport, messageType, runtime);
}

continue;
}

if (runtime.Options.MultipleHandlerBehavior == MultipleHandlerBehavior.ClassicCombineIntoOneLogicalHandler && chain.Handlers.Any())
{
maybeCreateListenerForMessageOrHandlerType(transport, messageType, runtime);
Expand All @@ -47,7 +61,7 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea
{
maybeCreateListenerForMessageOrHandlerType(transport, messageType, runtime);
}

foreach (var handlerChain in chain.ByEndpoint)
{
var handlerType = handlerChain.Handlers.First().HandlerType;
Expand All @@ -59,8 +73,8 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea
}
}
}


}
}

Expand Down
Loading