Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ await Scenario(x =>
x.StatusCodeShouldBe(404);
});
}

[Fact]
public async Task happy_path_found_both_accounts_append_to_both_with_exclusive_lock()
{
var from = await createAccount(1000);
var to = await createAccount(100);

await Scenario(x =>
{
x.Post.Json(new TransferMoney(from, to, 150)).ToUrl("/accounts/transfer2");
x.StatusCodeShouldBe(204);
});

(await fetchAmount(from)).ShouldBe(850);
(await fetchAmount(to)).ShouldBe(250);
}
}

#region sample_when_transfering_money
Expand Down
13 changes: 12 additions & 1 deletion src/Http/Wolverine.Http/HttpGraph.ParameterMatching.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System.Reflection;
using JasperFx.CodeGeneration.Frames;
using JasperFx.CodeGeneration.Model;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Wolverine.Http.CodeGen;
using Wolverine.Persistence;

namespace Wolverine.Http;

Expand Down Expand Up @@ -57,7 +59,16 @@ internal bool TryMatchParameter(HttpChain chain, MethodCall methodCall, Paramete
{
if (variable.Creator != null)
{
chain.Middleware.Add(variable.Creator);
// TODO -- THIS IS SMELLY!!!!!! Only happens because of IEventStream<T> that does not get
// "mirrored" by LoadEntityFrameBlock
if (chain.Middleware.OfType<LoadEntityFrameBlock>()
.Any(x => object.ReferenceEquals(x.Creator, variable.Creator)))
{
continue;
}

// Do this idempotently!
chain.Middleware.Fill(variable.Creator);
}

methodCall.Arguments[i] = variable;
Expand Down
40 changes: 40 additions & 0 deletions src/Http/WolverineWebApi/Accounts/AccountCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,43 @@ public static void Handle(

#endregion

public static class TransferMoneyHandler2
{
[WolverinePost("/accounts/transfer2")]
public static void Handle(
TransferMoney command,

[WriteAggregate(nameof(TransferMoney.FromId), LoadStyle = ConcurrencyStyle.Exclusive)] IEventStream<Account> fromAccount,

[WriteAggregate(nameof(TransferMoney.ToId))] IEventStream<Account> toAccount)
{
// Would already 404 if either referenced account does not exist
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
}

public record TransferMoney2(Guid FromId, Guid ToId, double Amount, int FromVersion);

public static class TransferMoneyHandler3
{
[WolverinePost("/accounts/transfer3")]
public static void Handle(
TransferMoney command,

[WriteAggregate(nameof(TransferMoney.FromId))] IEventStream<Account> fromAccount,

[WriteAggregate(nameof(TransferMoney.ToId))] IEventStream<Account> toAccount)
{
// Would already 404 if either referenced account does not exist
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
}

This file was deleted.

132 changes: 132 additions & 0 deletions src/Persistence/MartenTests/batch_querying_support.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using IntegrationTests;
using JasperFx.Resources;
using Marten;
using MartenTests.AggregateHandlerWorkflow;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Persistence;

namespace MartenTests;

public class batch_querying_support : PostgresqlContext, IAsyncLifetime
{
private IHost theHost;

public async Task InitializeAsync()
{
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(o =>
{
o.Connection(Servers.PostgresConnectionString);
o.DatabaseSchemaName = "batching";
o.DisableNpgsqlLogging = true;

}).UseLightweightSessions().IntegrateWithWolverine();

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
theHost.Dispose();
}

[Fact]
public async Task try_batch_querying_end_to_end()
{
using var session = theHost.DocumentStore().LightweightSession();
var doc1 = new Doc1();
session.Store(doc1);

var doc2 = new Doc2();
session.Store(doc2);

var doc3 = new Doc3{Id = Guid.NewGuid().ToString()};
session.Store(doc3);

await session.SaveChangesAsync();

await theHost.InvokeAsync(new DoStuffWithDocs(doc1.Id, doc2.Id, doc3.Id));
}

[Fact]
public async Task try_batch_querying_with_read_aggregate()
{
using var session = theHost.DocumentStore().LightweightSession();
var doc1 = new Doc1();
session.Store(doc1);

var doc2 = new Doc2();
session.Store(doc2);

var streamId = Guid.NewGuid();
session.Events.StartStream<LetterAggregate>(streamId, new AEvent(), new BEvent(), new BEvent(), new DEvent());

await session.SaveChangesAsync();

await theHost.InvokeAsync(new ReadAggregateWithDocs(doc1.Id, doc2.Id, streamId));
}
}

public record DoStuffWithDocs(Guid Doc1Id, Guid Doc2Id, string Doc3Id);

public static class DoStuffWithDocsHandler
{
public static void Handle(
DoStuffWithDocs command,
[Entity] Doc1 doc1,
[Entity] Doc2 doc2,
[Entity] Doc3 doc3

)
{
doc1.ShouldNotBeNull();
doc2.ShouldNotBeNull();
doc3.ShouldNotBeNull();
}
}

public record ReadAggregateWithDocs(Guid Doc1Id, Guid Doc2Id, Guid LetterAggregateId);

public static class ReadAggregateWithDocsHandler
{
public static void Handle(
ReadAggregateWithDocs message,
[Entity] Doc1 doc1,
[Entity] Doc2 doc2,
[ReadAggregate] LetterAggregate letters
)
{
doc1.ShouldNotBeNull();
doc2.ShouldNotBeNull();
letters.ShouldNotBeNull();

letters.ACount.ShouldBe(1);
letters.BCount.ShouldBe(2);
}
}

public class Doc1
{
public Guid Id { get; set; }
public string Name { get; set; } = "Somebody";
}

public class Doc2
{
public Guid Id { get; set; }
public string Name { get; set; } = "Somebody";
}

public class Doc3
{
public string Id { get; set; }
public string Name { get; set; } = "Somebody";
}

23 changes: 7 additions & 16 deletions src/Persistence/Wolverine.Marten/AggregateHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ internal record AggregateHandling(IDataRequirement Requirement)
public Variable Apply(IChain chain, IServiceContainer container)
{
Store(chain);

new MartenPersistenceFrameProvider().ApplyTransactionSupport(chain, container);

var loader = GenerateLoadAggregateCode(chain);
var loader = new LoadAggregateFrame(this);
chain.Middleware.Add(loader);

var firstCall = chain.HandlerCalls().First();

var eventStream = loader.ReturnVariable!;
var eventStream = loader.Stream!;
if (Parameter != null)
{
eventStream.OverrideName("stream_" + Parameter.Name);
Expand Down Expand Up @@ -87,19 +89,6 @@ public static bool TryLoad(IChain chain, out AggregateHandling handling)
return false;
}

public MethodCall GenerateLoadAggregateCode(IChain chain)
{
if (!chain.Middleware.OfType<EventStoreFrame>().Any())
{
chain.Middleware.Add(new EventStoreFrame());
}

var loader = typeof(LoadAggregateFrame<>).CloseAndBuildAs<MethodCall>(this, AggregateType!);

chain.Middleware.Add(loader);
return loader;
}

internal static (MemberInfo, MemberInfo?) DetermineAggregateIdAndVersion(Type aggregateType, Type commandType,
IServiceContainer container)
{
Expand Down Expand Up @@ -197,12 +186,14 @@ internal Variable RelayAggregateToHandlerMethod(Variable eventStream, IChain cha
{
Variable aggregateVariable = new MemberAccessVariable(eventStream,
typeof(IEventStream<>).MakeGenericType(aggregateType).GetProperty(nameof(IEventStream<string>.Aggregate)));


if (Requirement.Required)
{
var otherFrames = chain.AddStopConditionIfNull(aggregateVariable, AggregateId, Requirement);

var block = new LoadEntityFrameBlock(aggregateVariable, otherFrames);
block.AlsoMirrorAsTheCreator(eventStream);
chain.Middleware.Add(block);

aggregateVariable = block.Mirror;
Expand Down
Loading
Loading