Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build/CITargets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ void BuildTestProjectsWithFramework(string frameworkOverride, params AbsolutePat
var efCoreMultiTenancy = RootDirectory / "src" / "Persistence" / "EfCoreTests.MultiTenancy" / "EfCoreTests.MultiTenancy.csproj";

BuildTestProjects(efCoreTests, efCoreMultiTenancy);
StartDockerServices("postgresql", "sqlserver");
// RabbitMQ is required by Bug_2588_ef_core_durable_outbox_with_conventional_routing,
// which exercises EF Core + RabbitMQ conventional routing + durable outbox policy.
// See GH-2588.
StartDockerServices("postgresql", "sqlserver", "rabbitmq");

RunSingleProjectOneClassAtATime(efCoreTests);
RunSingleProjectOneClassAtATime(efCoreMultiTenancy);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
using IntegrationTests;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using SharedPersistenceModels.Items;
using Shouldly;
using Wolverine;
using Wolverine.Configuration;
using Wolverine.EntityFrameworkCore;
using Wolverine.Postgresql;
using Wolverine.RabbitMQ;
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;
using Xunit;

namespace EfCoreTests.Bugs;

/// <summary>
/// Reproducer for https://github.com/JasperFx/wolverine/issues/2588.
///
/// The reporter's setup mirrors a typical Wolverine app: EF Core DbContext
/// (manual envelope mapping), Postgres message persistence, RabbitMQ with
/// conventional routing, and Policies.UseDurableOutboxOnAllSendingEndpoints
/// (plus AutoApplyTransactions / UseDurableInboxOnAllListeners). Their HTTP
/// endpoint returns a tuple `(Response, CascadedEvent)`. They observe at
/// runtime that the cascading event bypasses the EF transaction / outbox —
/// `Mode == Inline` (the actual reporter saw `InlineSendingAgent`; this
/// repro shows the equivalent default `BufferedInMemory`, both meaning
/// "policy never applied").
///
/// The pre-existing Bug_2304 test exercises a similar policy expectation
/// against Marten + RabbitMQ and passes — but only because it never
/// registers the message handler with `IncludeType`. This reproducer
/// includes the handler, mirroring the reporter's real app.
///
/// The test does NOT exchange messages with RabbitMQ — it just inspects
/// the resolved sender endpoint Mode after `RoutingFor` is called.
/// </summary>
[Collection("postgresql")]
public class Bug_2588_ef_core_durable_outbox_with_conventional_routing : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Faithful to reporter: EF Core with manual envelope mapping.
opts.Services.AddDbContext<Bug2588DbContext>(o =>
o.UseNpgsql(Servers.PostgresConnectionString));

opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "wolverine");
opts.UseEntityFrameworkCoreTransactions();

opts.UseRabbitMq()
.UseConventionalRouting()
.AutoProvision()
.AutoPurgeOnStartup();

opts.Policies.AutoApplyTransactions();
opts.Policies.UseDurableInboxOnAllListeners();
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.Durability.Mode = DurabilityMode.Solo;

// Critical to reproduce: register the handler so conventional
// routing's DiscoverListeners pre-creates the exchange via
// ApplyListenerRoutingDefaults. That early creation makes
// BrokerTransport.InitializeAsync compile the exchange BEFORE
// any DiscoverSenders has added the subscription, so AllSenders
// policies (UseDurableOutboxOnAllSendingEndpoints) never apply
// — the _hasCompiled flag short-circuits a re-application
// when DiscoverSenders later adds the subscription on first
// publish.
opts.Discovery.DisableConventionalDiscovery().IncludeType<Bug2588Handler>();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered()
{
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

var routes = runtime.RoutingFor(typeof(Bug2588Message))
.ShouldBeOfType<MessageRouter<Bug2588Message>>()
.Routes;

routes.Length.ShouldBeGreaterThan(0);

var route = routes.Single().ShouldBeOfType<MessageRoute>();
var endpoint = route.Sender.Endpoint;

// Reporter's symptom in unit-test form. With
// UseDurableOutboxOnAllSendingEndpoints() the conventionally-routed
// RabbitMQ exchange should have EndpointMode.Durable so cascading
// messages participate in the outbox transaction. On main with the
// handler registered, this comes back as BufferedInMemory because
// the exchange was Compile()'d during BrokerTransport.InitializeAsync
// (before DiscoverSenders ran) and the AllSenders policy gated on
// `e.Subscriptions.Any()` short-circuited.
endpoint.Mode.ShouldBe(EndpointMode.Durable);
}
}

public record Bug2588Message(Guid Id);

public class Bug2588Handler
{
// Triggers conventional listener creation for Bug2588Message at startup,
// which in turn calls RabbitMqMessageRoutingConvention.ApplyListenerRoutingDefaults
// and pre-creates the sender exchange before AllSenders policies apply.
public static void Handle(Bug2588Message _) { }
}

public class Bug2588DbContext(DbContextOptions<Bug2588DbContext> options) : DbContext(options)
{
public DbSet<Item> Items => Set<Item>();

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.MapWolverineEnvelopeStorage("wolverine");

modelBuilder.Entity<Item>(map =>
{
map.ToTable("bug_2588_items");
map.HasKey(x => x.Id);
map.Property(x => x.Id).HasColumnName("id");
map.Property(x => x.Name).HasColumnName("name");
});

base.OnModelCreating(modelBuilder);
}
}
7 changes: 7 additions & 0 deletions src/Persistence/EfCoreTests/EfCoreTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@
<ProjectReference Include="..\Wolverine.EntityFrameworkCore\Wolverine.EntityFrameworkCore.csproj"/>
<ProjectReference Include="..\Wolverine.Postgresql\Wolverine.Postgresql.csproj"/>
<ProjectReference Include="..\Wolverine.SqlServer\Wolverine.SqlServer.csproj"/>
<!--
Wolverine.RabbitMQ is referenced for the GH-2588 reproducer
(Bugs/Bug_2588_*.cs). The test only inspects endpoint Mode after
configuration; it does NOT open a RabbitMQ connection so no broker
is required at test time.
-->
<ProjectReference Include="..\..\Transports\RabbitMQ\Wolverine.RabbitMQ\Wolverine.RabbitMQ.csproj"/>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using IntegrationTests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.Configuration;
using Wolverine.Postgresql;
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;

namespace Wolverine.AmazonSqs.Tests.Bugs;

/// <summary>
/// Locks down GH-2588 for the Amazon SQS conventional routing. Without the
/// structural fix, registering a handler triggers listener discovery which
/// pre-creates the corresponding queue; that endpoint is then compiled during
/// <c>BrokerTransport.InitializeAsync</c> BEFORE any sender subscription is
/// registered, so AllSenders policies like
/// <c>UseDurableOutboxOnAllSendingEndpoints</c> short-circuit on
/// <c>endpoint.Subscriptions.Any() == false</c> and never upgrade the endpoint
/// mode to Durable.
/// </summary>
public class Bug_2588_durable_outbox_with_handler_and_conventional_routing : IDisposable
{
private readonly IHost _host;

public Bug_2588_durable_outbox_with_handler_and_conventional_routing()
{
_host = WolverineHost.For(opts =>
{
opts.UseAmazonSqsTransportLocally()
.UseConventionalRouting()
.AutoProvision()
.AutoPurgeOnStartup();

opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString);
opts.Durability.Mode = DurabilityMode.Solo;

opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.DisableConventionalDiscovery().IncludeType<Bug2588SqsHandler>();
});
}

[Fact]
public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered()
{
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

var routes = runtime.RoutingFor(typeof(Bug2588SqsMessage))
.ShouldBeOfType<MessageRouter<Bug2588SqsMessage>>()
.Routes;

routes.Length.ShouldBeGreaterThan(0);

var route = routes.Single().ShouldBeOfType<MessageRoute>();
var endpoint = route.Sender.Endpoint;

endpoint.Mode.ShouldBe(EndpointMode.Durable);
}

public void Dispose()
{
_host?.Dispose();
}
}

public class Bug2588SqsMessage;

public class Bug2588SqsHandler
{
public static void Handle(Bug2588SqsMessage message)
{
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ public void disable_listener_by_lambda()

var uri = "sqs://routed".ToUri();
var endpoint = theRuntime.Endpoints.EndpointFor(uri);
endpoint.ShouldBeNull();

// An endpoint may exist at this URI as a SENDER (since a handler is
// registered for RoutedMessage and the framework eagerly pre-registers
// sender configuration for handled message types — see GH-2588), but
// the listener side must NOT have been created.
if (endpoint != null) endpoint.IsListener.ShouldBeFalse();

theRuntime.Endpoints.ActiveListeners().Any(x => x.Uri == uri)
.ShouldBeFalse();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;
using Xunit;

namespace Wolverine.AzureServiceBus.Tests.Bugs;

/// <summary>
/// Locks down GH-2588 for the Azure Service Bus queue-based conventional routing.
/// Without the structural fix, registering a handler triggers listener discovery
/// which pre-creates the corresponding queue/topic; that endpoint is then compiled
/// during <c>BrokerTransport.InitializeAsync</c> BEFORE any sender subscription is
/// registered, so AllSenders policies like <c>UseDurableOutboxOnAllSendingEndpoints</c>
/// short-circuit on <c>endpoint.Subscriptions.Any() == false</c> and never upgrade
/// the endpoint mode to Durable.
/// </summary>
[Trait("Category", "Flaky")]
public class Bug_2588_durable_outbox_with_handler_and_conventional_routing : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.UseConventionalRouting()
.AutoProvision()
.AutoPurgeOnStartup();

opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.DisableConventionalDiscovery().IncludeType<Bug2588AsbHandler>();
}).StartAsync();
}

public async Task DisposeAsync()
{
if (_host != null) await _host.StopAsync();
_host?.Dispose();
await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync();
}

[Fact]
public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered()
{
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

var routes = runtime.RoutingFor(typeof(Bug2588AsbMessage))
.ShouldBeOfType<MessageRouter<Bug2588AsbMessage>>()
.Routes;

routes.Length.ShouldBeGreaterThan(0);

var route = routes.Single().ShouldBeOfType<MessageRoute>();
var endpoint = route.Sender.Endpoint;

endpoint.Mode.ShouldBe(EndpointMode.Durable);
}
}

/// <summary>
/// Companion to <see cref="Bug_2588_durable_outbox_with_handler_and_conventional_routing"/>
/// but exercising the topic/subscription broadcasting convention rather than the
/// queue-based one. Both inherit from <c>MessageRoutingConvention&lt;,,,&gt;</c>
/// and share the same fix path.
/// </summary>
[Trait("Category", "Flaky")]
public class Bug_2588_durable_outbox_with_handler_and_topic_broadcasting_routing : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.UseTopicAndSubscriptionConventionalRouting(x =>
{
// Keep names short — Azure Service Bus has a 50-char limit
x.SubscriptionNameForListener(t => t.Name.ToLowerInvariant());
x.TopicNameForListener(t => t.Name.ToLowerInvariant());
x.TopicNameForSender(t => t.Name.ToLowerInvariant());
})
.AutoProvision()
.AutoPurgeOnStartup();

opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.DisableConventionalDiscovery().IncludeType<Bug2588AsbHandler>();
}).StartAsync();
}

public async Task DisposeAsync()
{
if (_host != null) await _host.StopAsync();
_host?.Dispose();
await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync();
}

[Fact]
public void conventionally_routed_sender_should_be_durable_when_handler_is_also_registered()
{
var runtime = _host.Services.GetRequiredService<IWolverineRuntime>();

var routes = runtime.RoutingFor(typeof(Bug2588AsbMessage))
.ShouldBeOfType<MessageRouter<Bug2588AsbMessage>>()
.Routes;

routes.Length.ShouldBeGreaterThan(0);

var route = routes.Single().ShouldBeOfType<MessageRoute>();
var endpoint = route.Sender.Endpoint;

endpoint.Mode.ShouldBe(EndpointMode.Durable);
}
}

public class Bug2588AsbMessage;

public class Bug2588AsbHandler
{
public static void Handle(Bug2588AsbMessage message)
{
// no-op
}
}
Loading
Loading