Skip to content

Commit 1aaa438

Browse files
committed
automatic support for Marten's new async aggregation side effect model. Closes GH-938. Also 3.0-beta-2
1 parent 5717f1d commit 1aaa438

8 files changed

+172
-4
lines changed

Directory.Build.props

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<ImplicitUsings>true</ImplicitUsings>
1313
<Nullable>enable</Nullable>
1414
<VersionPrefix>3.0.0</VersionPrefix>
15-
<VersionSuffix>beta-1</VersionSuffix>
15+
<VersionSuffix>beta-2</VersionSuffix>
1616
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
1717
<PublishRepositoryUrl>true</PublishRepositoryUrl>
1818
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using System.Diagnostics;
2+
using IntegrationTests;
3+
using JasperFx.Core;
4+
using Marten;
5+
using Marten.Events;
6+
using Marten.Events.Aggregation;
7+
using Marten.Events.Daemon.Resiliency;
8+
using Marten.Events.Projections;
9+
using Marten.Metadata;
10+
using Microsoft.Extensions.Hosting;
11+
using Npgsql;
12+
using Shouldly;
13+
using Weasel.Postgresql;
14+
using Wolverine;
15+
using Wolverine.Marten;
16+
using Wolverine.Tracking;
17+
using Xunit.Sdk;
18+
19+
namespace MartenTests.AsyncDaemonIntegration;
20+
21+
public class end_to_end_publish_messages_through_marten_to_wolverine
22+
{
23+
[Fact]
24+
public async Task can_publish_messages_through_outbox()
25+
{
26+
await dropSchema();
27+
28+
using var host = await Host.CreateDefaultBuilder()
29+
.UseWolverine(opts =>
30+
{
31+
opts.Services.AddMarten(m =>
32+
{
33+
m.Connection(Servers.PostgresConnectionString);
34+
m.DatabaseSchemaName = "wolverine_side_effects";
35+
36+
m.Projections.Add<Projection3>(ProjectionLifecycle.Async);
37+
})
38+
.IntegrateWithWolverine()
39+
.AddAsyncDaemon(DaemonMode.Solo);
40+
41+
opts.Policies.UseDurableLocalQueues();
42+
}).StartAsync();
43+
44+
var streamId = Guid.NewGuid();
45+
46+
Func<IMessageContext, Task> publish = async _ =>
47+
{
48+
using var session = host.DocumentStore().LightweightSession();
49+
session.Events.StartStream<SideEffects1>(streamId, new AEvent(), new AEvent(), new BEvent());
50+
await session.SaveChangesAsync();
51+
};
52+
53+
var tracked = await host
54+
.TrackActivity()
55+
.Timeout(30.Seconds())
56+
.WaitForMessageToBeReceivedAt<GotB>(host)
57+
.ExecuteAndWaitAsync(publish);
58+
59+
tracked.Executed.SingleMessage<GotB>()
60+
.StreamId.ShouldBe(streamId);
61+
}
62+
63+
private static async Task dropSchema()
64+
{
65+
using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
66+
await conn.OpenAsync();
67+
await conn.DropSchemaAsync("wolverine_side_effects");
68+
await conn.CloseAsync();
69+
}
70+
}
71+
72+
public class Projection3: SingleStreamProjection<SideEffects1>
73+
{
74+
public void Apply(SideEffects1 aggregate, AEvent _)
75+
{
76+
aggregate.A++;
77+
}
78+
79+
public void Apply(SideEffects1 aggregate, BEvent _)
80+
{
81+
82+
}
83+
84+
public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<SideEffects1> slice)
85+
{
86+
if (slice.Aggregate != null && slice.Events().OfType<IEvent<BEvent>>().Any())
87+
{
88+
slice.PublishMessage(new GotB(slice.Aggregate.Id));
89+
}
90+
91+
return new ValueTask();
92+
}
93+
}
94+
95+
public record GotB(Guid StreamId);
96+
97+
public static class GotBHandler
98+
{
99+
public static void Handle(GotB message) => Debug.WriteLine("Got B for stream " + message.StreamId);
100+
}
101+
102+
public class SideEffects1: IRevisioned
103+
{
104+
public Guid Id { get; set; }
105+
public int A { get; set; }
106+
public int B { get; set; }
107+
public int C { get; set; }
108+
public int D { get; set; }
109+
public int Version { get; set; }
110+
}

src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using JasperFx.Core.IoC;
33
using JasperFx.Core.Reflection;
44
using Marten;
5+
using Marten.Internal;
56
using Marten.Storage;
67
using Marten.Subscriptions;
78
using Microsoft.Extensions.DependencyInjection;
@@ -56,6 +57,8 @@ public static MartenServiceCollectionExtensions.MartenStoreExpression<T> Integra
5657
"The schema name must be in all lower case characters");
5758
}
5859

60+
expression.Services.AddSingleton<IConfigureMarten<T>, MartenOverrides<T>>();
61+
5962
expression.Services.AddSingleton<IAncillaryMessageStore>(s =>
6063
{
6164
var store = s.GetRequiredService<T>().As<DocumentStore>();

src/Persistence/Wolverine.Marten/MartenIntegration.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
using JasperFx.Core.Reflection;
22
using Marten;
33
using Marten.Events;
4+
using Marten.Internal;
45
using Marten.Schema;
56
using Microsoft.Extensions.DependencyInjection;
67
using Wolverine.Marten.Codegen;
78
using Wolverine.Marten.Persistence.Sagas;
9+
using Wolverine.Marten.Publishing;
810
using Wolverine.Persistence.Sagas;
911
using Wolverine.Postgresql.Transport;
1012
using Wolverine.Runtime;
@@ -57,10 +59,12 @@ EventForwardingTransform<T> IEventForwarding.SubscribeToEvent<T>()
5759
}
5860
}
5961

60-
internal class SagasShouldUseNumericRevisions : IConfigureMarten
62+
internal class MartenOverrides : IConfigureMarten
6163
{
6264
public void Configure(IServiceProvider services, StoreOptions options)
6365
{
66+
options.Events.MessageOutbox = new MartenToWolverineOutbox(services);
67+
6468
options.Policies.ForAllDocuments(mapping =>
6569
{
6670
if (mapping.DocumentType.CanBeCastTo<Saga>())
@@ -72,6 +76,8 @@ public void Configure(IServiceProvider services, StoreOptions options)
7276
}
7377
}
7478

79+
internal class MartenOverrides<T> : MartenOverrides, IConfigureMarten<T> where T : IDocumentStore{}
80+
7581
internal class EventWrapperForwarder : IHandledTypeRule
7682
{
7783
public bool TryFindHandledType(Type concreteType, out Type handlerType)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using Marten;
2+
using Marten.Events.Aggregation;
3+
using Marten.Internal.Sessions;
4+
using Marten.Services;
5+
using Wolverine.Runtime;
6+
7+
namespace Wolverine.Marten.Publishing;
8+
9+
internal class MartenToWolverineMessageBatch(MessageContext Context, DocumentSessionBase Session) : IMessageBatch
10+
{
11+
public async ValueTask PublishAsync<T>(T message)
12+
{
13+
await Context.PublishAsync(message);
14+
}
15+
16+
public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
17+
{
18+
return Context.FlushOutgoingMessagesAsync();
19+
}
20+
21+
public Task BeforeCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
22+
{
23+
return Task.CompletedTask;
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using Marten.Events.Aggregation;
2+
using Marten.Internal.Sessions;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Wolverine.Runtime;
5+
6+
namespace Wolverine.Marten.Publishing;
7+
8+
internal class MartenToWolverineOutbox : IMessageOutbox
9+
{
10+
private readonly Lazy<IWolverineRuntime> _runtime;
11+
12+
public MartenToWolverineOutbox(IServiceProvider services)
13+
{
14+
_runtime = new Lazy<IWolverineRuntime>(() => services.GetRequiredService<IWolverineRuntime>());
15+
}
16+
17+
public async ValueTask<IMessageBatch> CreateBatch(DocumentSessionBase session)
18+
{
19+
var context = new MessageContext(_runtime.Value, session.TenantId);
20+
await context.EnlistInOutboxAsync(new MartenEnvelopeTransaction(session, context));
21+
22+
return new MartenToWolverineMessageBatch(context, session);
23+
}
24+
}

src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<ProjectReference Include="..\Wolverine.Postgresql\Wolverine.Postgresql.csproj"/>
1313
</ItemGroup>
1414
<ItemGroup>
15-
<PackageReference Include="Marten.CommandLine" Version="7.26.2" />
15+
<PackageReference Include="Marten.CommandLine" Version="7.27.0" />
1616
</ItemGroup>
1717
<ItemGroup>
1818
<Compile Remove="CritterStackHostBuilder.cs" />

src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression In
9494
expression.Services.AddType(typeof(IDatabaseSource), typeof(MartenMessageDatabaseDiscovery),
9595
ServiceLifetime.Singleton);
9696

97-
expression.Services.AddSingleton<IConfigureMarten, SagasShouldUseNumericRevisions>();
97+
expression.Services.AddSingleton<IConfigureMarten, MartenOverrides>();
9898

9999
expression.Services.AddSingleton<IWolverineExtension>(new MartenIntegration
100100
{

0 commit comments

Comments
 (0)