diff --git a/docs/guide/durability/efcore/outbox-and-inbox.md b/docs/guide/durability/efcore/outbox-and-inbox.md index 3e6384deb..6549e1c39 100644 --- a/docs/guide/durability/efcore/outbox-and-inbox.md +++ b/docs/guide/durability/efcore/outbox-and-inbox.md @@ -103,6 +103,51 @@ public async Task Post( snippet source | anchor +### Multiple Flushes in Batching Loops + +`IDbContextOutbox` and `IDbContextOutbox` use the same `MessageContext` mechanics as Wolverine's +handler pipeline. By default, `MultiFlushMode` is `OnlyOnce`. That default prevents accidental duplicate +outgoing messages when Wolverine's generated handler pipeline reaches the flush step more than once for +the same incoming message. + +If you are using the EF Core outbox directly from application code and intentionally call +`SaveChangesAndFlushMessagesAsync()` more than once on the same scoped outbox instance, opt into multiple +flushes for those calls: + +```cs +public async Task SendInBatches( + IDbContextOutbox outbox, + IReadOnlyList commands, + CancellationToken cancellation) +{ + foreach (var chunk in commands.Chunk(500)) + { + foreach (var command in chunk) + { + var item = new Item { Name = command.Name }; + outbox.DbContext.Items.Add(item); + + await outbox.PublishAsync(new ItemCreated { Id = item.Id }); + } + + await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples, cancellation); + } +} +``` + +Without the explicit `AllowMultiples` setting, the first batch would flush normally, but later calls to +`SaveChangesAndFlushMessagesAsync()` in the same scoped outbox instance would be ignored by the default +`OnlyOnce` guard. For example, sending 2,000 messages in four batches of 500 would send the first 500, +then skip the next three flushes. With `AllowMultiples`, each batch flush is honored. + +Passing the mode to `SaveChangesAndFlushMessagesAsync()` applies it only to that call and then restores the +outbox's previous mode, so the setting does not leak into other code using the same scoped outbox instance. + +This setting only controls whether repeated EF Core outbox flushes are honored. It does not force a broker +transport such as Kafka to send one native producer batch of 500 messages. For Kafka, Wolverine's non-inline +sending path can group outgoing envelopes through its batching sender, but the Kafka sender still produces +the envelopes individually within that Wolverine batch. + Or use the `IDbContextOutbox` as shown below, but in this case you will need to explicitly call `Enroll()` on the `IDbContextOutbox` to connect the outbox sending to the `DbContext`: diff --git a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs index e273b527f..6c773e460 100644 --- a/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs +++ b/src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs @@ -370,6 +370,92 @@ public async Task use_generic_outbox_raw() } } + [Fact] + public async Task DbContextOutbox_generic_can_opt_into_multiple_save_changes_and_flush_calls_in_one_scope() + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + + var waiter1 = OutboxedMessageHandler.WaitForNextMessage(); + + using (var nested = Host.Services.CreateScope()) + { + var outbox = nested.ServiceProvider.GetRequiredService>(); + var context = outbox.ShouldBeOfType>(); + + context.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + + outbox.DbContext.Items.Add(new Item { Id = id1, Name = "First" }); + await outbox.SendAsync(new OutboxedMessage { Id = id1 }); + await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples); + context.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + + var message1 = await waiter1; + message1.Id.ShouldBe(id1); + + var waiter2 = OutboxedMessageHandler.WaitForNextMessage(); + + outbox.DbContext.Items.Add(new Item { Id = id2, Name = "Second" }); + await outbox.SendAsync(new OutboxedMessage { Id = id2 }); + await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples); + context.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + + var message2 = await waiter2; + message2.Id.ShouldBe(id2); + } + + using (var nested = Host.Services.CreateScope()) + { + var context = nested.ServiceProvider.GetRequiredService(); + (await context.Items.FindAsync(id1)).ShouldNotBeNull(); + (await context.Items.FindAsync(id2)).ShouldNotBeNull(); + } + } + + [Fact] + public async Task DbContextOutbox_non_generic_can_opt_into_multiple_save_changes_and_flush_calls_in_one_scope() + { + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + + var waiter1 = OutboxedMessageHandler.WaitForNextMessage(); + + using (var nested = Host.Services.CreateScope()) + { + var context = nested.ServiceProvider.GetRequiredService(); + var outbox = nested.ServiceProvider.GetRequiredService(); + var messageContext = outbox.ShouldBeOfType(); + + outbox.Enroll(context); + messageContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + + context.Items.Add(new Item { Id = id1, Name = "First" }); + await outbox.SendAsync(new OutboxedMessage { Id = id1 }); + await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples); + messageContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + + var message1 = await waiter1; + message1.Id.ShouldBe(id1); + + var waiter2 = OutboxedMessageHandler.WaitForNextMessage(); + + context.Items.Add(new Item { Id = id2, Name = "Second" }); + await outbox.SendAsync(new OutboxedMessage { Id = id2 }); + await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples); + messageContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + + var message2 = await waiter2; + message2.Id.ShouldBe(id2); + } + + using (var nested = Host.Services.CreateScope()) + { + var context = nested.ServiceProvider.GetRequiredService(); + (await context.Items.FindAsync(id1)).ShouldNotBeNull(); + (await context.Items.FindAsync(id2)).ShouldNotBeNull(); + } + } + [Fact] public async Task use_generic_outbox_mapped() { diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/DbContextOutbox.cs b/src/Persistence/Wolverine.EntityFrameworkCore/DbContextOutbox.cs index 80b8cef28..c6ce59826 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/DbContextOutbox.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/DbContextOutbox.cs @@ -33,6 +33,21 @@ public async Task SaveChangesAndFlushMessagesAsync(CancellationToken token = def await FlushOutgoingMessagesAsync(); } + + public async Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default) + { + var previous = MultiFlushMode; + MultiFlushMode = multiFlushMode; + + try + { + await SaveChangesAndFlushMessagesAsync(token).ConfigureAwait(false); + } + finally + { + MultiFlushMode = previous; + } + } } public class DbContextOutbox : MessageContext, IDbContextOutbox @@ -74,4 +89,19 @@ public async Task SaveChangesAndFlushMessagesAsync(CancellationToken token = def await FlushOutgoingMessagesAsync(); } + + public async Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default) + { + var previous = MultiFlushMode; + MultiFlushMode = multiFlushMode; + + try + { + await SaveChangesAndFlushMessagesAsync(token).ConfigureAwait(false); + } + finally + { + MultiFlushMode = previous; + } + } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/IDbContextOutbox.cs b/src/Persistence/Wolverine.EntityFrameworkCore/IDbContextOutbox.cs index beadae534..c68d6af75 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/IDbContextOutbox.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/IDbContextOutbox.cs @@ -1,4 +1,5 @@ using Microsoft.EntityFrameworkCore; +using Wolverine.Runtime; namespace Wolverine.EntityFrameworkCore; @@ -21,6 +22,15 @@ public interface IDbContextOutbox : IMessageBus where T : DbContext /// Task SaveChangesAndFlushMessagesAsync(CancellationToken token = default); + /// + /// Saves outstanding changes in the DbContext and flushes outbox messages to the + /// sending agents with an explicit multi-flush mode for just this call + /// + /// + /// + /// + Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default); + /// /// Calling this /// method will force the outbox to send out any outstanding messages @@ -55,6 +65,15 @@ public interface IDbContextOutbox : IMessageBus /// Task SaveChangesAndFlushMessagesAsync(CancellationToken token = default); + /// + /// Saves outstanding changes in the DbContext and flushes outbox messages to the + /// sending agents with an explicit multi-flush mode for just this call + /// + /// + /// + /// + Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default); + /// /// Calling this /// method will force the outbox to send out any outstanding messages