From 343ccd2ad9f1dc11e3a36abdf3f545afe9b12970 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 23 Sep 2025 07:01:12 -0500 Subject: [PATCH] Using Marten batch querying for code generation when necessary. Closes GH-1560 Boom, able to support the Marten batch querying for the aggregate handler workflow too WIP: not quite working again WIP: work on batching aggregate handler workflow --- .../working_against_multiple_streams.cs | 16 +++ .../HttpGraph.ParameterMatching.cs | 13 +- .../WolverineWebApi/Accounts/AccountCode.cs | 40 ++++++ .../ReservationTimeoutHandler457905910.cs | 75 ---------- .../MartenTests/batch_querying_support.cs | 132 ++++++++++++++++++ .../Wolverine.Marten/AggregateHandling.cs | 23 +-- .../Codegen/LoadAggregateFrame.cs | 105 ++++++++++---- .../Codegen/MartenQueryingFrame.cs | 111 +++++++++++++++ .../Codegen/SessionVariableSource.cs | 41 ++++++ .../Wolverine.Marten/MartenIntegration.cs | 3 + .../Persistence/Sagas/LoadDocumentFrame.cs | 41 +++++- .../ReadAggregateAttribute.cs | 38 ++++- src/Wolverine/Persistence/EntityAttribute.cs | 21 ++- src/Wolverine/Wolverine.csproj | 2 +- 14 files changed, 531 insertions(+), 130 deletions(-) delete mode 100644 src/Http/WolverineWebApi/Internal/Generated/WolverineHandlers/ReservationTimeoutHandler457905910.cs create mode 100644 src/Persistence/MartenTests/batch_querying_support.cs create mode 100644 src/Persistence/Wolverine.Marten/Codegen/MartenQueryingFrame.cs diff --git a/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs b/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs index b9eecc7e9..0c5206aae 100644 --- a/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs +++ b/src/Http/Wolverine.Http.Tests/Marten/working_against_multiple_streams.cs @@ -84,6 +84,22 @@ await Scenario(x => x.StatusCodeShouldBe(404); }); } + + [Fact] + public async Task happy_path_found_both_accounts_append_to_both_with_exclusive_lock() + { + var from = await createAccount(1000); + var to = await createAccount(100); + + await Scenario(x => + { + x.Post.Json(new TransferMoney(from, to, 150)).ToUrl("/accounts/transfer2"); + x.StatusCodeShouldBe(204); + }); + + (await fetchAmount(from)).ShouldBe(850); + (await fetchAmount(to)).ShouldBe(250); + } } #region sample_when_transfering_money diff --git a/src/Http/Wolverine.Http/HttpGraph.ParameterMatching.cs b/src/Http/Wolverine.Http/HttpGraph.ParameterMatching.cs index 26b3b8947..483e82f9e 100644 --- a/src/Http/Wolverine.Http/HttpGraph.ParameterMatching.cs +++ b/src/Http/Wolverine.Http/HttpGraph.ParameterMatching.cs @@ -1,8 +1,10 @@ using System.Reflection; using JasperFx.CodeGeneration.Frames; using JasperFx.CodeGeneration.Model; +using JasperFx.Core; using JasperFx.Core.Reflection; using Wolverine.Http.CodeGen; +using Wolverine.Persistence; namespace Wolverine.Http; @@ -57,7 +59,16 @@ internal bool TryMatchParameter(HttpChain chain, MethodCall methodCall, Paramete { if (variable.Creator != null) { - chain.Middleware.Add(variable.Creator); + // TODO -- THIS IS SMELLY!!!!!! Only happens because of IEventStream that does not get + // "mirrored" by LoadEntityFrameBlock + if (chain.Middleware.OfType() + .Any(x => object.ReferenceEquals(x.Creator, variable.Creator))) + { + continue; + } + + // Do this idempotently! + chain.Middleware.Fill(variable.Creator); } methodCall.Arguments[i] = variable; diff --git a/src/Http/WolverineWebApi/Accounts/AccountCode.cs b/src/Http/WolverineWebApi/Accounts/AccountCode.cs index 0b4feb8fa..3cd3a6e86 100644 --- a/src/Http/WolverineWebApi/Accounts/AccountCode.cs +++ b/src/Http/WolverineWebApi/Accounts/AccountCode.cs @@ -55,3 +55,43 @@ public static void Handle( #endregion +public static class TransferMoneyHandler2 +{ + [WolverinePost("/accounts/transfer2")] + public static void Handle( + TransferMoney command, + + [WriteAggregate(nameof(TransferMoney.FromId), LoadStyle = ConcurrencyStyle.Exclusive)] IEventStream fromAccount, + + [WriteAggregate(nameof(TransferMoney.ToId))] IEventStream toAccount) + { + // Would already 404 if either referenced account does not exist + if (fromAccount.Aggregate.Amount >= command.Amount) + { + fromAccount.AppendOne(new Withdrawn(command.Amount)); + toAccount.AppendOne(new Debited(command.Amount)); + } + } +} + +public record TransferMoney2(Guid FromId, Guid ToId, double Amount, int FromVersion); + +public static class TransferMoneyHandler3 +{ + [WolverinePost("/accounts/transfer3")] + public static void Handle( + TransferMoney command, + + [WriteAggregate(nameof(TransferMoney.FromId))] IEventStream fromAccount, + + [WriteAggregate(nameof(TransferMoney.ToId))] IEventStream toAccount) + { + // Would already 404 if either referenced account does not exist + if (fromAccount.Aggregate.Amount >= command.Amount) + { + fromAccount.AppendOne(new Withdrawn(command.Amount)); + toAccount.AppendOne(new Debited(command.Amount)); + } + } +} + diff --git a/src/Http/WolverineWebApi/Internal/Generated/WolverineHandlers/ReservationTimeoutHandler457905910.cs b/src/Http/WolverineWebApi/Internal/Generated/WolverineHandlers/ReservationTimeoutHandler457905910.cs deleted file mode 100644 index c0db20dcd..000000000 --- a/src/Http/WolverineWebApi/Internal/Generated/WolverineHandlers/ReservationTimeoutHandler457905910.cs +++ /dev/null @@ -1,75 +0,0 @@ -// -#pragma warning disable -using Microsoft.Extensions.Logging; -using Wolverine.Marten.Publishing; - -namespace Internal.Generated.WolverineHandlers -{ - // START: ReservationTimeoutHandler457905910 - [global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")] - public sealed class ReservationTimeoutHandler457905910 : Wolverine.Runtime.Handlers.MessageHandler - { - private readonly Microsoft.Extensions.Logging.ILogger _logger; - private readonly Wolverine.Marten.Publishing.OutboxedSessionFactory _outboxedSessionFactory; - - public ReservationTimeoutHandler457905910(Microsoft.Extensions.Logging.ILogger logger, Wolverine.Marten.Publishing.OutboxedSessionFactory outboxedSessionFactory) - { - _logger = logger; - _outboxedSessionFactory = outboxedSessionFactory; - } - - - - public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation) - { - // Building the Marten session - await using var documentSession = _outboxedSessionFactory.OpenSession(context); - // The actual message body - var reservationTimeout = (WolverineWebApi.ReservationTimeout)context.Envelope.Message; - - string sagaId = context.Envelope.SagaId ?? reservationTimeout.Id; - if (string.IsNullOrEmpty(sagaId)) throw new Wolverine.Persistence.Sagas.IndeterminateSagaStateIdException(context.Envelope); - - // Try to load the existing saga document - var reservation = await documentSession.LoadAsync(sagaId, cancellation).ConfigureAwait(false); - if (reservation == null) - { - return; - } - - else - { - - // The actual message execution - reservation.Handle(reservationTimeout, _logger); - - // Delete the saga if completed, otherwise update it - if (reservation.IsCompleted()) - { - - // Register the document operation with the current session - documentSession.Delete(reservation); - } - - else - { - - // Register the document operation with the current session - documentSession.Update(reservation); - } - - - // Save all pending changes to this Marten session - await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false); - - } - - } - - } - - // END: ReservationTimeoutHandler457905910 - - -} - diff --git a/src/Persistence/MartenTests/batch_querying_support.cs b/src/Persistence/MartenTests/batch_querying_support.cs new file mode 100644 index 000000000..27a4b2957 --- /dev/null +++ b/src/Persistence/MartenTests/batch_querying_support.cs @@ -0,0 +1,132 @@ +using IntegrationTests; +using JasperFx.Resources; +using Marten; +using MartenTests.AggregateHandlerWorkflow; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Persistence; + +namespace MartenTests; + +public class batch_querying_support : PostgresqlContext, IAsyncLifetime +{ + private IHost theHost; + + public async Task InitializeAsync() + { + theHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(o => + { + o.Connection(Servers.PostgresConnectionString); + o.DatabaseSchemaName = "batching"; + o.DisableNpgsqlLogging = true; + + }).UseLightweightSessions().IntegrateWithWolverine(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await theHost.StopAsync(); + theHost.Dispose(); + } + + [Fact] + public async Task try_batch_querying_end_to_end() + { + using var session = theHost.DocumentStore().LightweightSession(); + var doc1 = new Doc1(); + session.Store(doc1); + + var doc2 = new Doc2(); + session.Store(doc2); + + var doc3 = new Doc3{Id = Guid.NewGuid().ToString()}; + session.Store(doc3); + + await session.SaveChangesAsync(); + + await theHost.InvokeAsync(new DoStuffWithDocs(doc1.Id, doc2.Id, doc3.Id)); + } + + [Fact] + public async Task try_batch_querying_with_read_aggregate() + { + using var session = theHost.DocumentStore().LightweightSession(); + var doc1 = new Doc1(); + session.Store(doc1); + + var doc2 = new Doc2(); + session.Store(doc2); + + var streamId = Guid.NewGuid(); + session.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new DEvent()); + + await session.SaveChangesAsync(); + + await theHost.InvokeAsync(new ReadAggregateWithDocs(doc1.Id, doc2.Id, streamId)); + } +} + +public record DoStuffWithDocs(Guid Doc1Id, Guid Doc2Id, string Doc3Id); + +public static class DoStuffWithDocsHandler +{ + public static void Handle( + DoStuffWithDocs command, + [Entity] Doc1 doc1, + [Entity] Doc2 doc2, + [Entity] Doc3 doc3 + + ) + { + doc1.ShouldNotBeNull(); + doc2.ShouldNotBeNull(); + doc3.ShouldNotBeNull(); + } +} + +public record ReadAggregateWithDocs(Guid Doc1Id, Guid Doc2Id, Guid LetterAggregateId); + +public static class ReadAggregateWithDocsHandler +{ + public static void Handle( + ReadAggregateWithDocs message, + [Entity] Doc1 doc1, + [Entity] Doc2 doc2, + [ReadAggregate] LetterAggregate letters + ) + { + doc1.ShouldNotBeNull(); + doc2.ShouldNotBeNull(); + letters.ShouldNotBeNull(); + + letters.ACount.ShouldBe(1); + letters.BCount.ShouldBe(2); + } +} + +public class Doc1 +{ + public Guid Id { get; set; } + public string Name { get; set; } = "Somebody"; +} + +public class Doc2 +{ + public Guid Id { get; set; } + public string Name { get; set; } = "Somebody"; +} + +public class Doc3 +{ + public string Id { get; set; } + public string Name { get; set; } = "Somebody"; +} + diff --git a/src/Persistence/Wolverine.Marten/AggregateHandling.cs b/src/Persistence/Wolverine.Marten/AggregateHandling.cs index 7e55794d3..1c25410af 100644 --- a/src/Persistence/Wolverine.Marten/AggregateHandling.cs +++ b/src/Persistence/Wolverine.Marten/AggregateHandling.cs @@ -36,13 +36,15 @@ internal record AggregateHandling(IDataRequirement Requirement) public Variable Apply(IChain chain, IServiceContainer container) { Store(chain); - + new MartenPersistenceFrameProvider().ApplyTransactionSupport(chain, container); - var loader = GenerateLoadAggregateCode(chain); + var loader = new LoadAggregateFrame(this); + chain.Middleware.Add(loader); + var firstCall = chain.HandlerCalls().First(); - var eventStream = loader.ReturnVariable!; + var eventStream = loader.Stream!; if (Parameter != null) { eventStream.OverrideName("stream_" + Parameter.Name); @@ -87,19 +89,6 @@ public static bool TryLoad(IChain chain, out AggregateHandling handling) return false; } - public MethodCall GenerateLoadAggregateCode(IChain chain) - { - if (!chain.Middleware.OfType().Any()) - { - chain.Middleware.Add(new EventStoreFrame()); - } - - var loader = typeof(LoadAggregateFrame<>).CloseAndBuildAs(this, AggregateType!); - - chain.Middleware.Add(loader); - return loader; - } - internal static (MemberInfo, MemberInfo?) DetermineAggregateIdAndVersion(Type aggregateType, Type commandType, IServiceContainer container) { @@ -197,12 +186,14 @@ internal Variable RelayAggregateToHandlerMethod(Variable eventStream, IChain cha { Variable aggregateVariable = new MemberAccessVariable(eventStream, typeof(IEventStream<>).MakeGenericType(aggregateType).GetProperty(nameof(IEventStream.Aggregate))); + if (Requirement.Required) { var otherFrames = chain.AddStopConditionIfNull(aggregateVariable, AggregateId, Requirement); var block = new LoadEntityFrameBlock(aggregateVariable, otherFrames); + block.AlsoMirrorAsTheCreator(eventStream); chain.Middleware.Add(block); aggregateVariable = block.Mirror; diff --git a/src/Persistence/Wolverine.Marten/Codegen/LoadAggregateFrame.cs b/src/Persistence/Wolverine.Marten/Codegen/LoadAggregateFrame.cs index 0aa205279..726c2eccb 100644 --- a/src/Persistence/Wolverine.Marten/Codegen/LoadAggregateFrame.cs +++ b/src/Persistence/Wolverine.Marten/Codegen/LoadAggregateFrame.cs @@ -3,54 +3,111 @@ using JasperFx.CodeGeneration.Frames; using JasperFx.CodeGeneration.Model; using JasperFx.Core.Reflection; +using Marten; using Marten.Events; namespace Wolverine.Marten.Codegen; -internal class LoadAggregateFrame : MethodCall where T : class +internal class LoadAggregateFrame : AsyncFrame, IBatchableFrame { private readonly AggregateHandling _att; + private Variable? _session; + private Variable? _token; + private Variable? _batchQuery; + private Variable? _batchQueryItem; + private readonly Variable _identity; + private readonly Variable? _version; + private readonly Type _eventStreamType; + private readonly Variable _rawIdentity; - public LoadAggregateFrame(AggregateHandling att) : base(typeof(IEventStoreOperations), FindMethod(att)) + + public LoadAggregateFrame(AggregateHandling att) { _att = att; - CommentText = "Loading Marten aggregate"; + _identity = _att.AggregateId; - // Placeholder to keep the HTTP chains from trying to use QueryString - Arguments[0] = Constant.ForString("temp"); + if (_att is { LoadStyle: ConcurrencyStyle.Optimistic, Version: not null }) + { + _version = _att.Version; + } + + _eventStreamType = typeof(IEventStream<>).MakeGenericType(_att.AggregateType); + Stream = new Variable(_eventStreamType, this); + + _rawIdentity = _identity; + if (_rawIdentity.VariableType != typeof(Guid) && _rawIdentity.VariableType != typeof(string)) + { + var valueType = ValueTypeInfo.ForType(_rawIdentity.VariableType); + _rawIdentity = new MemberAccessVariable(_identity, valueType.ValueProperty); + } } + + public Variable Stream { get; } - public override IEnumerable FindVariables(IMethodVariables chain) + public void WriteCodeToEnlistInBatchQuery(GeneratedMethod method, ISourceWriter writer) { - Arguments[0] = _att.AggregateId; - if (_att is { LoadStyle: ConcurrencyStyle.Optimistic, Version: not null }) + if (_att.LoadStyle == ConcurrencyStyle.Exclusive) { - Arguments[1] = _att.Version; + writer.WriteLine($"var {_batchQueryItem.Usage} = {_batchQuery!.Usage}.Events.FetchForExclusiveWriting<{_att.AggregateType.FullNameInCode()}>({_rawIdentity.Usage});"); } + else if (_version == null) + { + writer.WriteLine($"var {_batchQueryItem.Usage} = {_batchQuery!.Usage}.Events.FetchForWriting<{_att.AggregateType.FullNameInCode()}>({_rawIdentity.Usage});"); + } + else + { + writer.WriteLine($"var {_batchQueryItem.Usage} = {_batchQuery!.Usage}.Events.FetchForWriting<{_att.AggregateType.FullNameInCode()}>({_rawIdentity.Usage}, {_version.Usage});"); + } + } - foreach (var variable in base.FindVariables(chain)) yield return variable; + public void EnlistInBatchQuery(Variable batchQuery) + { + _batchQueryItem = new Variable(typeof(Task<>).MakeGenericType(_eventStreamType), Stream.Usage + "_BatchItem", + this); + _batchQuery = batchQuery; } - internal static MethodInfo FindMethod(AggregateHandling att) + public override IEnumerable FindVariables(IMethodVariables chain) { - var isGuidIdentified = att.AggregateId!.VariableType == typeof(Guid); + yield return _identity; + if (_version != null) yield return _version; + + _session = chain.FindVariable(typeof(IDocumentSession)); + yield return _session; + + _token = chain.FindVariable(typeof(CancellationToken)); + yield return _token; - if (att.LoadStyle == ConcurrencyStyle.Exclusive) + if (_batchQuery != null) { - return isGuidIdentified - ? ReflectionHelper.GetMethod(x => x.FetchForExclusiveWriting(Guid.Empty, default))! - : ReflectionHelper.GetMethod(x => x.FetchForExclusiveWriting(string.Empty, default))!; + yield return _batchQuery; } + } - if (att.Version == null) + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteComment("Loading Marten aggregate as part of the aggregate handler workflow"); + if (_batchQueryItem == null) { - return isGuidIdentified - ? ReflectionHelper.GetMethod(x => x.FetchForWriting(Guid.Empty, default))! - : ReflectionHelper.GetMethod(x => x.FetchForWriting(string.Empty, default))!; + if (_att.LoadStyle == ConcurrencyStyle.Exclusive) + { + writer.WriteLine($"var {Stream.Usage} = await {_session!.Usage}.Events.FetchForExclusiveWriting<{_att.AggregateType.FullNameInCode()}>({_rawIdentity.Usage}, {_token.Usage});"); + } + else if (_version == null) + { + writer.WriteLine($"var {Stream.Usage} = await {_session!.Usage}.Events.FetchForWriting<{_att.AggregateType.FullNameInCode()}>({_rawIdentity.Usage}, {_token.Usage});"); + } + else + { + writer.WriteLine($"var {Stream.Usage} = await {_session!.Usage}.Events.FetchForWriting<{_att.AggregateType.FullNameInCode()}>({_rawIdentity.Usage}, {_version.Usage}, {_token.Usage});"); + } } - - return isGuidIdentified - ? ReflectionHelper.GetMethod(x => x.FetchForWriting(Guid.Empty, long.MaxValue, default))! - : ReflectionHelper.GetMethod(x => x.FetchForWriting(string.Empty, long.MaxValue, default))!; + else + { + writer.Write( + $"var {Stream.Usage} = await {_batchQueryItem.Usage}.ConfigureAwait(false);"); + } + + Next?.GenerateCode(method, writer); } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Codegen/MartenQueryingFrame.cs b/src/Persistence/Wolverine.Marten/Codegen/MartenQueryingFrame.cs new file mode 100644 index 000000000..44ba754ef --- /dev/null +++ b/src/Persistence/Wolverine.Marten/Codegen/MartenQueryingFrame.cs @@ -0,0 +1,111 @@ +using JasperFx; +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using Marten; +using Marten.Services.BatchQuerying; +using Wolverine.Persistence; + +namespace Wolverine.Marten.Codegen; + +internal interface IBatchableFrame +{ + void WriteCodeToEnlistInBatchQuery(GeneratedMethod method, ISourceWriter writer); + void EnlistInBatchQuery(Variable batchQuery); +} + +internal class MartenBatchingPolicy : IMethodPreCompilationPolicy +{ + public void Apply(IGeneratedMethod method) + { + var (i, frames) = sortThroughFrames(method); + if (frames.Count <= 1) return; + + var batchFrame = new MartenBatchFrame(); + method.Frames.Insert(i, batchFrame); + + foreach (var frame in frames) + { + batchFrame.Enlist(frame); + } + } + + private static (int, IReadOnlyList frames) sortThroughFrames(IGeneratedMethod method) + { + var list = new List(); + + var index = -1; + for (int i = 0; i < method.Frames.Count; i++) + { + var frame = method.Frames[i]; + if (frame is LoadEntityFrameBlock block && block.Creator is IBatchableFrame b) + { + list.Add(b); + if (index == -1) + { + index = i; + } + } + else if (frame is IBatchableFrame batchable) + { + list.Add(batchable); + if (index == -1) + { + index = i; + } + } + } + + return (index, list); + } +} + +internal class MartenBatchFrame : AsyncFrame +{ + private Variable _session; + private Variable _cancellation; + + private List _operations = new(); + + public MartenBatchFrame() + { + BatchQuery = new Variable(typeof(IBatchedQuery), this); + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteLine($"var {BatchQuery.Usage} = {_session.Usage}.{nameof(IQuerySession.CreateBatchQuery)}();"); + foreach (var op in _operations) + { + writer.BlankLine(); + op.WriteCodeToEnlistInBatchQuery(method, writer); + } + + writer.BlankLine(); + + writer.WriteLine($"await {BatchQuery.Usage}.{nameof(IBatchedQuery.Execute)}({_cancellation.Usage});"); + + writer.BlankLine(); + + Next?.GenerateCode(method, writer); + } + + public Variable BatchQuery { get; } + + public void Enlist(IBatchableFrame frame) + { + if (_operations.Contains(frame)) return; + + frame.EnlistInBatchQuery(BatchQuery); + _operations.Add(frame); + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _session = chain.FindVariable(typeof(IDocumentSession)); + yield return _session; + + _cancellation = chain.FindVariable(typeof(CancellationToken)); + yield return _cancellation; + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Codegen/SessionVariableSource.cs b/src/Persistence/Wolverine.Marten/Codegen/SessionVariableSource.cs index d7da1d906..97ed65d29 100644 --- a/src/Persistence/Wolverine.Marten/Codegen/SessionVariableSource.cs +++ b/src/Persistence/Wolverine.Marten/Codegen/SessionVariableSource.cs @@ -1,7 +1,9 @@ using JasperFx.CodeGeneration; using JasperFx.CodeGeneration.Frames; using JasperFx.CodeGeneration.Model; +using JasperFx.Events; using Marten; +using Marten.Events; namespace Wolverine.Marten.Codegen; @@ -31,6 +33,7 @@ public Variable Create(Type type) } } + internal class DocumentOperationsFrame : SyncFrame { private Variable _session; @@ -53,4 +56,42 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) writer.Write($"{typeof(IDocumentOperations)} {Variable.Usage} = {_session.Usage};"); Next?.GenerateCode(method, writer); } +} + +internal class EventStoreOperationsSource : IVariableSource +{ + public bool Matches(Type type) + { + return type == typeof(IEventStoreOperations); + } + + public Variable Create(Type type) + { + return new EventStoreOperationsFrame().Variable; + } +} + + +internal class EventStoreOperationsFrame : SyncFrame +{ + private Variable _session; + + public EventStoreOperationsFrame() + { + Variable = new Variable(typeof(IEventStoreOperations), this); + } + + public Variable Variable { get; } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _session = chain.FindVariable(typeof(IDocumentSession)); + yield return _session; + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.Write($"{typeof(IEventStoreOperations)} {Variable.Usage} = {_session.Usage}.{nameof(IDocumentSession.Events)};"); + Next?.GenerateCode(method, writer); + } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/MartenIntegration.cs b/src/Persistence/Wolverine.Marten/MartenIntegration.cs index 7991e83b3..ca36f97f4 100644 --- a/src/Persistence/Wolverine.Marten/MartenIntegration.cs +++ b/src/Persistence/Wolverine.Marten/MartenIntegration.cs @@ -45,8 +45,11 @@ public void Configure(WolverineOptions options) options.CodeGeneration.InsertFirstPersistenceStrategy(); options.CodeGeneration.Sources.Add(new SessionVariableSource()); options.CodeGeneration.Sources.Add(new DocumentOperationsSource()); + options.CodeGeneration.Sources.Add(new EventStoreOperationsSource()); options.Policies.Add(); + + options.CodeGeneration.MethodPreCompilation.Add(new MartenBatchingPolicy()); options.Discovery.CustomizeHandlerDiscovery(x => { diff --git a/src/Persistence/Wolverine.Marten/Persistence/Sagas/LoadDocumentFrame.cs b/src/Persistence/Wolverine.Marten/Persistence/Sagas/LoadDocumentFrame.cs index ea829ba85..8914b2187 100644 --- a/src/Persistence/Wolverine.Marten/Persistence/Sagas/LoadDocumentFrame.cs +++ b/src/Persistence/Wolverine.Marten/Persistence/Sagas/LoadDocumentFrame.cs @@ -4,16 +4,19 @@ using JasperFx.Core.Reflection; using Marten; using Marten.Metadata; +using Wolverine.Marten.Codegen; namespace Wolverine.Marten.Persistence.Sagas; -internal class LoadDocumentFrame : AsyncFrame +internal class LoadDocumentFrame : AsyncFrame, IBatchableFrame { public const string ExpectedSagaRevision = "expectedSagaRevision"; private readonly Variable _sagaId; private Variable? _cancellation; private Variable? _session; + private Variable? _batchQuery; + private Variable? _batchQueryItem; public LoadDocumentFrame(Type sagaType, Variable sagaId) { @@ -23,6 +26,22 @@ public LoadDocumentFrame(Type sagaType, Variable sagaId) Saga = new Variable(sagaType, this); } + public void WriteCodeToEnlistInBatchQuery(GeneratedMethod method, ISourceWriter writer) + { + if (_batchQueryItem == null) + throw new InvalidOperationException("This frame has not been enlisted in a MartenBatchFrame"); + + writer.Write( + $"var {_batchQueryItem.Usage} = {_batchQuery!.Usage}.Load<{Saga.VariableType.FullNameInCode()}>({_sagaId.Usage});"); + } + + public void EnlistInBatchQuery(Variable batchQuery) + { + _batchQuery = batchQuery; + _batchQueryItem = new Variable(typeof(Task<>).MakeGenericType(Saga.VariableType), Saga.Usage + "_BatchItem", + this); + } + public Variable Saga { get; } @@ -33,15 +52,29 @@ public override IEnumerable FindVariables(IMethodVariables chain) _cancellation = chain.FindVariable(typeof(CancellationToken)); yield return _cancellation; + + if (_batchQuery != null) + { + yield return _batchQuery; + } } public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) { writer.WriteLine(""); writer.WriteComment("Try to load the existing saga document"); - writer.Write( - $"var {Saga.Usage} = await {_session!.Usage}.LoadAsync<{Saga.VariableType.FullNameInCode()}>({_sagaId.Usage}, {_cancellation!.Usage}).ConfigureAwait(false);"); - if (Saga.VariableType.CanBeCastTo()) + if (_batchQueryItem == null) + { + writer.Write( + $"var {Saga.Usage} = await {_session!.Usage}.LoadAsync<{Saga.VariableType.FullNameInCode()}>({_sagaId.Usage}, {_cancellation!.Usage}).ConfigureAwait(false);"); + } + else + { + writer.Write( + $"var {Saga.Usage} = await {_batchQueryItem.Usage}.ConfigureAwait(false);"); + } + + if (Saga.VariableType.CanBeCastTo() && Saga.VariableType.CanBeCastTo()) { writer.WriteComment($"{Saga.VariableType.FullNameInCode()} implements {typeof(IRevisioned).FullNameInCode()}, so Wolverine will try to update based on the revision as a concurrency protection"); writer.Write($"var {ExpectedSagaRevision} = 0;"); diff --git a/src/Persistence/Wolverine.Marten/ReadAggregateAttribute.cs b/src/Persistence/Wolverine.Marten/ReadAggregateAttribute.cs index 117d84bfb..8bb7480f7 100644 --- a/src/Persistence/Wolverine.Marten/ReadAggregateAttribute.cs +++ b/src/Persistence/Wolverine.Marten/ReadAggregateAttribute.cs @@ -6,8 +6,10 @@ using JasperFx.Core.Reflection; using Marten; using Marten.Events; +using Marten.Services.BatchQuerying; using Wolverine.Attributes; using Wolverine.Configuration; +using Wolverine.Marten.Codegen; using Wolverine.Marten.Persistence.Sagas; using Wolverine.Persistence; using Wolverine.Runtime; @@ -66,11 +68,13 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC } } -internal class FetchLatestAggregateFrame : AsyncFrame +internal class FetchLatestAggregateFrame : AsyncFrame, IBatchableFrame { private readonly Variable _identity; private Variable _session; private Variable _token; + private Variable _batchQuery; + private Variable _batchQueryItem; public FetchLatestAggregateFrame(Type aggregateType, Variable identity) { @@ -80,6 +84,22 @@ public FetchLatestAggregateFrame(Type aggregateType, Variable identity) public Variable Aggregate { get; } + public void WriteCodeToEnlistInBatchQuery(GeneratedMethod method, ISourceWriter writer) + { + if (_batchQueryItem == null) + throw new InvalidOperationException("This frame has not been enlisted in a MartenBatchFrame"); + + writer.Write( + $"var {_batchQueryItem.Usage} = {_batchQuery!.Usage}.Events.{nameof(IBatchEvents.FetchLatest)}<{Aggregate.VariableType.FullNameInCode()}>({_identity.Usage});"); + } + + public void EnlistInBatchQuery(Variable batchQuery) + { + _batchQueryItem = new Variable(typeof(Task<>).MakeGenericType(Aggregate.VariableType), Aggregate.Usage + "_BatchItem", + this); + _batchQuery = batchQuery; + } + public override IEnumerable FindVariables(IMethodVariables chain) { _session = chain.FindVariable(typeof(IDocumentSession)); @@ -88,12 +108,26 @@ public override IEnumerable FindVariables(IMethodVariables chain) _token = chain.FindVariable(typeof(CancellationToken)); yield return _token; + if (_batchQuery != null) + { + yield return _batchQuery; + } + yield return _identity; } public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) { - writer.Write($"var {Aggregate.Usage} = await {_session.Usage}.Events.{nameof(IEventStoreOperations.FetchLatest)}<{Aggregate.VariableType.FullNameInCode()}>({_identity.Usage}, {_token.Usage});"); + if (_batchQueryItem == null) + { + writer.Write($"var {Aggregate.Usage} = await {_session.Usage}.Events.{nameof(IEventStoreOperations.FetchLatest)}<{Aggregate.VariableType.FullNameInCode()}>({_identity.Usage}, {_token.Usage});"); + } + else + { + writer.Write( + $"var {Aggregate.Usage} = await {_batchQueryItem.Usage}.ConfigureAwait(false);"); + } + Next?.GenerateCode(method, writer); } } \ No newline at end of file diff --git a/src/Wolverine/Persistence/EntityAttribute.cs b/src/Wolverine/Persistence/EntityAttribute.cs index 802b07b99..56f0b75cf 100644 --- a/src/Wolverine/Persistence/EntityAttribute.cs +++ b/src/Wolverine/Persistence/EntityAttribute.cs @@ -19,15 +19,22 @@ namespace Wolverine.Persistence; public class LoadEntityFrameBlock : Frame { private readonly Frame[] _guardFrames; - private readonly Frame _creator; public LoadEntityFrameBlock(Variable entity, params Frame[] guardFrames) : base(entity.Creator.IsAsync || guardFrames.Any(x => x.IsAsync)) { _guardFrames = guardFrames; Mirror = new Variable(entity.VariableType, entity.Usage, this); - _creator = entity.Creator; + Creator = entity.Creator; } + public void AlsoMirrorAsTheCreator(Variable variable) + { + // Seems goofy, but adds it to the creates + new Variable(variable.VariableType, variable.Usage, this); + } + + public Frame Creator { get; } + public Variable Mirror { get; } public override IEnumerable Creates => [Mirror]; @@ -35,7 +42,7 @@ public LoadEntityFrameBlock(Variable entity, params Frame[] guardFrames) : base( public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) { // The [WriteAggregate] somehow causes this - if (_creator.Next == this || _creator.Next != null) + if (Creator.Next == this || Creator.Next != null) { for (int i = 1; i < _guardFrames.Length; i++) { @@ -46,14 +53,14 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) } else { - var previous = _creator; + var previous = Creator; foreach (var next in _guardFrames) { previous.Next = next; previous = next; } - _creator.GenerateCode(method, writer); + Creator.GenerateCode(method, writer); } Next?.GenerateCode(method, writer); @@ -61,7 +68,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) public override IEnumerable FindVariables(IMethodVariables chain) { - return _creator + return Creator .FindVariables(chain) .Concat(_guardFrames.SelectMany(x => x.FindVariables(chain))).Distinct(); } @@ -70,7 +77,7 @@ public override bool CanReturnTask() { if (_guardFrames.Any()) return _guardFrames.Last().CanReturnTask(); - return _creator.CanReturnTask(); + return Creator.CanReturnTask(); } } diff --git a/src/Wolverine/Wolverine.csproj b/src/Wolverine/Wolverine.csproj index 062f29b2b..3fb299d8c 100644 --- a/src/Wolverine/Wolverine.csproj +++ b/src/Wolverine/Wolverine.csproj @@ -4,7 +4,7 @@ WolverineFx - +