Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
<PackageVersion Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.0" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="9.0.5" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.5" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.InMemory" Version="9.0.5" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.5" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.SqlServer" Version="9.0.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Workspaces.MSBuild" Version="4.14.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using Microsoft.EntityFrameworkCore;
using SharedPersistenceModels.Items;
using Shouldly;
using Wolverine.EntityFrameworkCore;

namespace EfCoreTests.DomainEvents;

/// <summary>
/// Unit tests verifying that DomainEventScraper only processes Added and Modified entities,
/// skipping Unchanged and Deleted ones (issue #2476 optimization).
/// </summary>
public class DomainEventScraperStateFilterTests
{
private static DbContextOptions<ScraperTestDbContext> BuildOptions() =>
new DbContextOptionsBuilder<ScraperTestDbContext>()
.UseInMemoryDatabase(Guid.NewGuid().ToString())
.Options;

[Fact]
public void change_tracker_filter_only_returns_added_and_modified_entries()
{
using var ctx = new ScraperTestDbContext(BuildOptions());

var added = new Item { Id = Guid.CreateVersion7(), Name = "Added" };
var modified = new Item { Id = Guid.CreateVersion7(), Name = "Modified" };
var unchanged = new Item { Id = Guid.CreateVersion7(), Name = "Unchanged" };
var deleted = new Item { Id = Guid.CreateVersion7(), Name = "Deleted" };

ctx.Entry(added).State = EntityState.Added;
ctx.Entry(modified).State = EntityState.Modified;
ctx.Entry(unchanged).State = EntityState.Unchanged;
ctx.Entry(deleted).State = EntityState.Deleted;

// This mirrors the filter introduced in DomainEventScraper<T, TEvent>.ScrapeEvents
var scraped = ctx.ChangeTracker.Entries()
.Where(e => e.State == EntityState.Added || e.State == EntityState.Modified)
.Select(x => x.Entity)
.OfType<Item>()
.ToList();

scraped.Count.ShouldBe(2);
scraped.ShouldContain(added);
scraped.ShouldContain(modified);
scraped.ShouldNotContain(unchanged);
scraped.ShouldNotContain(deleted);
}

[Fact]
public async Task domain_event_scraper_collects_events_from_added_and_modified_but_not_unchanged_or_deleted()
{
var options = BuildOptions();

// Seed two items so they can be loaded in Unchanged / Deleted states
using (var seed = new ScraperTestDbContext(options))
{
seed.Items.Add(new Item { Id = Guid.Parse("00000000-0000-0000-0000-000000000001"), Name = "WillBeUnchanged" });
seed.Items.Add(new Item { Id = Guid.Parse("00000000-0000-0000-0000-000000000002"), Name = "WillBeDeleted" });
await seed.SaveChangesAsync();
}

using var ctx = new ScraperTestDbContext(options);

// Added – new entity not yet persisted
var addedItem = new Item { Id = Guid.CreateVersion7(), Name = "Added" };
ctx.Items.Add(addedItem);
addedItem.Approve(); // raises ItemApproved event

// Modified – load, change, and let EF detect it
var modifiedItem = await ctx.Items.FindAsync(Guid.Parse("00000000-0000-0000-0000-000000000001"));
modifiedItem!.Approve(); // raises event AND sets Approved=true → Modified state

// Unchanged – load but do not touch
// (we manually add an event to the unchanged item to prove the scraper skips it)
var unchangedItem = await ctx.Items.FindAsync(Guid.Parse("00000000-0000-0000-0000-000000000002"));
unchangedItem!.Publish(new ItemApproved(unchangedItem.Id)); // event added, but state stays Unchanged

// Verify states are as expected
ctx.Entry(addedItem).State.ShouldBe(EntityState.Added);
ctx.Entry(modifiedItem).State.ShouldBe(EntityState.Modified);
ctx.Entry(unchangedItem).State.ShouldBe(EntityState.Unchanged);

// Collect the entities that the optimized scraper would target
var scraped = ctx.ChangeTracker.Entries()
.Where(e => e.State == EntityState.Added || e.State == EntityState.Modified)
.Select(x => x.Entity)
.OfType<IEntity>()
.ToList();

scraped.Count.ShouldBe(2);
scraped.ShouldContain(addedItem);
scraped.ShouldContain(modifiedItem);
scraped.ShouldNotContain(unchangedItem);

// Events from the two targeted entities
var events = scraped.SelectMany(e => e.Events).ToList();
events.Count.ShouldBe(2);
events.OfType<ItemApproved>().ShouldContain(e => e.Id == addedItem.Id);
events.OfType<ItemApproved>().ShouldContain(e => e.Id == modifiedItem.Id);
events.OfType<ItemApproved>().ShouldNotContain(e => e.Id == unchangedItem.Id);
}
}

// Minimal DbContext for these unit tests – no Wolverine envelope storage needed.
public class ScraperTestDbContext(DbContextOptions<ScraperTestDbContext> options) : DbContext(options)
{
public DbSet<Item> Items { get; set; } = null!;

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Item>(map =>
{
map.ToTable("scraper_test_items");
map.HasKey(x => x.Id);
map.Property(x => x.Name);
map.Property(x => x.Approved);
});
}
}
1 change: 1 addition & 0 deletions src/Persistence/EfCoreTests/EfCoreTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer"/>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL"/>
</ItemGroup>
Expand Down
37 changes: 37 additions & 0 deletions src/Persistence/EfCoreTests/persisting_envelopes_with_sqlserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,43 @@ public async Task mapping_to_incoming_envelopes()
envelope.SentAt.ShouldBe(theIncomingEnvelope.SentAt);
}

[Fact]
public async Task persist_outgoing_batch_uses_add_range()
{
var runtime = _host.GetRuntime();
var context = new MessageContext(runtime);

using var scope = _host.Services.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<SampleMappedDbContext>();

var envelopes = Enumerable.Range(0, 3).Select(_ => new Envelope
{
Id = Guid.NewGuid(),
Status = EnvelopeStatus.Outgoing,
OwnerId = 5,
Data = [1, 2, 3],
MessageType = "batch-test",
Destination = new Uri("rabbitmq://queue/batch")
}).ToArray();

var transaction = new EfCoreEnvelopeTransaction(dbContext, context);
await transaction.PersistOutgoingAsync(envelopes);
await dbContext.SaveChangesAsync();

if (dbContext.Database.CurrentTransaction != null)
{
await dbContext.Database.CurrentTransaction.CommitAsync();
}

var storage = _host.Services.GetRequiredService<IMessageStore>();
var persisted = await storage.Admin.AllOutgoingAsync();

foreach (var envelope in envelopes)
{
persisted.ShouldContain(x => x.Id == envelope.Id);
}
}

[Fact]
public async Task mapping_to_outgoing_envelopes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ public async Task PersistOutgoingAsync(Envelope[] envelopes)

if (DbContext.IsWolverineEnabled())
{
foreach (var envelope in envelopes)
{
var outgoing = new OutgoingMessage(envelope);
DbContext.Add(outgoing);
}
DbContext.AddRange(envelopes.Select(e => new OutgoingMessage(e)));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ public DomainEventScraper(Func<T, IEnumerable<TEvent>> source)

public async Task ScrapeEvents(DbContext dbContext, MessageContext bus)
{
var eventMessages = dbContext.ChangeTracker.Entries().Select(x => x.Entity)
.OfType<T>().SelectMany(_source);
var eventMessages = dbContext.ChangeTracker.Entries()
.Where(e => e.State == EntityState.Added || e.State == EntityState.Modified)
.Select(x => x.Entity)
.OfType<T>()
.SelectMany(_source);

foreach (var eventMessage in eventMessages)
{
Expand Down
Loading