Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
using System.Text.Json.Serialization;
using IntegrationTests;
using JasperFx.Events;
using Marten;
using Marten.Events;
using Microsoft.Extensions.Hosting;
using Shouldly;
using StronglyTypedIds;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests.AggregateHandlerWorkflow;

public class strong_named_identifiers : IAsyncLifetime
{
private IHost theHost;

public async Task InitializeAsync()
{
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "strong_named";
}).IntegrateWithWolverine();
}).StartAsync();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
}

[Fact]
public async Task use_read_aggregate_by_itself()
{
var streamId = Guid.NewGuid();
using var session = theHost.DocumentStore().LightweightSession();
session.Events.StartStream<StrongLetterAggregate>(streamId, new AEvent(), new BEvent(), new CEvent(),
new CEvent());
await session.SaveChangesAsync();

var bus = theHost.MessageBus();
var aggregate = await bus.InvokeAsync<StrongLetterAggregate>(new FetchCounts(new LetterId(streamId)));

aggregate.ACount.ShouldBe(1);
aggregate.BCount.ShouldBe(1);
aggregate.CCount.ShouldBe(2);
}

[Fact]
public async Task single_usage_of_write_aggregate()
{
var streamId = Guid.NewGuid();
using var session = theHost.DocumentStore().LightweightSession();
session.Events.StartStream<StrongLetterAggregate>(streamId, new AEvent(), new BEvent(), new CEvent(),
new CEvent());
await session.SaveChangesAsync();

await theHost.InvokeAsync(new IncrementStrongA(new LetterId(streamId)));

var bus = theHost.MessageBus();
var aggregate = await bus.InvokeAsync<StrongLetterAggregate>(new FetchCounts(new LetterId(streamId)));

aggregate.ACount.ShouldBe(2);
aggregate.BCount.ShouldBe(1);
aggregate.CCount.ShouldBe(2);
}

[Fact]
public async Task batch_query_usage_of_write_aggregate()
{
var stream1Id = Guid.NewGuid();
var stream2Id = Guid.NewGuid();
using var session = theHost.DocumentStore().LightweightSession();
session.Events.StartStream<StrongLetterAggregate>(stream1Id, new AEvent(), new BEvent(), new CEvent(),
new CEvent());

session.Events.StartStream<StrongLetterAggregate>(stream2Id, new AEvent(), new BEvent(), new BEvent(),
new AEvent());
await session.SaveChangesAsync();

await theHost.InvokeMessageAndWaitAsync(new IncrementBOnBoth(new LetterId(stream1Id), new LetterId(stream2Id)));

var aggregate1 = await session.Events.FetchLatest<StrongLetterAggregate>(stream1Id);
aggregate1.BCount.ShouldBe(2);

var aggregate2 = await session.Events.FetchLatest<StrongLetterAggregate>(stream2Id);
aggregate2.BCount.ShouldBe(3);

}

[Fact]
public async Task batch_query_with_both_read_and_write_aggregate()
{
var stream1Id = Guid.NewGuid();
var stream2Id = Guid.NewGuid();
using var session = theHost.DocumentStore().LightweightSession();
session.Events.StartStream<StrongLetterAggregate>(stream1Id, new AEvent(), new BEvent(), new CEvent(),
new CEvent());

session.Events.StartStream<StrongLetterAggregate>(stream2Id, new AEvent(), new BEvent(), new BEvent(),
new AEvent(), new DEvent());
await session.SaveChangesAsync();

await theHost.InvokeMessageAndWaitAsync(new AddFrom(new LetterId(stream1Id), new LetterId(stream2Id)));

var aggregate1 = await session.Events.FetchLatest<StrongLetterAggregate>(stream1Id);
aggregate1.BCount.ShouldBe(3);
aggregate1.ACount.ShouldBe(3);
aggregate1.DCount.ShouldBe(1);

var aggregate2 = await session.Events.FetchLatest<StrongLetterAggregate>(stream2Id);
aggregate2.BCount.ShouldBe(2);
}


}

public static class StrongLetterHandler
{
public static StrongLetterAggregate Handle(FetchCounts counts,
[ReadAggregate] StrongLetterAggregate aggregate) => aggregate;

public static AEvent Handle(IncrementStrongA command, [WriteAggregate] StrongLetterAggregate aggregate)
{
return new();
}

public static void Handle(
IncrementBOnBoth command,
[WriteAggregate(nameof(IncrementBOnBoth.Id1))] IEventStream<StrongLetterAggregate> stream1,
[WriteAggregate(nameof(IncrementBOnBoth.Id2))] IEventStream<StrongLetterAggregate> stream2
)
{
stream1.AppendOne(new BEvent());
stream2.AppendOne(new BEvent());
}

public static IEnumerable<object> Handle(
AddFrom command,
[WriteAggregate(nameof(AddFrom.Id1))] StrongLetterAggregate _,
[ReadAggregate(nameof(AddFrom.Id2))] StrongLetterAggregate readOnly)
{
for (int i = 0; i < readOnly.ACount; i++)
{
yield return new AEvent();
}

for (int i = 0; i < readOnly.BCount; i++)
{
yield return new BEvent();
}

for (int i = 0; i < readOnly.CCount; i++)
{
yield return new CEvent();
}

for (int i = 0; i < readOnly.DCount; i++)
{
yield return new DEvent();
}
}
}

public record IncrementStrongA(LetterId Id);

public record AddFrom(LetterId Id1, LetterId Id2);

public record IncrementBOnBoth(LetterId Id1, LetterId Id2);

public record FetchCounts(LetterId Id);



[StronglyTypedId(Template.Guid)]
public readonly partial struct LetterId;

public class StrongLetterAggregate
{
public StrongLetterAggregate()
{
}

public LetterId Id { get; set; }
public int ACount { get; set; }
public int BCount { get; set; }
public int CCount { get; set; }
public int DCount { get; set; }

public void Apply(AEvent e)
{
ACount++;
}

public void Apply(BEvent e)
{
BCount++;
}

public void Apply(CEvent e)
{
CCount++;
}

public void Apply(DEvent e)
{
DCount++;
}
}
1 change: 1 addition & 0 deletions src/Persistence/MartenTests/MartenTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="StronglyTypedId" Version="1.0.0-beta08" />
</ItemGroup>

<ItemGroup>
Expand Down
11 changes: 10 additions & 1 deletion src/Persistence/Wolverine.Marten/ReadAggregateAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,16 @@ internal class FetchLatestAggregateFrame : AsyncFrame, IBatchableFrame

public FetchLatestAggregateFrame(Type aggregateType, Variable identity)
{
_identity = identity;
if (identity.VariableType == typeof(Guid) || identity.VariableType == typeof(string))
{
_identity = identity;
}
else
{
var valueType = ValueTypeInfo.ForType(identity.VariableType);
_identity = new MemberAccessVariable(identity, valueType.ValueProperty);
}

Aggregate = new Variable(aggregateType, this);
}

Expand Down
Loading