Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>6.4.3</Version>
<Version>6.4.4</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
103 changes: 103 additions & 0 deletions src/Persistence/MartenTests/single_marten_op_side_effect_persists.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using IntegrationTests;
using JasperFx.Resources;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests;

// GH-3025: a handler returning a SINGLE IMartenOp (e.g. MartenOps.StartStream / MartenOps.Store)
// must persist even without opts.Policies.AutoApplyTransactions(). Previously MartenOpPolicy only
// applied Marten transaction support (the SaveChangesAsync postprocessor) for IEnumerable<IMartenOp>
// returns, so a single op was Execute()'d onto the session and then silently dropped.
public class single_marten_op_side_effect_persists : PostgresqlContext, IAsyncLifetime
{
private IHost theHost = null!;
private IDocumentStore theStore = null!;

public async Task InitializeAsync()
{
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "single_op_3025";
m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine();

// Deliberately NO opts.Policies.AutoApplyTransactions().
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(StartViaOpHandler))
.IncludeType(typeof(StoreViaOpHandler));
opts.Durability.Mode = DurabilityMode.Solo;
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

theStore = theHost.Services.GetRequiredService<IDocumentStore>();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
theHost.Dispose();
}

[Fact]
public async Task single_start_stream_op_persists_without_auto_transactions()
{
var id = Guid.NewGuid();
await theHost.InvokeMessageAndWaitAsync(new StartViaOp(id));

await using var session = theStore.LightweightSession();
var events = await session.Events.FetchStreamAsync(id);
events.Count.ShouldBe(1); // was 0 (op dropped) before GH-3025
}

[Fact]
public async Task single_store_op_persists_without_auto_transactions()
{
var id = Guid.NewGuid();
await theHost.InvokeMessageAndWaitAsync(new StoreViaOp(id));

await using var session = theStore.LightweightSession();
(await session.LoadAsync<OpDoc>(id)).ShouldNotBeNull();
}
}

public record StartViaOp(Guid Id);

public record OpStarted(string Name);

public class OpTally
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
public void Apply(OpStarted e) => Name = e.Name;
}

public static class StartViaOpHandler
{
public static IMartenOp Handle(StartViaOp command)
=> MartenOps.StartStream<OpTally>(command.Id, new OpStarted("created"));
}

public record StoreViaOp(Guid Id);

public class OpDoc
{
public Guid Id { get; set; }
}

public static class StoreViaOpHandler
{
public static IMartenOp Handle(StoreViaOp command)
=> MartenOps.Store(new OpDoc { Id = command.Id });
}
20 changes: 16 additions & 4 deletions src/Persistence/Wolverine.Marten/IMartenOp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,25 @@ public void Apply(IReadOnlyList<IChain> chains, GenerationRules rules, IServiceC
{
foreach (var chain in chains)
{
var candidates = chain.ReturnVariablesOfType<IEnumerable<IMartenOp>>().ToArray();
if (candidates.Any())
var collections = chain.ReturnVariablesOfType<IEnumerable<IMartenOp>>().ToArray();

// A single IMartenOp return is an ISideEffect — Wolverine already generates the Execute()
// call — but it still needs Marten transaction support so SaveChangesAsync actually runs.
// Without it the op is applied to the session and then silently dropped unless the app
// happens to have AutoApplyTransactions enabled. Collections were already handled below;
// single returns were not, so e.g. a handler returning MartenOps.StartStream(...) never
// persisted. ApplyTransactionSupport is idempotent, so this composes with aggregate
// handlers / AutoApplyTransactions. GH-3025.
var singles = chain.ReturnVariablesOfType<IMartenOp>().ToArray();

if (collections.Any() || singles.Any())
{
new MartenPersistenceFrameProvider().ApplyTransactionSupport(chain, container);
}

foreach (var collection in candidates)

// Only collections need the explicit foreach-Execute frame; single IMartenOp returns are
// executed by the ISideEffect machinery (adding a frame here would double-execute).
foreach (var collection in collections)
{
collection.UseReturnAction(v => new ForEachMartenOpFrame(v));
}
Expand Down