diff --git a/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs b/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs new file mode 100644 index 000000000..fe0e2a66a --- /dev/null +++ b/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs @@ -0,0 +1,86 @@ +using Marten; +using Shouldly; +using WolverineWebApi.Accounts; + +namespace Wolverine.Http.Tests.Marten; + +public class working_against_multiple_streams : IntegrationContext +{ + public working_against_multiple_streams(AppFixture fixture) : base(fixture) + { + } + + private async Task createAccount(double amount) + { + var created = new AccountCreated(amount); + using var session = Host.DocumentStore().LightweightSession(); + var id = session.Events.StartStream(created).Id; + await session.SaveChangesAsync(); + return id; + } + + private async Task fetchAmount(Guid id) + { + using var session = Host.DocumentStore().LightweightSession(); + var account = await session.Events.FetchLatest(id); + return account.Amount; + } + + [Fact] + public async Task happy_path_found_both_accounts_append_to_both() + { + var from = await createAccount(1000); + var to = await createAccount(100); + + await Scenario(x => + { + x.Post.Json(new TransferMoney(from, to, 150)).ToUrl("/accounts/transfer"); + x.StatusCodeShouldBe(204); + }); + + (await fetchAmount(from)).ShouldBe(850); + (await fetchAmount(to)).ShouldBe(250); + } + + [Fact] + public async Task reject_when_the_from_does_not_have_enough_funds() + { + var from = await createAccount(1000); + var to = await createAccount(100); + + await Scenario(x => + { + x.Post.Json(new TransferMoney(from, to, 2000)).ToUrl("/accounts/transfer"); + x.StatusCodeShouldBe(204); + }); + + (await fetchAmount(from)).ShouldBe(1000); + (await fetchAmount(to)).ShouldBe(100); + } + + [Fact] + public async Task return_404_when_first_account_does_not_exist() + { + //var from = await createAccount(1000); + var to = await createAccount(100); + + await Scenario(x => + { + x.Post.Json(new TransferMoney(Guid.NewGuid(), to, 2000)).ToUrl("/accounts/transfer"); + x.StatusCodeShouldBe(404); + }); + } + + [Fact] + public async Task return_404_when_second_account_does_not_exist() + { + var from = await createAccount(1000); + //var to = await createAccount(100); + + await Scenario(x => + { + x.Post.Json(new TransferMoney(from, Guid.NewGuid(), 2000)).ToUrl("/accounts/transfer"); + x.StatusCodeShouldBe(404); + }); + } +} \ No newline at end of file diff --git a/src/Http/WolverineWebApi/Accounts/AccountCode.cs b/src/Http/WolverineWebApi/Accounts/AccountCode.cs new file mode 100644 index 000000000..ea9becc95 --- /dev/null +++ b/src/Http/WolverineWebApi/Accounts/AccountCode.cs @@ -0,0 +1,43 @@ +using JasperFx.Events; +using Marten.Events; +using Wolverine.Http; +using Wolverine.Http.Marten; + +namespace WolverineWebApi.Accounts; + +public record AccountCreated(double InitialAmount); +public record Debited(double Amount); +public record Withdrawn(double Amount); + +public class Account +{ + public Guid Id { get; set; } + public double Amount { get; set; } + + public static Account Create(IEvent e) + => new Account { Id = e.StreamId, Amount = e.Data.InitialAmount}; + + public void Apply(Debited e) => Amount += e.Amount; + public void Apply(Withdrawn e) => Amount -= e.Amount; +} + +public record TransferMoney(Guid FromId, Guid ToId, double Amount); + +public static class TransferMoneyEndpoint +{ + [WolverinePost("/accounts/transfer")] + public static void Post( + TransferMoney command, + + [Aggregate(nameof(TransferMoney.FromId))] IEventStream fromAccount, + + [Aggregate(nameof(TransferMoney.ToId))] IEventStream toAccount) + { + // Would already 404 if either referenced account does not exist + if (fromAccount.Aggregate.Amount >= command.Amount) + { + fromAccount.AppendOne(new Withdrawn(command.Amount)); + toAccount.AppendOne(new Debited(command.Amount)); + } + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/AggregateHandling.cs b/src/Persistence/Wolverine.Marten/AggregateHandling.cs index f513d7c70..9b241bdc1 100644 --- a/src/Persistence/Wolverine.Marten/AggregateHandling.cs +++ b/src/Persistence/Wolverine.Marten/AggregateHandling.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using System.Reflection; +using ImTools; using JasperFx.CodeGeneration.Frames; using JasperFx.CodeGeneration.Model; using JasperFx.Core; @@ -29,6 +30,7 @@ internal record AggregateHandling(IDataRequirement Requirement) public ConcurrencyStyle LoadStyle { get; init; } public Variable? Version { get; init; } + public ParameterInfo? Parameter { get; set; } public Variable Apply(IChain chain, IServiceContainer container) { @@ -40,6 +42,10 @@ public Variable Apply(IChain chain, IServiceContainer container) var firstCall = chain.HandlerCalls().First(); var eventStream = loader.ReturnVariable!; + if (Parameter != null) + { + eventStream.OverrideName("stream_" + Parameter.Name); + } if (AggregateType == firstCall.HandlerType) { @@ -52,6 +58,11 @@ public Variable Apply(IChain chain, IServiceContainer container) ValidateMethodSignatureForEmittedEvents(chain, firstCall, chain); var aggregate = RelayAggregateToHandlerMethod(eventStream, chain, firstCall, AggregateType); + if (Parameter != null && Parameter.ParameterType.Closes(typeof(IEventStream<>))) + { + return eventStream; + } + return aggregate; } @@ -203,7 +214,14 @@ internal Variable RelayAggregateToHandlerMethod(Variable eventStream, IChain cha } else { - firstCall.TrySetArgument(aggregateVariable); + if (!firstCall.TrySetArgument(aggregateVariable)) + { + if (Parameter != null && Parameter.ParameterType.Closes(typeof(IEventStream<>))) + { + var index = firstCall.Method.GetParameters().IndexOf(x => x.Name == Parameter.Name); + firstCall.Arguments[index] = eventStream; + } + }; } foreach (var methodCall in chain.Middleware.OfType()) diff --git a/src/Persistence/Wolverine.Marten/WriteAggregateAttribute.cs b/src/Persistence/Wolverine.Marten/WriteAggregateAttribute.cs index dc877958f..94fe47fac 100644 --- a/src/Persistence/Wolverine.Marten/WriteAggregateAttribute.cs +++ b/src/Persistence/Wolverine.Marten/WriteAggregateAttribute.cs @@ -4,6 +4,7 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using Marten; +using Marten.Events; using Wolverine.Attributes; using Wolverine.Configuration; using Wolverine.Persistence; @@ -41,19 +42,17 @@ public WriteAggregateAttribute(string? routeOrParameterName) public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceContainer container, GenerationRules rules) { - // TODO -- this goes away soon-ish - if (chain.HandlerCalls().First().Method.GetParameters().Count(x => x.HasAttribute()) > 1) - { - throw new InvalidOperationException( - "It is only possible (today) to use a single [Aggregate] or [WriteAggregate] attribute on an HTTP handler method. Maybe use [ReadAggregate] if all you need is the projected data"); - } - var aggregateType = parameter.ParameterType; if (aggregateType.IsNullable()) { aggregateType = aggregateType.GetInnerTypeFromNullable(); } + if (aggregateType.Closes(typeof(IEventStream<>))) + { + aggregateType = aggregateType.GetGenericArguments()[0]; + } + var store = container.GetInstance(); var idType = store.Options.FindOrResolveDocumentType(aggregateType).IdType; @@ -75,7 +74,8 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC AggregateType = aggregateType, AggregateId = identity, LoadStyle = LoadStyle, - Version = version + Version = version, + Parameter = parameter }; return handling.Apply(chain, container);