Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/guide/durability/efcore/outbox-and-inbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,51 @@ public async Task Post(
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/EFCoreSample/ItemService/CreateItemController.cs#L12-L41' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_dbcontext_outbox_1' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

### Multiple Flushes in Batching Loops

`IDbContextOutbox<T>` and `IDbContextOutbox` use the same `MessageContext` mechanics as Wolverine's
handler pipeline. By default, `MultiFlushMode` is `OnlyOnce`. That default prevents accidental duplicate
outgoing messages when Wolverine's generated handler pipeline reaches the flush step more than once for
the same incoming message.

If you are using the EF Core outbox directly from application code and intentionally call
`SaveChangesAndFlushMessagesAsync()` more than once on the same scoped outbox instance, opt into multiple
flushes for those calls:

```cs
public async Task SendInBatches(
IDbContextOutbox<ItemsDbContext> outbox,
IReadOnlyList<CreateItemCommand> commands,
CancellationToken cancellation)
{
foreach (var chunk in commands.Chunk(500))
{
foreach (var command in chunk)
{
var item = new Item { Name = command.Name };
outbox.DbContext.Items.Add(item);

await outbox.PublishAsync(new ItemCreated { Id = item.Id });
}

await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples, cancellation);
}
}
```

Without the explicit `AllowMultiples` setting, the first batch would flush normally, but later calls to
`SaveChangesAndFlushMessagesAsync()` in the same scoped outbox instance would be ignored by the default
`OnlyOnce` guard. For example, sending 2,000 messages in four batches of 500 would send the first 500,
then skip the next three flushes. With `AllowMultiples`, each batch flush is honored.

Passing the mode to `SaveChangesAndFlushMessagesAsync()` applies it only to that call and then restores the
outbox's previous mode, so the setting does not leak into other code using the same scoped outbox instance.

This setting only controls whether repeated EF Core outbox flushes are honored. It does not force a broker
transport such as Kafka to send one native producer batch of 500 messages. For Kafka, Wolverine's non-inline
sending path can group outgoing envelopes through its batching sender, but the Kafka sender still produces
the envelopes individually within that Wolverine batch.

Or use the `IDbContextOutbox` as shown below, but in this case you will need to explicitly call `Enroll()` on
the `IDbContextOutbox` to connect the outbox sending to the `DbContext`:

Expand Down
86 changes: 86 additions & 0 deletions src/Persistence/EfCoreTests/end_to_end_efcore_persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,92 @@ public async Task use_generic_outbox_raw()
}
}

[Fact]
public async Task DbContextOutbox_generic_can_opt_into_multiple_save_changes_and_flush_calls_in_one_scope()
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();

var waiter1 = OutboxedMessageHandler.WaitForNextMessage();

using (var nested = Host.Services.CreateScope())
{
var outbox = nested.ServiceProvider.GetRequiredService<IDbContextOutbox<ItemsDbContext>>();
var context = outbox.ShouldBeOfType<DbContextOutbox<ItemsDbContext>>();

context.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);

outbox.DbContext.Items.Add(new Item { Id = id1, Name = "First" });
await outbox.SendAsync(new OutboxedMessage { Id = id1 });
await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples);
context.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);

var message1 = await waiter1;
message1.Id.ShouldBe(id1);

var waiter2 = OutboxedMessageHandler.WaitForNextMessage();

outbox.DbContext.Items.Add(new Item { Id = id2, Name = "Second" });
await outbox.SendAsync(new OutboxedMessage { Id = id2 });
await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples);
context.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);

var message2 = await waiter2;
message2.Id.ShouldBe(id2);
}

using (var nested = Host.Services.CreateScope())
{
var context = nested.ServiceProvider.GetRequiredService<ItemsDbContext>();
(await context.Items.FindAsync(id1)).ShouldNotBeNull();
(await context.Items.FindAsync(id2)).ShouldNotBeNull();
}
}

[Fact]
public async Task DbContextOutbox_non_generic_can_opt_into_multiple_save_changes_and_flush_calls_in_one_scope()
{
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();

var waiter1 = OutboxedMessageHandler.WaitForNextMessage();

using (var nested = Host.Services.CreateScope())
{
var context = nested.ServiceProvider.GetRequiredService<ItemsDbContext>();
var outbox = nested.ServiceProvider.GetRequiredService<IDbContextOutbox>();
var messageContext = outbox.ShouldBeOfType<DbContextOutbox>();

outbox.Enroll(context);
messageContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);

context.Items.Add(new Item { Id = id1, Name = "First" });
await outbox.SendAsync(new OutboxedMessage { Id = id1 });
await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples);
messageContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);

var message1 = await waiter1;
message1.Id.ShouldBe(id1);

var waiter2 = OutboxedMessageHandler.WaitForNextMessage();

context.Items.Add(new Item { Id = id2, Name = "Second" });
await outbox.SendAsync(new OutboxedMessage { Id = id2 });
await outbox.SaveChangesAndFlushMessagesAsync(MultiFlushMode.AllowMultiples);
messageContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);

var message2 = await waiter2;
message2.Id.ShouldBe(id2);
}

using (var nested = Host.Services.CreateScope())
{
var context = nested.ServiceProvider.GetRequiredService<ItemsDbContext>();
(await context.Items.FindAsync(id1)).ShouldNotBeNull();
(await context.Items.FindAsync(id2)).ShouldNotBeNull();
}
}

[Fact]
public async Task use_generic_outbox_mapped()
{
Expand Down
30 changes: 30 additions & 0 deletions src/Persistence/Wolverine.EntityFrameworkCore/DbContextOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ public async Task SaveChangesAndFlushMessagesAsync(CancellationToken token = def

await FlushOutgoingMessagesAsync();
}

public async Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default)
{
var previous = MultiFlushMode;
MultiFlushMode = multiFlushMode;

try
{
await SaveChangesAndFlushMessagesAsync(token).ConfigureAwait(false);
}
finally
{
MultiFlushMode = previous;
}
}
}

public class DbContextOutbox : MessageContext, IDbContextOutbox
Expand Down Expand Up @@ -74,4 +89,19 @@ public async Task SaveChangesAndFlushMessagesAsync(CancellationToken token = def

await FlushOutgoingMessagesAsync();
}

public async Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default)
{
var previous = MultiFlushMode;
MultiFlushMode = multiFlushMode;

try
{
await SaveChangesAndFlushMessagesAsync(token).ConfigureAwait(false);
}
finally
{
MultiFlushMode = previous;
}
}
}
19 changes: 19 additions & 0 deletions src/Persistence/Wolverine.EntityFrameworkCore/IDbContextOutbox.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.EntityFrameworkCore;
using Wolverine.Runtime;

namespace Wolverine.EntityFrameworkCore;

Expand All @@ -21,6 +22,15 @@ public interface IDbContextOutbox<T> : IMessageBus where T : DbContext
/// <returns></returns>
Task SaveChangesAndFlushMessagesAsync(CancellationToken token = default);

/// <summary>
/// Saves outstanding changes in the DbContext and flushes outbox messages to the
/// sending agents with an explicit multi-flush mode for just this call
/// </summary>
/// <param name="multiFlushMode"></param>
/// <param name="token"></param>
/// <returns></returns>
Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default);

/// <summary>
/// Calling this
/// method will force the outbox to send out any outstanding messages
Expand Down Expand Up @@ -55,6 +65,15 @@ public interface IDbContextOutbox : IMessageBus
/// <returns></returns>
Task SaveChangesAndFlushMessagesAsync(CancellationToken token = default);

/// <summary>
/// Saves outstanding changes in the DbContext and flushes outbox messages to the
/// sending agents with an explicit multi-flush mode for just this call
/// </summary>
/// <param name="multiFlushMode"></param>
/// <param name="token"></param>
/// <returns></returns>
Task SaveChangesAndFlushMessagesAsync(MultiFlushMode multiFlushMode, CancellationToken token = default);

/// <summary>
/// Calling this
/// method will force the outbox to send out any outstanding messages
Expand Down
Loading