Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using Microsoft.EntityFrameworkCore;
using NSubstitute;
using SharedPersistenceModels.Items;
using Shouldly;
using Wolverine;
using Wolverine.EntityFrameworkCore;
using Wolverine.Runtime;

namespace EfCoreTests.DomainEvents;

Expand Down Expand Up @@ -98,6 +101,54 @@ public async Task domain_event_scraper_collects_events_from_added_and_modified_b
events.OfType<ItemApproved>().ShouldContain(e => e.Id == modifiedItem.Id);
events.OfType<ItemApproved>().ShouldNotContain(e => e.Id == unchangedItem.Id);
}

/// <summary>
/// Regression test for https://github.com/JasperFx/wolverine/issues/2585.
///
/// DomainEventScraper.ScrapeEvents used to enumerate
/// dbContext.ChangeTracker.Entries() lazily and call PublishAsync per
/// event inside the foreach. When PublishAsync runs through the EF-backed
/// outbox, it adds an IncomingMessage entity to the SAME DbContext —
/// mutating ChangeTracker mid-enumeration and throwing
/// InvalidOperationException: "Collection was modified; enumeration
/// operation may not execute."
///
/// We reproduce the same hazard with an ISendMyself domain event whose
/// ApplyAsync mutates the DbContext, so the test stays self-contained
/// (no PostgreSQL/SqlServer outbox required). Without the .ToArray()
/// materialization in DomainEventScraper, this test throws.
///
/// Adapted from the closed PR #2586 by @jf2s; same approach.
/// </summary>
[Fact]
public async Task domain_event_scraper_materializes_events_before_publishing()
{
using var ctx = new ScraperTestDbContext(BuildOptions());

var item = new Item { Id = Guid.CreateVersion7(), Name = "Added" };
item.Events.Add(new MutatingDomainEvent2585(ctx));
ctx.Items.Add(item);

var runtime = Substitute.For<IWolverineRuntime>();
var context = new MessageContext(runtime);
var scraper = new DomainEventScraper<Item, object>(x => x.Events);

await Should.NotThrowAsync(() => scraper.ScrapeEvents(ctx, context));
}
}

/// <summary>
/// Domain event used by the GH-2585 regression test. Mutates the DbContext
/// during PublishAsync, simulating what EfCoreEnvelopeTransaction.Persist-
/// IncomingAsync does in a real outbox-enrolled handler.
/// </summary>
public class MutatingDomainEvent2585(ScraperTestDbContext dbContext) : ISendMyself
{
public ValueTask ApplyAsync(IMessageContext context)
{
dbContext.Items.Add(new Item { Id = Guid.CreateVersion7(), Name = "Added by PublishAsync" });
return ValueTask.CompletedTask;
}
}

// Minimal DbContext for these unit tests – no Wolverine envelope storage needed.
Expand Down
11 changes: 10 additions & 1 deletion src/Persistence/PostgresqlTests/Transport/basic_functionality.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ public async Task InitializeAsync()
.UseWolverine(opts =>
{
opts.UsePostgresqlPersistenceAndTransport(Servers.PostgresConnectionString, schema:"transports", transportSchema:"transports");
opts.ListenToPostgresqlQueue("one");

// Register the "one" queue but neutralize the host's auto-started listener.
// None of the tests in this fixture rely on the host listener; every test
// that exercises receiving creates its own PostgresqlQueueListener and
// calls TryPopAsync / TryPopDurablyAsync / DeleteExpiredAsync directly.
// Without this, the host listener can poll mid-test on slow CI (default
// polling interval is 5s; CI runs of pop_off_buffered take ~9s under
// load) and consume messages out from under the test's manual pop,
// making CountAsync assertions flaky.
opts.ListenToPostgresqlQueue("one").PollingInterval(1.Hours());
}).StartAsync();

theTransport = theHost.GetRuntime().Options.Transports.GetOrCreate<PostgresqlTransport>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,20 @@ public DomainEventScraper(Func<T, IEnumerable<TEvent>> source)

public async Task ScrapeEvents(DbContext dbContext, MessageContext bus)
{
// IMPORTANT: materialize the LINQ pipeline before publishing.
//
// dbContext.ChangeTracker.Entries() enumerates EF's internal entity
// state dictionary; PublishAsync flows through EfCoreEnvelopeTransaction.
// PersistIncomingAsync, which adds an IncomingMessage entity to the
// SAME DbContext. Mutating ChangeTracker mid-enumeration throws
// InvalidOperationException: "Collection was modified; enumeration
// operation may not execute." Reported in GH-2585.
//
// ToArray() also covers the case where _source(entity) returns a
// live, mutable List<TEvent> that PublishAsync (e.g. via ISendMyself)
// could mutate while we're iterating it.
var eventMessages = dbContext.ChangeTracker.Entries().Select(x => x.Entity)
.OfType<T>().SelectMany(_source);
.OfType<T>().SelectMany(_source).ToArray();

foreach (var eventMessage in eventMessages)
{
Expand Down
Loading