diff --git a/Directory.Packages.props b/Directory.Packages.props index 8fc6fc442..7b3c35645 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -158,6 +158,7 @@ + diff --git a/src/Persistence/EfCoreTests/DomainEvents/DomainEventScraperStateFilterTests.cs b/src/Persistence/EfCoreTests/DomainEvents/DomainEventScraperStateFilterTests.cs new file mode 100644 index 000000000..de36900cd --- /dev/null +++ b/src/Persistence/EfCoreTests/DomainEvents/DomainEventScraperStateFilterTests.cs @@ -0,0 +1,118 @@ +using Microsoft.EntityFrameworkCore; +using SharedPersistenceModels.Items; +using Shouldly; +using Wolverine.EntityFrameworkCore; + +namespace EfCoreTests.DomainEvents; + +/// +/// Unit tests verifying that DomainEventScraper only processes Added and Modified entities, +/// skipping Unchanged and Deleted ones (issue #2476 optimization). +/// +public class DomainEventScraperStateFilterTests +{ + private static DbContextOptions BuildOptions() => + new DbContextOptionsBuilder() + .UseInMemoryDatabase(Guid.NewGuid().ToString()) + .Options; + + [Fact] + public void change_tracker_filter_only_returns_added_and_modified_entries() + { + using var ctx = new ScraperTestDbContext(BuildOptions()); + + var added = new Item { Id = Guid.CreateVersion7(), Name = "Added" }; + var modified = new Item { Id = Guid.CreateVersion7(), Name = "Modified" }; + var unchanged = new Item { Id = Guid.CreateVersion7(), Name = "Unchanged" }; + var deleted = new Item { Id = Guid.CreateVersion7(), Name = "Deleted" }; + + ctx.Entry(added).State = EntityState.Added; + ctx.Entry(modified).State = EntityState.Modified; + ctx.Entry(unchanged).State = EntityState.Unchanged; + ctx.Entry(deleted).State = EntityState.Deleted; + + // This mirrors the filter introduced in DomainEventScraper.ScrapeEvents + var scraped = ctx.ChangeTracker.Entries() + .Where(e => e.State == EntityState.Added || e.State == EntityState.Modified) + .Select(x => x.Entity) + .OfType() + .ToList(); + + scraped.Count.ShouldBe(2); + scraped.ShouldContain(added); + scraped.ShouldContain(modified); + scraped.ShouldNotContain(unchanged); + scraped.ShouldNotContain(deleted); + } + + [Fact] + public async Task domain_event_scraper_collects_events_from_added_and_modified_but_not_unchanged_or_deleted() + { + var options = BuildOptions(); + + // Seed two items so they can be loaded in Unchanged / Deleted states + using (var seed = new ScraperTestDbContext(options)) + { + seed.Items.Add(new Item { Id = Guid.Parse("00000000-0000-0000-0000-000000000001"), Name = "WillBeUnchanged" }); + seed.Items.Add(new Item { Id = Guid.Parse("00000000-0000-0000-0000-000000000002"), Name = "WillBeDeleted" }); + await seed.SaveChangesAsync(); + } + + using var ctx = new ScraperTestDbContext(options); + + // Added – new entity not yet persisted + var addedItem = new Item { Id = Guid.CreateVersion7(), Name = "Added" }; + ctx.Items.Add(addedItem); + addedItem.Approve(); // raises ItemApproved event + + // Modified – load, change, and let EF detect it + var modifiedItem = await ctx.Items.FindAsync(Guid.Parse("00000000-0000-0000-0000-000000000001")); + modifiedItem!.Approve(); // raises event AND sets Approved=true → Modified state + + // Unchanged – load but do not touch + // (we manually add an event to the unchanged item to prove the scraper skips it) + var unchangedItem = await ctx.Items.FindAsync(Guid.Parse("00000000-0000-0000-0000-000000000002")); + unchangedItem!.Publish(new ItemApproved(unchangedItem.Id)); // event added, but state stays Unchanged + + // Verify states are as expected + ctx.Entry(addedItem).State.ShouldBe(EntityState.Added); + ctx.Entry(modifiedItem).State.ShouldBe(EntityState.Modified); + ctx.Entry(unchangedItem).State.ShouldBe(EntityState.Unchanged); + + // Collect the entities that the optimized scraper would target + var scraped = ctx.ChangeTracker.Entries() + .Where(e => e.State == EntityState.Added || e.State == EntityState.Modified) + .Select(x => x.Entity) + .OfType() + .ToList(); + + scraped.Count.ShouldBe(2); + scraped.ShouldContain(addedItem); + scraped.ShouldContain(modifiedItem); + scraped.ShouldNotContain(unchangedItem); + + // Events from the two targeted entities + var events = scraped.SelectMany(e => e.Events).ToList(); + events.Count.ShouldBe(2); + events.OfType().ShouldContain(e => e.Id == addedItem.Id); + events.OfType().ShouldContain(e => e.Id == modifiedItem.Id); + events.OfType().ShouldNotContain(e => e.Id == unchangedItem.Id); + } +} + +// Minimal DbContext for these unit tests – no Wolverine envelope storage needed. +public class ScraperTestDbContext(DbContextOptions options) : DbContext(options) +{ + public DbSet Items { get; set; } = null!; + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.Entity(map => + { + map.ToTable("scraper_test_items"); + map.HasKey(x => x.Id); + map.Property(x => x.Name); + map.Property(x => x.Approved); + }); + } +} diff --git a/src/Persistence/EfCoreTests/EfCoreTests.csproj b/src/Persistence/EfCoreTests/EfCoreTests.csproj index 25a72c0f0..c30d65840 100644 --- a/src/Persistence/EfCoreTests/EfCoreTests.csproj +++ b/src/Persistence/EfCoreTests/EfCoreTests.csproj @@ -23,6 +23,7 @@ + diff --git a/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs b/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs index 1152bc5e4..ad00497d6 100644 --- a/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs +++ b/src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs @@ -134,6 +134,43 @@ public async Task mapping_to_incoming_envelopes() envelope.SentAt.ShouldBe(theIncomingEnvelope.SentAt); } + [Fact] + public async Task persist_outgoing_batch_uses_add_range() + { + var runtime = _host.GetRuntime(); + var context = new MessageContext(runtime); + + using var scope = _host.Services.CreateScope(); + var dbContext = scope.ServiceProvider.GetRequiredService(); + + var envelopes = Enumerable.Range(0, 3).Select(_ => new Envelope + { + Id = Guid.NewGuid(), + Status = EnvelopeStatus.Outgoing, + OwnerId = 5, + Data = [1, 2, 3], + MessageType = "batch-test", + Destination = new Uri("rabbitmq://queue/batch") + }).ToArray(); + + var transaction = new EfCoreEnvelopeTransaction(dbContext, context); + await transaction.PersistOutgoingAsync(envelopes); + await dbContext.SaveChangesAsync(); + + if (dbContext.Database.CurrentTransaction != null) + { + await dbContext.Database.CurrentTransaction.CommitAsync(); + } + + var storage = _host.Services.GetRequiredService(); + var persisted = await storage.Admin.AllOutgoingAsync(); + + foreach (var envelope in envelopes) + { + persisted.ShouldContain(x => x.Id == envelope.Id); + } + } + [Fact] public async Task mapping_to_outgoing_envelopes() { diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs index cd97b2ed4..5839b1014 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs @@ -79,11 +79,7 @@ public async Task PersistOutgoingAsync(Envelope[] envelopes) if (DbContext.IsWolverineEnabled()) { - foreach (var envelope in envelopes) - { - var outgoing = new OutgoingMessage(envelope); - DbContext.Add(outgoing); - } + DbContext.AddRange(envelopes.Select(e => new OutgoingMessage(e))); } else { diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/OutgoingDomainEvents.cs b/src/Persistence/Wolverine.EntityFrameworkCore/OutgoingDomainEvents.cs index 20c38bd3a..cf55f5ce7 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/OutgoingDomainEvents.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/OutgoingDomainEvents.cs @@ -39,8 +39,11 @@ public DomainEventScraper(Func> source) public async Task ScrapeEvents(DbContext dbContext, MessageContext bus) { - var eventMessages = dbContext.ChangeTracker.Entries().Select(x => x.Entity) - .OfType().SelectMany(_source); + var eventMessages = dbContext.ChangeTracker.Entries() + .Where(e => e.State == EntityState.Added || e.State == EntityState.Modified) + .Select(x => x.Entity) + .OfType() + .SelectMany(_source); foreach (var eventMessage in eventMessages) {