diff --git a/docs/guide/http/marten.md b/docs/guide/http/marten.md index da0d484d2..c37ad2710 100644 --- a/docs/guide/http/marten.md +++ b/docs/guide/http/marten.md @@ -344,6 +344,17 @@ public static (UpdatedAggregate, Events) ConfirmDifferent(ConfirmOrder command, snippet source | anchor +If you should happen to have a message handler or HTTP endpoint signature that uses multiple event streams, +but you want the `UpdatedAggregate` to **only** apply to one of the streams, you can use the `UpdatedAggregate` +to tip off Wolverine about that like in this sample: + +snippet: sample_MakePurchaseHandler + +::: info +Wolverine can't (yet) handle a signature with multiple event streams of the same aggregate type and +`UpdatedAggregate`. +::: + ## Reading the Latest Version of an Aggregate ::: info diff --git a/src/Persistence/MartenTests/AggregateHandlerWorkflow/mixed_aggregate_handler_with_multiple_streams.cs b/src/Persistence/MartenTests/AggregateHandlerWorkflow/mixed_aggregate_handler_with_multiple_streams.cs new file mode 100644 index 000000000..703ce5ea8 --- /dev/null +++ b/src/Persistence/MartenTests/AggregateHandlerWorkflow/mixed_aggregate_handler_with_multiple_streams.cs @@ -0,0 +1,115 @@ +using IntegrationTests; +using Marten; +using Marten.Events; +using Marten.Events.Projections; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.AggregateHandlerWorkflow; + +public class mixed_aggregate_handler_with_multiple_streams +{ + [Fact] + public async Task get_the_correct_aggregate_back_out() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + MartenServiceCollectionExtensions.AddMarten(opts.Services, m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "accounts"; + + m.Projections.Snapshot(SnapshotLifecycle.Inline); + m.Projections.Snapshot(SnapshotLifecycle.Inline); + }).IntegrateWithWolverine(); + }).StartAsync(); + + using var session = host.DocumentStore().LightweightSession(); + var inventoryId = session.Events.StartStream(new InventoryStarted("XFX", 100, 10)).Id; + var accountId = session.Events.StartStream(new XAccountOpened(2000)).Id; + await session.SaveChangesAsync(); + + var (tracked, account) = await host.InvokeMessageAndWaitAsync(new MakePurchase(accountId, inventoryId, 30)); + account.Balance.ShouldBe(1700); + + } +} + +public record XAccountOpened(double Balance); + +public record ItemPurchased(Guid InventoryId, int Number, double UnitPrice); + +public class XAccount +{ + public Guid Id { get; set; } + public double Balance { get; set; } + + public XAccount() + { + } + + public static XAccount Create(XAccountOpened opened) => new XAccount { Balance = opened.Balance }; + + public void Apply(ItemPurchased purchased) + { + Balance -= (purchased.Number * purchased.UnitPrice); + } +} + +public record InventoryStarted(string Name, int Quantity, double UnitPrice); + +public record Drawdown(int Quantity); + +public class Inventory +{ + public Guid Id { get; set; } + public string Name { get; set; } + public int Quantity { get; set; } + public double UnitPrice { get; set; } + + public static Inventory Create(InventoryStarted started) => new Inventory + { + Name = started.Name, + Quantity = started.Quantity, + UnitPrice = started.UnitPrice + }; + + public void Apply(Drawdown down) => Quantity -= down.Quantity; +} + +public record MakePurchase(Guid XAccountId, Guid InventoryId, int Number); + +#region sample_MakePurchaseHandler + +public static class MakePurchaseHandler +{ + // See how we used the generic version + // of UpdatedAggregate to tell Wolverine we + // want *only* the XAccount as the response + // from this handler + public static UpdatedAggregate Handle( + MakePurchase command, + + [WriteAggregate] IEventStream account, + + [WriteAggregate] IEventStream inventory) + { + if (command.Number > inventory.Aggregate.Quantity || + (command.Number * inventory.Aggregate.UnitPrice) > account.Aggregate.Balance) + { + // Do Nothing! + return new UpdatedAggregate(); + } + + account.AppendOne(new ItemPurchased(command.InventoryId, command.Number, inventory.Aggregate.UnitPrice)); + inventory.AppendOne(new Drawdown(command.Number)); + + return new UpdatedAggregate(); + } +} + +#endregion \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/AggregateHandling.cs b/src/Persistence/Wolverine.Marten/AggregateHandling.cs index 85ce56cf7..e96843259 100644 --- a/src/Persistence/Wolverine.Marten/AggregateHandling.cs +++ b/src/Persistence/Wolverine.Marten/AggregateHandling.cs @@ -71,7 +71,23 @@ public Variable Apply(IChain chain, IServiceContainer container) public void Store(IChain chain) { - chain.Tags[nameof(AggregateHandling)] = this; + if (chain.Tags.TryGetValue(nameof(AggregateHandling), out var raw)) + { + if (raw is AggregateHandling handling) + { + if (ReferenceEquals(handling, this)) return; + + chain.Tags[nameof(AggregateHandling)] = new List { handling, this }; + } + else if (raw is List list) + { + list.Add(this); + } + } + else + { + chain.Tags[nameof(AggregateHandling)] = this; + } } public static bool TryLoad(IChain chain, out AggregateHandling handling) @@ -88,6 +104,27 @@ public static bool TryLoad(IChain chain, out AggregateHandling handling) handling = default; return false; } + + public static bool TryLoad(IChain chain, out AggregateHandling handling) + { + if (chain.Tags.TryGetValue(nameof(AggregateHandling), out var raw)) + { + if (raw is AggregateHandling h && h.AggregateType == typeof(T)) + { + handling = h; + return true; + } + + if (raw is List list) + { + handling = list.FirstOrDefault(x => x.AggregateType == typeof(T)); + return handling != null; + } + } + + handling = default; + return false; + } internal static (MemberInfo, MemberInfo?) DetermineAggregateIdAndVersion(Type aggregateType, Type commandType, IServiceContainer container) diff --git a/src/Persistence/Wolverine.Marten/UpdatedAggregate.cs b/src/Persistence/Wolverine.Marten/UpdatedAggregate.cs index f3c50f33c..8d0e69b22 100644 --- a/src/Persistence/Wolverine.Marten/UpdatedAggregate.cs +++ b/src/Persistence/Wolverine.Marten/UpdatedAggregate.cs @@ -38,6 +38,36 @@ public static void ConfigureResponse(IChain chain) } } +/// +/// Use this as a response from a message handler +/// or HTTP endpoint using the aggregate handler workflow +/// to response with the updated version of the aggregate being +/// altered *after* any new events have been applied +/// +/// The aggregate type. Use this version of UpdatedAggregate if you need to help Wolverine "know" which of multiple event streams should be the "updated aggregate" +public class UpdatedAggregate : IResponseAware +{ + public static void ConfigureResponse(IChain chain) + { + if (AggregateHandling.TryLoad(chain, out var handling)) + { + var idType = handling.AggregateId.VariableType; + + // TODO -- with https://github.com/JasperFx/wolverine/issues/1167, this might need to try to create value + // type first + var openType = idType == typeof(Guid) ? typeof(FetchLatestByGuid<>) : typeof(FetchLatestByString<>); + var frame = openType.CloseAndBuildAs(handling.AggregateId, handling.AggregateType); + + chain.UseForResponse(frame); + } + else + { + throw new InvalidOperationException($"UpdatedAggregate cannot be used because Chain {chain} is not marked as an aggregate handler. Are you missing an [AggregateHandler] or [Aggregate] attribute on the handler?"); + } + + } +} + internal class FetchLatestByGuid : MethodCall where T : class { public FetchLatestByGuid(Variable id) : base(typeof(IEventStoreOperations), ReflectionHelper.GetMethod(x => x.FetchLatest(Guid.Empty, CancellationToken.None))) diff --git a/src/Wolverine/Runtime/Handlers/HandlerChain.cs b/src/Wolverine/Runtime/Handlers/HandlerChain.cs index c6ad5e531..f14f15685 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerChain.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerChain.cs @@ -528,9 +528,7 @@ protected void applyCustomizations(GenerationRules rules, IServiceContainer cont if (!_hasConfiguredFrames) { _hasConfiguredFrames = true; - - applyAttributesAndConfigureMethods(rules, container); - + foreach (var attribute in MessageType .GetCustomAttributes(typeof(ModifyHandlerChainAttribute)) .OfType()) attribute.Modify(this, rules); @@ -538,8 +536,13 @@ protected void applyCustomizations(GenerationRules rules, IServiceContainer cont foreach (var attribute in MessageType.GetCustomAttributes(typeof(ModifyChainAttribute)) .OfType()) attribute.Modify(this, rules, container); + // THIS has to go before the baseline attributes and configure foreach (var handlerCall in HandlerCalls()) WolverineParameterAttribute.TryApply(handlerCall, container, rules, this); + + applyAttributesAndConfigureMethods(rules, container); + + } ApplyImpliedMiddlewareFromHandlers(rules);