Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using Marten;
using Shouldly;
using WolverineWebApi.Accounts;

namespace Wolverine.Http.Tests.Marten;

public class working_against_multiple_streams : IntegrationContext
{
public working_against_multiple_streams(AppFixture fixture) : base(fixture)
{
}

private async Task<Guid> createAccount(double amount)
{
var created = new AccountCreated(amount);
using var session = Host.DocumentStore().LightweightSession();
var id = session.Events.StartStream<Account>(created).Id;
await session.SaveChangesAsync();
return id;
}

private async Task<double> fetchAmount(Guid id)
{
using var session = Host.DocumentStore().LightweightSession();
var account = await session.Events.FetchLatest<Account>(id);
return account.Amount;
}

[Fact]
public async Task happy_path_found_both_accounts_append_to_both()
{
var from = await createAccount(1000);
var to = await createAccount(100);

await Scenario(x =>
{
x.Post.Json(new TransferMoney(from, to, 150)).ToUrl("/accounts/transfer");
x.StatusCodeShouldBe(204);
});

(await fetchAmount(from)).ShouldBe(850);
(await fetchAmount(to)).ShouldBe(250);
}

[Fact]
public async Task reject_when_the_from_does_not_have_enough_funds()
{
var from = await createAccount(1000);
var to = await createAccount(100);

await Scenario(x =>
{
x.Post.Json(new TransferMoney(from, to, 2000)).ToUrl("/accounts/transfer");
x.StatusCodeShouldBe(204);
});

(await fetchAmount(from)).ShouldBe(1000);
(await fetchAmount(to)).ShouldBe(100);
}

[Fact]
public async Task return_404_when_first_account_does_not_exist()
{
//var from = await createAccount(1000);
var to = await createAccount(100);

await Scenario(x =>
{
x.Post.Json(new TransferMoney(Guid.NewGuid(), to, 2000)).ToUrl("/accounts/transfer");
x.StatusCodeShouldBe(404);
});
}

[Fact]
public async Task return_404_when_second_account_does_not_exist()
{
var from = await createAccount(1000);
//var to = await createAccount(100);

await Scenario(x =>
{
x.Post.Json(new TransferMoney(from, Guid.NewGuid(), 2000)).ToUrl("/accounts/transfer");
x.StatusCodeShouldBe(404);
});
}
}
43 changes: 43 additions & 0 deletions src/Http/WolverineWebApi/Accounts/AccountCode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using JasperFx.Events;
using Marten.Events;
using Wolverine.Http;
using Wolverine.Http.Marten;

namespace WolverineWebApi.Accounts;

public record AccountCreated(double InitialAmount);
public record Debited(double Amount);
public record Withdrawn(double Amount);

public class Account
{
public Guid Id { get; set; }
public double Amount { get; set; }

public static Account Create(IEvent<AccountCreated> e)
=> new Account { Id = e.StreamId, Amount = e.Data.InitialAmount};

public void Apply(Debited e) => Amount += e.Amount;
public void Apply(Withdrawn e) => Amount -= e.Amount;
}

public record TransferMoney(Guid FromId, Guid ToId, double Amount);

public static class TransferMoneyEndpoint
{
[WolverinePost("/accounts/transfer")]
public static void Post(
TransferMoney command,

[Aggregate(nameof(TransferMoney.FromId))] IEventStream<Account> fromAccount,

[Aggregate(nameof(TransferMoney.ToId))] IEventStream<Account> toAccount)
{
// Would already 404 if either referenced account does not exist
if (fromAccount.Aggregate.Amount >= command.Amount)
{
fromAccount.AppendOne(new Withdrawn(command.Amount));
toAccount.AppendOne(new Debited(command.Amount));
}
}
}
20 changes: 19 additions & 1 deletion src/Persistence/Wolverine.Marten/AggregateHandling.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Reflection;
using ImTools;
using JasperFx.CodeGeneration.Frames;
using JasperFx.CodeGeneration.Model;
using JasperFx.Core;
Expand Down Expand Up @@ -29,6 +30,7 @@ internal record AggregateHandling(IDataRequirement Requirement)

public ConcurrencyStyle LoadStyle { get; init; }
public Variable? Version { get; init; }
public ParameterInfo? Parameter { get; set; }

public Variable Apply(IChain chain, IServiceContainer container)
{
Expand All @@ -40,6 +42,10 @@ public Variable Apply(IChain chain, IServiceContainer container)
var firstCall = chain.HandlerCalls().First();

var eventStream = loader.ReturnVariable!;
if (Parameter != null)
{
eventStream.OverrideName("stream_" + Parameter.Name);
}

if (AggregateType == firstCall.HandlerType)
{
Expand All @@ -52,6 +58,11 @@ public Variable Apply(IChain chain, IServiceContainer container)
ValidateMethodSignatureForEmittedEvents(chain, firstCall, chain);
var aggregate = RelayAggregateToHandlerMethod(eventStream, chain, firstCall, AggregateType);

if (Parameter != null && Parameter.ParameterType.Closes(typeof(IEventStream<>)))
{
return eventStream;
}

return aggregate;
}

Expand Down Expand Up @@ -203,7 +214,14 @@ internal Variable RelayAggregateToHandlerMethod(Variable eventStream, IChain cha
}
else
{
firstCall.TrySetArgument(aggregateVariable);
if (!firstCall.TrySetArgument(aggregateVariable))
{
if (Parameter != null && Parameter.ParameterType.Closes(typeof(IEventStream<>)))
{
var index = firstCall.Method.GetParameters().IndexOf(x => x.Name == Parameter.Name);
firstCall.Arguments[index] = eventStream;
}
};
}

foreach (var methodCall in chain.Middleware.OfType<MethodCall>())
Expand Down
16 changes: 8 additions & 8 deletions src/Persistence/Wolverine.Marten/WriteAggregateAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten;
using Marten.Events;
using Wolverine.Attributes;
using Wolverine.Configuration;
using Wolverine.Persistence;
Expand Down Expand Up @@ -41,19 +42,17 @@ public WriteAggregateAttribute(string? routeOrParameterName)

public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceContainer container, GenerationRules rules)
{
// TODO -- this goes away soon-ish
if (chain.HandlerCalls().First().Method.GetParameters().Count(x => x.HasAttribute<WriteAggregateAttribute>()) > 1)
{
throw new InvalidOperationException(
"It is only possible (today) to use a single [Aggregate] or [WriteAggregate] attribute on an HTTP handler method. Maybe use [ReadAggregate] if all you need is the projected data");
}

var aggregateType = parameter.ParameterType;
if (aggregateType.IsNullable())
{
aggregateType = aggregateType.GetInnerTypeFromNullable();
}

if (aggregateType.Closes(typeof(IEventStream<>)))
{
aggregateType = aggregateType.GetGenericArguments()[0];
}

var store = container.GetInstance<IDocumentStore>();
var idType = store.Options.FindOrResolveDocumentType(aggregateType).IdType;

Expand All @@ -75,7 +74,8 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC
AggregateType = aggregateType,
AggregateId = identity,
LoadStyle = LoadStyle,
Version = version
Version = version,
Parameter = parameter
};

return handling.Apply(chain, container);
Expand Down
Loading